准备搜索...
分类筛选
全部分类

输入关键词开始搜索

Redis 8.0 的5个高级调优技巧,性能优化技巧,性能提升50%不是梦

最近我在生产环境折腾 Redis 8.0,发现这版本确实有点东西。之前用 7.x 的时候,遇到高并发场景就有点吃力,升级到 8.0 后配合几个优化技巧,性能直接提升了 50% 以上。今天就跟兄弟们分享下我踩过的坑和总结出来的 5 个高级技巧,别磨叽,直接上干货。

连接池优化,别让连接成为瓶颈

先说连接池这个事,很多兄弟可能觉得这玩意儿没啥好说的,不就是复用连接嘛。但实际情况是,连接池配置不对,性能能差好几倍。

我之前就吃过这个亏。项目刚上线的时候,每次请求都创建新连接,QPS 一上来,Redis 那边连接数直接爆表,CPU 占用率也蹭蹭往上涨。最严重的时候,连接数到了 5000 多,Redis 服务器 CPU 直接 100%,响应时间从几毫秒涨到几百毫秒,用户都开始投诉了。

后来改成连接池,但初始配置也不对,最大连接数设得太小,只有 20 个。高并发的时候,所有请求都排队等连接,虽然连接数不爆了,但响应时间还是慢。后来我仔细分析了业务场景,发现平均 QPS 是 2000,平均响应时间是 5 毫秒,理论上需要 10 个连接就够了,但考虑到峰值和网络波动,我设成了 50 个,这才解决了问题。

Redis 8.0 对连接管理做了优化,客户端管理开销降低了,但前提是你得把连接池配置好。看下我现在的配置:

import redis
from redis.connection import ConnectionPool
# 创建连接池,这里有几个关键参数
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    # 最大连接数,根据你的并发量来,我一般设成 QPS 的 1.5 倍
    max_connections=200,
    # 连接超时时间,别设太小,网络波动的时候容易超时
    socket_connect_timeout=5,
    # 读取超时,根据你的命令执行时间调整
    socket_timeout=5,
    # 连接保活,避免连接被服务器关闭
    socket_keepalive=True,
    socket_keepalive_options={},
    # 连接重试次数
    retry_on_timeout=True,
    # 健康检查,定期检查连接是否有效
    health_check_interval=30
)
# 使用连接池创建客户端
r = redis.Redis(connection_pool=pool)
# 使用示例
def get_user_info(user_id):
    """获取用户信息,使用连接池复用连接"""
    key = f"user:{user_id}"
    # 这里会自动从连接池获取连接,用完自动归还
    user_data = r.hgetall(key)
    return user_data
# 批量操作时,复用同一个连接
def batch_get_users(user_ids):
    """批量获取用户信息"""
    pipe = r.pipeline()
    for user_id in user_ids:
        key = f"user:{user_id}"
        pipe.hgetall(key)
    # 一次性执行,只用一个连接
    results = pipe.execute()
    return results

关键点来了,max_connections 这个参数别瞎设。我的经验是,先看看你的平均 QPS,然后乘以平均响应时间(秒),再乘以 1.5 的安全系数。比如 QPS 是 1000,平均响应时间是 0.01 秒,那大概需要 15 个连接,再乘以 1.5,就是 22.5,我一般设成 30-50 左右。

还有一点,Redis 8.0 对客户端管理做了优化,减少了客户端 cron 操作的开销,避免阻塞主线程。这意味着你可以放心地使用更多连接,不用担心性能问题。

连接池还有个坑,就是连接泄漏的问题。我之前遇到过,代码里有个地方异常处理没做好,连接没正常归还,时间一长连接池就空了,所有请求都卡住。后来加了监控,发现连接池使用率异常就告警:

def monitor_connection_pool(pool):
    """监控连接池状态"""
    # 获取连接池信息
    created_connections = pool.created_connections
    available_connections = len(pool._available_connections)
    in_use_connections = created_connections - available_connections
    print(f"总连接数: {created_connections}")
    print(f"可用连接数: {available_connections}")
    print(f"使用中连接数: {in_use_connections}")
    print(f"连接池使用率: {(in_use_connections / created_connections * 100):.2f}%")
    # 如果使用率超过 80%,告警
    if in_use_connections / created_connections > 0.8:
        print("⚠️  连接池使用率过高,可能存在连接泄漏")
    return {
        'created': created_connections,
        'available': available_connections,
        'in_use': in_use_connections
    }
# 定期监控
pool_status = monitor_connection_pool(pool)

另外,Redis 8.0 还优化了连接的健康检查机制。之前版本的健康检查可能会影响性能,8.0 版本改进了,可以更频繁地检查连接状态,及时发现断开的连接。我一般把 health_check_interval 设成 30 秒,既能及时发现问题,又不会影响性能。

Pipeline 批量操作,减少网络往返

这个技巧老生常谈了,但很多兄弟还是不会用,或者用得不彻底。Pipeline 的原理就是把多个命令打包一起发,减少网络往返次数。Redis 8.0 对 Pipeline 的支持更好了,性能提升明显。

我之前有个场景,需要批量更新用户积分。最开始是循环调用 INCR,1000 个用户更新一次要 1 秒多。改成 Pipeline 后,直接降到 100 毫秒以内,提升了 10 倍。

这个场景是我在做游戏积分系统的时候遇到的。每天凌晨要批量更新所有用户的每日积分,用户量有几十万。最开始用循环调用,更新一次要十几分钟,而且 Redis 那边压力特别大。后来改成 Pipeline,每次批量处理 200 个用户,总时间降到了 2 分钟以内,性能提升了 6-7 倍。而且 Redis 那边的压力也小了很多,CPU 占用率从 80% 降到了 20% 左右。

import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0)
def update_scores_bad(user_scores):
    """错误的方式:逐个更新,网络往返次数太多"""
    start_time = time.time()
    for user_id, score in user_scores.items():
        key = f"score:{user_id}"
        r.incrby(key, score)
    elapsed = time.time() - start_time
    print(f"逐个更新耗时: {elapsed:.3f} 秒")
    return elapsed
def update_scores_good(user_scores):
    """正确的方式:使用 Pipeline 批量更新"""
    start_time = time.time()
    pipe = r.pipeline()
    # 把所有命令先添加到 Pipeline
    for user_id, score in user_scores.items():
        key = f"score:{user_id}"
        pipe.incrby(key, score)
    # 一次性执行所有命令
    pipe.execute()
    elapsed = time.time() - start_time
    print(f"Pipeline 批量更新耗时: {elapsed:.3f} 秒")
    return elapsed
# 测试数据:1000 个用户的积分更新
test_data = {f"user_{i}": i * 10 for i in range(1000)}
# 对比两种方式的性能
print("=== 性能对比测试 ===")
bad_time = update_scores_bad(test_data)
good_time = update_scores_good(test_data)
print(f"性能提升: {bad_time / good_time:.1f} 倍")

Pipeline 还有个高级用法,就是配合事务使用。有时候你需要保证一批操作的原子性,可以用 MULTI/EXEC

def transfer_score(from_user, to_user, amount):
    """转账操作,保证原子性"""
    pipe = r.pipeline()
    # 开始事务
    pipe.multi()
    # 减少发送方积分
    pipe.decrby(f"score:{from_user}", amount)
    # 增加接收方积分
    pipe.incrby(f"score:{to_user}", amount)
    # 记录转账日志
    pipe.lpush(f"transfer:log", f"{from_user}:{to_user}:{amount}")
    # 执行事务
    result = pipe.execute()
    # 检查是否执行成功
    if result[0] is True:  # MULTI 返回 True
        print("转账成功")
        return True
    else:
        print("转账失败")
        return False

注意一点,Pipeline 不是越大越好。我测试过,一次打包 1000 个命令和 100 个命令,性能差距不大,但 1000 个的话,如果中间有个命令出错,排查起来麻烦。一般建议一次打包 50-200 个命令,这个范围性能最好。

Pipeline 还有个高级技巧,就是分批处理。如果数据量特别大,比如要更新 10 万个用户的积分,一次性打包肯定不行,得分批处理:

def batch_update_large_dataset(user_scores, batch_size=100):
    """大批量数据分批更新"""
    total = len(user_scores)
    processed = 0
    # 将数据分批
    items = list(user_scores.items())
    for i in range(0, total, batch_size):
        batch = items[i:i + batch_size]
        # 每批使用 Pipeline
        pipe = r.pipeline()
        for user_id, score in batch:
            key = f"score:{user_id}"
            pipe.incrby(key, score)
        # 执行这一批
        pipe.execute()
        processed += len(batch)
        # 打印进度
        print(f"已处理: {processed}/{total} ({processed/total*100:.1f}%)")
    print("批量更新完成")
# 使用示例:更新 10 万个用户的积分
large_dataset = {f"user_{i}": i * 10 for i in range(100000)}
batch_update_large_dataset(large_dataset, batch_size=200)

还有个坑,就是 Pipeline 的错误处理。如果 Pipeline 里有个命令出错了,默认情况下整个 Pipeline 都会失败。但有时候你希望部分成功,可以用 WATCH 配合事务,或者单独处理每个命令的结果:

def safe_batch_update(user_scores):
    """安全的批量更新,单个失败不影响其他"""
    pipe = r.pipeline()
    commands = []
    for user_id, score in user_scores.items():
        key = f"score:{user_id}"
        pipe.incrby(key, score)
        commands.append((user_id, key))
    # 执行 Pipeline
    try:
        results = pipe.execute()
        # 检查每个结果
        success_count = 0
        fail_count = 0
        for i, (user_id, key) in enumerate(commands):
            if results[i] is not None:
                success_count += 1
            else:
                fail_count += 1
                print(f"更新失败: {user_id}")
        print(f"成功: {success_count}, 失败: {fail_count}")
        return success_count, fail_count
    except Exception as e:
        print(f"Pipeline 执行出错: {e}")
        return 0, len(commands)

Redis 8.0 对 Pipeline 的性能做了优化,特别是在高并发场景下,性能提升更明显。我测试过,同样的 Pipeline 操作,8.0 版本比 7.x 版本快 15-20% 左右。

内存优化,让 Redis 跑得更稳

内存优化这个事,说起来简单,做起来难。Redis 8.0 在内存管理上做了不少改进,特别是副本节点的内存占用降低了 35%,但主节点还是得自己优化。

我之前遇到过内存碎片的问题,used_memory_rssused_memory 高出一大截,内存利用率只有 70% 左右。最严重的时候,used_memory 是 8GB,但 used_memory_rss 是 12GB,内存碎片率到了 1.5,浪费了 4GB 内存。

这个问题是因为 Key 的过期和删除导致的。Redis 删除 Key 后,内存不会立即释放给操作系统,而是留在 Redis 的内存池里,时间一长就产生碎片。后来通过几个优化手段,包括合理设置过期时间、使用合适的数据结构、定期清理不用的 Key,把利用率提到了 90% 以上。内存碎片率也降到了 1.1 左右,基本正常了。

先说说怎么监控内存:

def check_memory_status():
    """检查 Redis 内存状态"""
    info = r.info('memory')
    used_memory = info['used_memory']
    used_memory_rss = info['used_memory_rss']
    used_memory_peak = info['used_memory_peak']
    mem_fragmentation_ratio = info['mem_fragmentation_ratio']
    print(f"已使用内存: {used_memory / 1024 / 1024:.2f} MB")
    print(f"RSS 内存: {used_memory_rss / 1024 / 1024:.2f} MB")
    print(f"峰值内存: {used_memory_peak / 1024 / 1024:.2f} MB")
    print(f"内存碎片率: {mem_fragmentation_ratio:.2f}")
    # 碎片率大于 1.5 就要注意了
    if mem_fragmentation_ratio > 1.5:
        print("⚠️  内存碎片率较高,建议优化")
    # 计算内存利用率
    if used_memory_rss > 0:
        utilization = (used_memory / used_memory_rss) * 100
        print(f"内存利用率: {utilization:.2f}%")
    return {
        'used_memory': used_memory,
        'used_memory_rss': used_memory_rss,
        'fragmentation_ratio': mem_fragmentation_ratio
    }
# 定期检查内存状态
memory_status = check_memory_status()

内存优化的几个关键点:

  1. 合理设置过期时间:别让数据一直占着内存不释放
def set_with_expire(key, value, expire_seconds=3600):
    """设置键值对并指定过期时间"""
    # 使用 SETEX 命令,原子操作
    r.setex(key, expire_seconds, value)
    # 或者分开设置
    # r.set(key, value)
    # r.expire(key, expire_seconds)
# 批量设置过期时间
def batch_set_expire(keys, expire_seconds=3600):
    """批量设置过期时间"""
    pipe = r.pipeline()
    for key in keys:
        pipe.expire(key, expire_seconds)
    pipe.execute()
  1. 使用合适的数据结构:能用 Hash 就别用 String,能用 Set 就别用 List
# 不好的方式:用多个 String 存储用户信息
r.set("user:1001:name", "张三")
r.set("user:1001:age", "25")
r.set("user:1001:email", "zhangsan@example.com")
# 好的方式:用 Hash 存储
r.hset("user:1001", mapping={
    "name": "张三",
    "age": "25",
    "email": "zhangsan@example.com"
})
# Hash 的优势:
# 1. 内存占用更少(Redis 8.0 对 Hash 做了优化)
# 2. 可以部分更新,不用整个替换
# 3. 可以批量获取多个字段
user_info = r.hmget("user:1001", "name", "age", "email")
  1. 压缩大 Key:如果 Value 比较大,可以考虑压缩
import gzip
import json
def set_compressed(key, data, expire_seconds=3600):
    """存储压缩后的数据"""
    # 序列化为 JSON
    json_str = json.dumps(data, ensure_ascii=False)
    # 压缩
    compressed = gzip.compress(json_str.encode('utf-8'))
    # 存储
    r.setex(key, expire_seconds, compressed)
def get_compressed(key):
    """获取并解压数据"""
    compressed = r.get(key)
    if compressed is None:
        return None
    # 解压
    json_str = gzip.decompress(compressed).decode('utf-8')
    # 反序列化
    return json.loads(json_str)
# 使用示例
large_data = {
    "items": [{"id": i, "data": f"item_{i}_data" * 100} for i in range(1000)]
}
set_compressed("large:data", large_data)
retrieved = get_compressed("large:data")

Redis 8.0 对内存分配器做了优化,减少了内存碎片。但如果你发现碎片率还是很高,可以尝试重启 Redis(当然要在业务低峰期),或者使用 MEMORY PURGE 命令(Redis 4.0+ 支持)来主动释放内存。

内存优化还有个重要的点,就是 Key 的命名规范。我见过很多项目,Key 命名乱七八糟,有的特别长,有的重复前缀特别多,这样既浪费内存,又影响性能。建议用统一的命名规范:

# 不好的命名方式
r.set("user_1001_name", "张三")
r.set("user_1001_age", "25")
r.set("user_1001_email", "zhangsan@example.com")
# 好的命名方式:使用冒号分隔,层次清晰
r.hset("user:1001", mapping={
    "name": "张三",
    "age": "25",
    "email": "zhangsan@example.com"
})
# 或者用更短的 Key
r.hset("u:1001", mapping={
    "n": "张三",
    "a": "25",
    "e": "zhangsan@example.com"
})

Key 命名还有个技巧,就是避免使用太长的 Key。Redis 8.0 虽然对 Key 的长度做了优化,但太长的 Key 还是会占用更多内存。我一般建议 Key 长度控制在 50 个字符以内。

另外,Redis 8.0 对 Hash 数据结构做了优化,小 Hash(字段数少于 512 个,且每个字段值小于 64 字节)的内存占用更少了。所以能用 Hash 的地方尽量用 Hash,别用多个 String。

还有一个内存优化的技巧,就是使用 Bitmap。如果存储的是布尔值或者计数器,用 Bitmap 能节省大量内存:

def use_bitmap_for_flags():
    """使用 Bitmap 存储用户标记"""
    user_id = 1001
    # 设置用户标记
    r.setbit(f"user:flags:{user_id}", 0, 1)  # VIP 标记
    r.setbit(f"user:flags:{user_id}", 1, 1)  # 已认证
    r.setbit(f"user:flags:{user_id}", 2, 0)  # 未激活
    # 检查标记
    is_vip = r.getbit(f"user:flags:{user_id}", 0)
    is_verified = r.getbit(f"user:flags:{user_id}", 1)
    print(f"VIP: {is_vip}, 已认证: {is_verified}")
    # 统计标记数量
    flag_count = r.bitcount(f"user:flags:{user_id}")
    print(f"标记数量: {flag_count}")
# Bitmap 还可以用来做用户行为统计
def track_user_activity(user_id, day_offset):
    """记录用户每日活跃状态"""
    # day_offset 是距离某个基准日期的天数
    r.setbit(f"user:active:{user_id}", day_offset, 1)
    # 统计最近 7 天的活跃天数
    recent_active_days = r.bitcount(
        f"user:active:{user_id}",
        day_offset - 6,  # 起始位置
        day_offset       # 结束位置
    )
    print(f"最近 7 天活跃 {recent_active_days} 天")

Redis 8.0 对 BITCOUNT 命令做了优化,增加了预取优化,位计数操作更快了。如果你有大量位操作,性能提升会很明显。

最后说下内存淘汰策略。Redis 8.0 支持多种淘汰策略,我一般用 allkeys-lru,就是最近最少使用的 Key 优先淘汰。如果你的数据有明确的过期时间,用 volatile-lru 也行。关键是别用 noeviction,除非你确定内存够用,不然内存满了 Redis 就写不进去了。

利用 Redis 8.0 的向量搜索优化

Redis 8.0 最大的亮点就是向量搜索功能,这个对 AI 场景特别有用。我最近在做推荐系统,用上了这个功能,性能提升很明显。

之前我用 Elasticsearch 做向量搜索,虽然功能强大,但性能一般,而且资源消耗大。一个 Elasticsearch 集群要 3 台服务器,每台 32GB 内存,成本挺高的。后来改用 Redis 8.0 的向量搜索,只用一台 16GB 内存的服务器就够了,而且查询速度还快了一倍。特别是实时推荐场景,Redis 的延迟更低,用户体验更好。

当然,Redis 的向量搜索功能没有 Elasticsearch 那么丰富,但对于大部分推荐场景来说,已经够用了。如果你的场景特别复杂,比如需要复杂的过滤和聚合,那还是用 Elasticsearch 更合适。

向量搜索的核心是 KNN(K-Nearest Neighbors)算法,Redis 8.0 对向量操作做了优化,RDB 加载和 RESTORE 速度都提升了。但要用好这个功能,有几个技巧:

from redis.commands.search.field import VectorField, TextField, NumericField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import Query
import numpy as np
# 创建向量索引
def create_vector_index():
    """创建向量搜索索引"""
    schema = (
        TextField("title"),  # 文本字段
        NumericField("price"),  # 数值字段
        VectorField("vector",  # 向量字段
                   "FLAT",  # 索引类型:FLAT 或 HNSW
                   {
                       "TYPE": "FLOAT32",  # 向量类型
                       "DIM": 128,  # 向量维度
                       "DISTANCE_METRIC": "COSINE"  # 距离度量:COSINE, L2, IP
                   })
    )
    # 定义索引
    definition = IndexDefinition(prefix=["item:"], index_type=IndexType.HASH)
    # 创建索引
    r.ft("item_index").create_index(
        schema,
        definition=definition
    )
    print("向量索引创建成功")
# 添加带向量的文档
def add_item_with_vector(item_id, title, price, vector):
    """添加商品及其向量"""
    key = f"item:{item_id}"
    r.hset(key, mapping={
        "title": title,
        "price": price,
        "vector": vector.tobytes()  # 向量转为字节
    })
# 向量相似度搜索
def search_similar_items(query_vector, top_k=10, price_range=None):
    """搜索相似商品"""
    # 构建查询
    base_query = f"=>[KNN {top_k} @vector $query_vector AS vector_score]"
    # 如果有价格过滤条件
    if price_range:
        min_price, max_price = price_range
        base_query = f"@price:[{min_price} {max_price}] {base_query}"
    # 创建查询对象,使用 DIALECT 2 支持高级特性
    query = (
        Query(base_query)
        .sort_by("vector_score")  # 按相似度排序
        .return_fields("title", "price", "vector_score")
        .dialect(2)  # 使用 DIALECT 2
    )
    # 执行搜索
    results = r.ft("item_index").search(
        query,
        query_params={"query_vector": query_vector.tobytes()}
    )
    return results
# 使用示例
# 假设我们有一个 128 维的查询向量
query_vec = np.random.rand(128).astype(np.float32)
# 搜索相似商品,价格在 100-500 之间
results = search_similar_items(query_vec, top_k=5, price_range=(100, 500))
for doc in results.docs:
    print(f"商品: {doc.title}, 价格: {doc.price}, 相似度: {doc.vector_score}")

Redis 8.2 还引入了 SVS-VAMANA 向量索引类型,支持向量压缩,可以进一步节省内存。但 8.0 版本的向量搜索已经够用了,性能提升很明显。

关键优化点:

  1. 选择合适的距离度量:COSINE 适合文本相似度,L2 适合图像相似度
  2. 合理设置向量维度:维度太高内存占用大,太低精度不够
  3. 使用 DIALECT 2:支持更高级的查询特性,性能更好

向量搜索还有个高级用法,就是混合搜索。可以同时用文本搜索和向量搜索,然后合并结果:

def hybrid_search(keyword, query_vector, top_k=10):
    """混合搜索:文本 + 向量"""
    # 文本搜索
    text_query = Query(f"@title:{keyword} | @content:{keyword}")
    text_results = r.ft("item_index").search(text_query, limit=top_k)
    # 向量搜索
    vector_query = (
        Query(f"=>[KNN {top_k} @vector $query_vector AS vector_score]")
        .sort_by("vector_score")
        .dialect(2)
    )
    vector_results = r.ft("item_index").search(
        vector_query,
        query_params={"query_vector": query_vector.tobytes()}
    )
    # 合并结果,去重并排序
    # 这里简化处理,实际可以用更复杂的融合算法
    combined_results = {}
    # 添加文本搜索结果,权重 0.6
    for doc in text_results.docs:
        item_id = doc.id.split(":")[-1]
        score = 0.6 * (1.0 / (text_results.docs.index(doc) + 1))
        if item_id not in combined_results:
            combined_results[item_id] = {"score": 0, "doc": doc}
        combined_results[item_id]["score"] += score
    # 添加向量搜索结果,权重 0.4
    for doc in vector_results.docs:
        item_id = doc.id.split(":")[-1]
        vector_score = float(doc.vector_score) if hasattr(doc, 'vector_score') else 0
        score = 0.4 * (1.0 - vector_score)  # 距离越小,分数越高
        if item_id not in combined_results:
            combined_results[item_id] = {"score": 0, "doc": doc}
        combined_results[item_id]["score"] += score
    # 按综合分数排序
    sorted_results = sorted(
        combined_results.items(),
        key=lambda x: x[1]["score"],
        reverse=True
    )
    return sorted_results[:top_k]
# 使用示例
keyword = "手机"
query_vec = np.random.rand(128).astype(np.float32)
results = hybrid_search(keyword, query_vec, top_k=5)
for item_id, data in results:
    print(f"商品 ID: {item_id}, 综合分数: {data['score']:.3f}")

向量索引的选择也很重要。FLAT 索引简单直接,适合小规模数据(几万到几十万条)。HNSW 索引适合大规模数据(百万级以上),查询速度快,但内存占用大。Redis 8.2 的 SVS-VAMANA 索引支持压缩,内存占用更小,但查询速度可能稍慢。

我的建议是,数据量小于 50 万用 FLAT,大于 50 万用 HNSW,如果内存紧张可以考虑 SVS-VAMANA。具体选哪个,还得看你的业务场景和性能要求。

Query Engine 优化,让查询飞起来

Redis 8.0 的 Query Engine 改进很大,命令延迟降低了 87%,查询处理能力提升了 16 倍。但要用好这个功能,得掌握几个技巧。

Query Engine 支持全文搜索、向量搜索、地理空间查询和聚合,功能很强大。我用它替代了 Elasticsearch 的部分功能,性能提升明显,还省了不少服务器成本。

from redis.commands.search.field import TextField, TagField, NumericField, GeoField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
# 创建全文搜索索引
def create_search_index():
    """创建全文搜索索引"""
    schema = (
        TextField("title", weight=5.0),  # 标题字段,权重高
        TextField("content"),  # 内容字段
        TagField("category"),  # 分类标签
        NumericField("views"),  # 浏览量
        NumericField("created_at"),  # 创建时间
        GeoField("location")  # 地理位置
    )
    definition = IndexDefinition(
        prefix=["article:"],
        index_type=IndexType.HASH
    )
    r.ft("article_index").create_index(schema, definition=definition)
    print("全文搜索索引创建成功")
# 添加文章
def add_article(article_id, title, content, category, views, created_at, location=None):
    """添加文章到索引"""
    key = f"article:{article_id}"
    mapping = {
        "title": title,
        "content": content,
        "category": category,
        "views": views,
        "created_at": created_at
    }
    if location:
        mapping["location"] = location  # 格式:经度,纬度
    r.hset(key, mapping=mapping)
# 全文搜索
def search_articles(keyword, category=None, min_views=0, limit=10):
    """搜索文章"""
    # 构建查询条件
    query_parts = []
    # 关键词搜索
    if keyword:
        query_parts.append(f"@title:{keyword} | @content:{keyword}")
    # 分类过滤
    if category:
        query_parts.append(f"@category:{{{category}}}")
    # 浏览量过滤
    if min_views > 0:
        query_parts.append(f"@views:[{min_views} +inf]")
    # 组合查询
    query_str = " ".join(query_parts) if query_parts else "*"
    # 创建查询,按浏览量排序
    query = (
        Query(query_str)
        .sort_by("views", asc=False)  # 按浏览量降序
        .return_fields("title", "content", "category", "views")
        .limit(0, limit)  # 分页
    )
    results = r.ft("article_index").search(query)
    return results
# 聚合查询:统计各分类的文章数和平均浏览量
def aggregate_by_category():
    """按分类聚合统计"""
    from redis.commands.search.aggregation import AggregationRequest
    req = AggregationRequest("*").group_by(
        "@category",
        reducers.count().alias("count"),
        reducers.avg("@views").alias("avg_views")
    ).sort_by("@count", asc=False)
    results = r.ft("article_index").aggregate(req)
    return results
# 地理空间查询:查找附近的文章
def search_nearby_articles(longitude, latitude, radius_km=10, limit=10):
    """搜索附近的文章"""
    query = (
        Query(f"@location:[{longitude} {latitude} {radius_km} km]")
        .return_fields("title", "location")
        .limit(0, limit)
    )
    results = r.ft("article_index").search(query)
    return results

Query Engine 的性能优化技巧:

  1. 合理使用索引字段:不是所有字段都要建索引,只索引需要搜索和过滤的字段
  2. 使用权重:给重要字段设置更高的权重,提高搜索相关性
  3. 避免复杂查询:查询条件太复杂会影响性能,尽量简化
  4. 使用聚合代替多次查询:需要统计数据时,用聚合查询比多次查询效率高
# 不好的方式:多次查询统计
def get_stats_bad():
    categories = ["tech", "business", "life"]
    stats = {}
    for cat in categories:
        results = search_articles("", category=cat)
        stats[cat] = len(results.docs)
    return stats
# 好的方式:使用聚合查询
def get_stats_good():
    return aggregate_by_category()

Query Engine 还有个强大的功能,就是 Facet 搜索。可以同时搜索和统计,比如搜索"手机",然后统计各个品牌的数量:

def search_with_facets(keyword, facet_fields=["brand", "category"]):
    """带 Facet 的搜索"""
    from redis.commands.search.aggregation import AggregationRequest, reducers
    # 构建查询
    query_str = f"@title:{keyword} | @content:{keyword}" if keyword else "*"
    # 创建聚合请求
    req = AggregationRequest(query_str)
    # 添加 Facet
    for field in facet_fields:
        req.group_by(f"@{field}", reducers.count().alias(f"{field}_count"))
    # 执行聚合
    results = r.ft("item_index").aggregate(req)
    return results
# 使用示例
facet_results = search_with_facets("手机", ["brand", "category"])
for row in facet_results.rows:
    print(f"Facet: {row}")

Query Engine 的性能优化还有个技巧,就是合理使用索引。不是所有字段都要建索引,只索引需要搜索、过滤、排序的字段。索引字段太多会影响写入性能,索引字段太少会影响查询性能,得找个平衡点。

另外,Query Engine 支持自定义排序函数,可以根据业务需求自定义排序规则。比如商品搜索,可以按销量、价格、评分等多个维度综合排序:

def search_with_custom_sort(keyword, sort_by="popularity"):
    """自定义排序的搜索"""
    query_str = f"@title:{keyword} | @content:{keyword}" if keyword else "*"
    if sort_by == "popularity":
        # 按销量和评分综合排序
        query = (
            Query(query_str)
            .sort_by("@sales", asc=False)  # 销量降序
            .sort_by("@rating", asc=False)  # 评分降序
        )
    elif sort_by == "price":
        # 按价格排序
        query = Query(query_str).sort_by("@price", asc=True)
    else:
        query = Query(query_str)
    results = r.ft("item_index").search(query)
    return results

Redis 8.0 的 Query Engine 性能提升确实很明显,命令延迟降低了 87%,查询处理能力提升了 16 倍。但前提是你得用好这些功能,别瞎用。我见过有人把所有字段都建索引,结果写入性能下降了一半,得不偿失。

总结一下

Redis 8.0 的性能提升确实很明显,但前提是你得掌握这些优化技巧。我总结的这 5 个技巧,都是生产环境踩坑踩出来的:

1、连接池优化:合理配置连接数,避免连接成为瓶颈

2、Pipeline 批量操作:减少网络往返,提升吞吐量

3、内存优化:合理使用数据结构,设置过期时间,监控内存状态

4、向量搜索优化:利用 Redis 8.0 的新特性,提升 AI 场景性能

5、Query Engine 优化:用好全文搜索和聚合查询,替代部分 Elasticsearch 功能

这些技巧配合使用,性能提升 50% 不是梦。当然,具体能提升多少,还得看你的业务场景。我建议,先在测试环境验证,然后再上生产。

最后说一句,Redis 8.0 虽然性能好,但也不是万能的。如果你的数据量特别大,或者查询特别复杂,还是得考虑 Redis Cluster 或者配合其他数据库使用。别一根筋,该用啥用啥。