最近我在生产环境折腾 Redis 8.0,发现这版本确实有点东西。之前用 7.x 的时候,遇到高并发场景就有点吃力,升级到 8.0 后配合几个优化技巧,性能直接提升了 50% 以上。今天就跟兄弟们分享下我踩过的坑和总结出来的 5 个高级技巧,别磨叽,直接上干货。
连接池优化,别让连接成为瓶颈
先说连接池这个事,很多兄弟可能觉得这玩意儿没啥好说的,不就是复用连接嘛。但实际情况是,连接池配置不对,性能能差好几倍。
我之前就吃过这个亏。项目刚上线的时候,每次请求都创建新连接,QPS 一上来,Redis 那边连接数直接爆表,CPU 占用率也蹭蹭往上涨。最严重的时候,连接数到了 5000 多,Redis 服务器 CPU 直接 100%,响应时间从几毫秒涨到几百毫秒,用户都开始投诉了。
后来改成连接池,但初始配置也不对,最大连接数设得太小,只有 20 个。高并发的时候,所有请求都排队等连接,虽然连接数不爆了,但响应时间还是慢。后来我仔细分析了业务场景,发现平均 QPS 是 2000,平均响应时间是 5 毫秒,理论上需要 10 个连接就够了,但考虑到峰值和网络波动,我设成了 50 个,这才解决了问题。
Redis 8.0 对连接管理做了优化,客户端管理开销降低了,但前提是你得把连接池配置好。看下我现在的配置:
import redis
from redis.connection import ConnectionPool
# 创建连接池,这里有几个关键参数
pool = redis.ConnectionPool(
host='localhost',
port=6379,
db=0,
# 最大连接数,根据你的并发量来,我一般设成 QPS 的 1.5 倍
max_connections=200,
# 连接超时时间,别设太小,网络波动的时候容易超时
socket_connect_timeout=5,
# 读取超时,根据你的命令执行时间调整
socket_timeout=5,
# 连接保活,避免连接被服务器关闭
socket_keepalive=True,
socket_keepalive_options={},
# 连接重试次数
retry_on_timeout=True,
# 健康检查,定期检查连接是否有效
health_check_interval=30
)
# 使用连接池创建客户端
r = redis.Redis(connection_pool=pool)
# 使用示例
def get_user_info(user_id):
"""获取用户信息,使用连接池复用连接"""
key = f"user:{user_id}"
# 这里会自动从连接池获取连接,用完自动归还
user_data = r.hgetall(key)
return user_data
# 批量操作时,复用同一个连接
def batch_get_users(user_ids):
"""批量获取用户信息"""
pipe = r.pipeline()
for user_id in user_ids:
key = f"user:{user_id}"
pipe.hgetall(key)
# 一次性执行,只用一个连接
results = pipe.execute()
return results
关键点来了,max_connections 这个参数别瞎设。我的经验是,先看看你的平均 QPS,然后乘以平均响应时间(秒),再乘以 1.5 的安全系数。比如 QPS 是 1000,平均响应时间是 0.01 秒,那大概需要 15 个连接,再乘以 1.5,就是 22.5,我一般设成 30-50 左右。
还有一点,Redis 8.0 对客户端管理做了优化,减少了客户端 cron 操作的开销,避免阻塞主线程。这意味着你可以放心地使用更多连接,不用担心性能问题。
连接池还有个坑,就是连接泄漏的问题。我之前遇到过,代码里有个地方异常处理没做好,连接没正常归还,时间一长连接池就空了,所有请求都卡住。后来加了监控,发现连接池使用率异常就告警:
def monitor_connection_pool(pool):
"""监控连接池状态"""
# 获取连接池信息
created_connections = pool.created_connections
available_connections = len(pool._available_connections)
in_use_connections = created_connections - available_connections
print(f"总连接数: {created_connections}")
print(f"可用连接数: {available_connections}")
print(f"使用中连接数: {in_use_connections}")
print(f"连接池使用率: {(in_use_connections / created_connections * 100):.2f}%")
# 如果使用率超过 80%,告警
if in_use_connections / created_connections > 0.8:
print("⚠️ 连接池使用率过高,可能存在连接泄漏")
return {
'created': created_connections,
'available': available_connections,
'in_use': in_use_connections
}
# 定期监控
pool_status = monitor_connection_pool(pool)
另外,Redis 8.0 还优化了连接的健康检查机制。之前版本的健康检查可能会影响性能,8.0 版本改进了,可以更频繁地检查连接状态,及时发现断开的连接。我一般把 health_check_interval 设成 30 秒,既能及时发现问题,又不会影响性能。
Pipeline 批量操作,减少网络往返
这个技巧老生常谈了,但很多兄弟还是不会用,或者用得不彻底。Pipeline 的原理就是把多个命令打包一起发,减少网络往返次数。Redis 8.0 对 Pipeline 的支持更好了,性能提升明显。
我之前有个场景,需要批量更新用户积分。最开始是循环调用 INCR,1000 个用户更新一次要 1 秒多。改成 Pipeline 后,直接降到 100 毫秒以内,提升了 10 倍。
这个场景是我在做游戏积分系统的时候遇到的。每天凌晨要批量更新所有用户的每日积分,用户量有几十万。最开始用循环调用,更新一次要十几分钟,而且 Redis 那边压力特别大。后来改成 Pipeline,每次批量处理 200 个用户,总时间降到了 2 分钟以内,性能提升了 6-7 倍。而且 Redis 那边的压力也小了很多,CPU 占用率从 80% 降到了 20% 左右。
import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0)
def update_scores_bad(user_scores):
"""错误的方式:逐个更新,网络往返次数太多"""
start_time = time.time()
for user_id, score in user_scores.items():
key = f"score:{user_id}"
r.incrby(key, score)
elapsed = time.time() - start_time
print(f"逐个更新耗时: {elapsed:.3f} 秒")
return elapsed
def update_scores_good(user_scores):
"""正确的方式:使用 Pipeline 批量更新"""
start_time = time.time()
pipe = r.pipeline()
# 把所有命令先添加到 Pipeline
for user_id, score in user_scores.items():
key = f"score:{user_id}"
pipe.incrby(key, score)
# 一次性执行所有命令
pipe.execute()
elapsed = time.time() - start_time
print(f"Pipeline 批量更新耗时: {elapsed:.3f} 秒")
return elapsed
# 测试数据:1000 个用户的积分更新
test_data = {f"user_{i}": i * 10 for i in range(1000)}
# 对比两种方式的性能
print("=== 性能对比测试 ===")
bad_time = update_scores_bad(test_data)
good_time = update_scores_good(test_data)
print(f"性能提升: {bad_time / good_time:.1f} 倍")
Pipeline 还有个高级用法,就是配合事务使用。有时候你需要保证一批操作的原子性,可以用 MULTI/EXEC:
def transfer_score(from_user, to_user, amount):
"""转账操作,保证原子性"""
pipe = r.pipeline()
# 开始事务
pipe.multi()
# 减少发送方积分
pipe.decrby(f"score:{from_user}", amount)
# 增加接收方积分
pipe.incrby(f"score:{to_user}", amount)
# 记录转账日志
pipe.lpush(f"transfer:log", f"{from_user}:{to_user}:{amount}")
# 执行事务
result = pipe.execute()
# 检查是否执行成功
if result[0] is True: # MULTI 返回 True
print("转账成功")
return True
else:
print("转账失败")
return False
注意一点,Pipeline 不是越大越好。我测试过,一次打包 1000 个命令和 100 个命令,性能差距不大,但 1000 个的话,如果中间有个命令出错,排查起来麻烦。一般建议一次打包 50-200 个命令,这个范围性能最好。
Pipeline 还有个高级技巧,就是分批处理。如果数据量特别大,比如要更新 10 万个用户的积分,一次性打包肯定不行,得分批处理:
def batch_update_large_dataset(user_scores, batch_size=100):
"""大批量数据分批更新"""
total = len(user_scores)
processed = 0
# 将数据分批
items = list(user_scores.items())
for i in range(0, total, batch_size):
batch = items[i:i + batch_size]
# 每批使用 Pipeline
pipe = r.pipeline()
for user_id, score in batch:
key = f"score:{user_id}"
pipe.incrby(key, score)
# 执行这一批
pipe.execute()
processed += len(batch)
# 打印进度
print(f"已处理: {processed}/{total} ({processed/total*100:.1f}%)")
print("批量更新完成")
# 使用示例:更新 10 万个用户的积分
large_dataset = {f"user_{i}": i * 10 for i in range(100000)}
batch_update_large_dataset(large_dataset, batch_size=200)
还有个坑,就是 Pipeline 的错误处理。如果 Pipeline 里有个命令出错了,默认情况下整个 Pipeline 都会失败。但有时候你希望部分成功,可以用 WATCH 配合事务,或者单独处理每个命令的结果:
def safe_batch_update(user_scores):
"""安全的批量更新,单个失败不影响其他"""
pipe = r.pipeline()
commands = []
for user_id, score in user_scores.items():
key = f"score:{user_id}"
pipe.incrby(key, score)
commands.append((user_id, key))
# 执行 Pipeline
try:
results = pipe.execute()
# 检查每个结果
success_count = 0
fail_count = 0
for i, (user_id, key) in enumerate(commands):
if results[i] is not None:
success_count += 1
else:
fail_count += 1
print(f"更新失败: {user_id}")
print(f"成功: {success_count}, 失败: {fail_count}")
return success_count, fail_count
except Exception as e:
print(f"Pipeline 执行出错: {e}")
return 0, len(commands)
Redis 8.0 对 Pipeline 的性能做了优化,特别是在高并发场景下,性能提升更明显。我测试过,同样的 Pipeline 操作,8.0 版本比 7.x 版本快 15-20% 左右。
内存优化,让 Redis 跑得更稳
内存优化这个事,说起来简单,做起来难。Redis 8.0 在内存管理上做了不少改进,特别是副本节点的内存占用降低了 35%,但主节点还是得自己优化。
我之前遇到过内存碎片的问题,used_memory_rss 比 used_memory 高出一大截,内存利用率只有 70% 左右。最严重的时候,used_memory 是 8GB,但 used_memory_rss 是 12GB,内存碎片率到了 1.5,浪费了 4GB 内存。
这个问题是因为 Key 的过期和删除导致的。Redis 删除 Key 后,内存不会立即释放给操作系统,而是留在 Redis 的内存池里,时间一长就产生碎片。后来通过几个优化手段,包括合理设置过期时间、使用合适的数据结构、定期清理不用的 Key,把利用率提到了 90% 以上。内存碎片率也降到了 1.1 左右,基本正常了。
先说说怎么监控内存:
def check_memory_status():
"""检查 Redis 内存状态"""
info = r.info('memory')
used_memory = info['used_memory']
used_memory_rss = info['used_memory_rss']
used_memory_peak = info['used_memory_peak']
mem_fragmentation_ratio = info['mem_fragmentation_ratio']
print(f"已使用内存: {used_memory / 1024 / 1024:.2f} MB")
print(f"RSS 内存: {used_memory_rss / 1024 / 1024:.2f} MB")
print(f"峰值内存: {used_memory_peak / 1024 / 1024:.2f} MB")
print(f"内存碎片率: {mem_fragmentation_ratio:.2f}")
# 碎片率大于 1.5 就要注意了
if mem_fragmentation_ratio > 1.5:
print("⚠️ 内存碎片率较高,建议优化")
# 计算内存利用率
if used_memory_rss > 0:
utilization = (used_memory / used_memory_rss) * 100
print(f"内存利用率: {utilization:.2f}%")
return {
'used_memory': used_memory,
'used_memory_rss': used_memory_rss,
'fragmentation_ratio': mem_fragmentation_ratio
}
# 定期检查内存状态
memory_status = check_memory_status()
内存优化的几个关键点:
- 合理设置过期时间:别让数据一直占着内存不释放
def set_with_expire(key, value, expire_seconds=3600):
"""设置键值对并指定过期时间"""
# 使用 SETEX 命令,原子操作
r.setex(key, expire_seconds, value)
# 或者分开设置
# r.set(key, value)
# r.expire(key, expire_seconds)
# 批量设置过期时间
def batch_set_expire(keys, expire_seconds=3600):
"""批量设置过期时间"""
pipe = r.pipeline()
for key in keys:
pipe.expire(key, expire_seconds)
pipe.execute()
- 使用合适的数据结构:能用 Hash 就别用 String,能用 Set 就别用 List
# 不好的方式:用多个 String 存储用户信息
r.set("user:1001:name", "张三")
r.set("user:1001:age", "25")
r.set("user:1001:email", "zhangsan@example.com")
# 好的方式:用 Hash 存储
r.hset("user:1001", mapping={
"name": "张三",
"age": "25",
"email": "zhangsan@example.com"
})
# Hash 的优势:
# 1. 内存占用更少(Redis 8.0 对 Hash 做了优化)
# 2. 可以部分更新,不用整个替换
# 3. 可以批量获取多个字段
user_info = r.hmget("user:1001", "name", "age", "email")
- 压缩大 Key:如果 Value 比较大,可以考虑压缩
import gzip
import json
def set_compressed(key, data, expire_seconds=3600):
"""存储压缩后的数据"""
# 序列化为 JSON
json_str = json.dumps(data, ensure_ascii=False)
# 压缩
compressed = gzip.compress(json_str.encode('utf-8'))
# 存储
r.setex(key, expire_seconds, compressed)
def get_compressed(key):
"""获取并解压数据"""
compressed = r.get(key)
if compressed is None:
return None
# 解压
json_str = gzip.decompress(compressed).decode('utf-8')
# 反序列化
return json.loads(json_str)
# 使用示例
large_data = {
"items": [{"id": i, "data": f"item_{i}_data" * 100} for i in range(1000)]
}
set_compressed("large:data", large_data)
retrieved = get_compressed("large:data")
Redis 8.0 对内存分配器做了优化,减少了内存碎片。但如果你发现碎片率还是很高,可以尝试重启 Redis(当然要在业务低峰期),或者使用 MEMORY PURGE 命令(Redis 4.0+ 支持)来主动释放内存。
内存优化还有个重要的点,就是 Key 的命名规范。我见过很多项目,Key 命名乱七八糟,有的特别长,有的重复前缀特别多,这样既浪费内存,又影响性能。建议用统一的命名规范:
# 不好的命名方式
r.set("user_1001_name", "张三")
r.set("user_1001_age", "25")
r.set("user_1001_email", "zhangsan@example.com")
# 好的命名方式:使用冒号分隔,层次清晰
r.hset("user:1001", mapping={
"name": "张三",
"age": "25",
"email": "zhangsan@example.com"
})
# 或者用更短的 Key
r.hset("u:1001", mapping={
"n": "张三",
"a": "25",
"e": "zhangsan@example.com"
})
Key 命名还有个技巧,就是避免使用太长的 Key。Redis 8.0 虽然对 Key 的长度做了优化,但太长的 Key 还是会占用更多内存。我一般建议 Key 长度控制在 50 个字符以内。
另外,Redis 8.0 对 Hash 数据结构做了优化,小 Hash(字段数少于 512 个,且每个字段值小于 64 字节)的内存占用更少了。所以能用 Hash 的地方尽量用 Hash,别用多个 String。
还有一个内存优化的技巧,就是使用 Bitmap。如果存储的是布尔值或者计数器,用 Bitmap 能节省大量内存:
def use_bitmap_for_flags():
"""使用 Bitmap 存储用户标记"""
user_id = 1001
# 设置用户标记
r.setbit(f"user:flags:{user_id}", 0, 1) # VIP 标记
r.setbit(f"user:flags:{user_id}", 1, 1) # 已认证
r.setbit(f"user:flags:{user_id}", 2, 0) # 未激活
# 检查标记
is_vip = r.getbit(f"user:flags:{user_id}", 0)
is_verified = r.getbit(f"user:flags:{user_id}", 1)
print(f"VIP: {is_vip}, 已认证: {is_verified}")
# 统计标记数量
flag_count = r.bitcount(f"user:flags:{user_id}")
print(f"标记数量: {flag_count}")
# Bitmap 还可以用来做用户行为统计
def track_user_activity(user_id, day_offset):
"""记录用户每日活跃状态"""
# day_offset 是距离某个基准日期的天数
r.setbit(f"user:active:{user_id}", day_offset, 1)
# 统计最近 7 天的活跃天数
recent_active_days = r.bitcount(
f"user:active:{user_id}",
day_offset - 6, # 起始位置
day_offset # 结束位置
)
print(f"最近 7 天活跃 {recent_active_days} 天")
Redis 8.0 对 BITCOUNT 命令做了优化,增加了预取优化,位计数操作更快了。如果你有大量位操作,性能提升会很明显。
最后说下内存淘汰策略。Redis 8.0 支持多种淘汰策略,我一般用 allkeys-lru,就是最近最少使用的 Key 优先淘汰。如果你的数据有明确的过期时间,用 volatile-lru 也行。关键是别用 noeviction,除非你确定内存够用,不然内存满了 Redis 就写不进去了。
利用 Redis 8.0 的向量搜索优化
Redis 8.0 最大的亮点就是向量搜索功能,这个对 AI 场景特别有用。我最近在做推荐系统,用上了这个功能,性能提升很明显。
之前我用 Elasticsearch 做向量搜索,虽然功能强大,但性能一般,而且资源消耗大。一个 Elasticsearch 集群要 3 台服务器,每台 32GB 内存,成本挺高的。后来改用 Redis 8.0 的向量搜索,只用一台 16GB 内存的服务器就够了,而且查询速度还快了一倍。特别是实时推荐场景,Redis 的延迟更低,用户体验更好。
当然,Redis 的向量搜索功能没有 Elasticsearch 那么丰富,但对于大部分推荐场景来说,已经够用了。如果你的场景特别复杂,比如需要复杂的过滤和聚合,那还是用 Elasticsearch 更合适。
向量搜索的核心是 KNN(K-Nearest Neighbors)算法,Redis 8.0 对向量操作做了优化,RDB 加载和 RESTORE 速度都提升了。但要用好这个功能,有几个技巧:
from redis.commands.search.field import VectorField, TextField, NumericField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import Query
import numpy as np
# 创建向量索引
def create_vector_index():
"""创建向量搜索索引"""
schema = (
TextField("title"), # 文本字段
NumericField("price"), # 数值字段
VectorField("vector", # 向量字段
"FLAT", # 索引类型:FLAT 或 HNSW
{
"TYPE": "FLOAT32", # 向量类型
"DIM": 128, # 向量维度
"DISTANCE_METRIC": "COSINE" # 距离度量:COSINE, L2, IP
})
)
# 定义索引
definition = IndexDefinition(prefix=["item:"], index_type=IndexType.HASH)
# 创建索引
r.ft("item_index").create_index(
schema,
definition=definition
)
print("向量索引创建成功")
# 添加带向量的文档
def add_item_with_vector(item_id, title, price, vector):
"""添加商品及其向量"""
key = f"item:{item_id}"
r.hset(key, mapping={
"title": title,
"price": price,
"vector": vector.tobytes() # 向量转为字节
})
# 向量相似度搜索
def search_similar_items(query_vector, top_k=10, price_range=None):
"""搜索相似商品"""
# 构建查询
base_query = f"=>[KNN {top_k} @vector $query_vector AS vector_score]"
# 如果有价格过滤条件
if price_range:
min_price, max_price = price_range
base_query = f"@price:[{min_price} {max_price}] {base_query}"
# 创建查询对象,使用 DIALECT 2 支持高级特性
query = (
Query(base_query)
.sort_by("vector_score") # 按相似度排序
.return_fields("title", "price", "vector_score")
.dialect(2) # 使用 DIALECT 2
)
# 执行搜索
results = r.ft("item_index").search(
query,
query_params={"query_vector": query_vector.tobytes()}
)
return results
# 使用示例
# 假设我们有一个 128 维的查询向量
query_vec = np.random.rand(128).astype(np.float32)
# 搜索相似商品,价格在 100-500 之间
results = search_similar_items(query_vec, top_k=5, price_range=(100, 500))
for doc in results.docs:
print(f"商品: {doc.title}, 价格: {doc.price}, 相似度: {doc.vector_score}")
Redis 8.2 还引入了 SVS-VAMANA 向量索引类型,支持向量压缩,可以进一步节省内存。但 8.0 版本的向量搜索已经够用了,性能提升很明显。
关键优化点:
- 选择合适的距离度量:COSINE 适合文本相似度,L2 适合图像相似度
- 合理设置向量维度:维度太高内存占用大,太低精度不够
- 使用 DIALECT 2:支持更高级的查询特性,性能更好
向量搜索还有个高级用法,就是混合搜索。可以同时用文本搜索和向量搜索,然后合并结果:
def hybrid_search(keyword, query_vector, top_k=10):
"""混合搜索:文本 + 向量"""
# 文本搜索
text_query = Query(f"@title:{keyword} | @content:{keyword}")
text_results = r.ft("item_index").search(text_query, limit=top_k)
# 向量搜索
vector_query = (
Query(f"=>[KNN {top_k} @vector $query_vector AS vector_score]")
.sort_by("vector_score")
.dialect(2)
)
vector_results = r.ft("item_index").search(
vector_query,
query_params={"query_vector": query_vector.tobytes()}
)
# 合并结果,去重并排序
# 这里简化处理,实际可以用更复杂的融合算法
combined_results = {}
# 添加文本搜索结果,权重 0.6
for doc in text_results.docs:
item_id = doc.id.split(":")[-1]
score = 0.6 * (1.0 / (text_results.docs.index(doc) + 1))
if item_id not in combined_results:
combined_results[item_id] = {"score": 0, "doc": doc}
combined_results[item_id]["score"] += score
# 添加向量搜索结果,权重 0.4
for doc in vector_results.docs:
item_id = doc.id.split(":")[-1]
vector_score = float(doc.vector_score) if hasattr(doc, 'vector_score') else 0
score = 0.4 * (1.0 - vector_score) # 距离越小,分数越高
if item_id not in combined_results:
combined_results[item_id] = {"score": 0, "doc": doc}
combined_results[item_id]["score"] += score
# 按综合分数排序
sorted_results = sorted(
combined_results.items(),
key=lambda x: x[1]["score"],
reverse=True
)
return sorted_results[:top_k]
# 使用示例
keyword = "手机"
query_vec = np.random.rand(128).astype(np.float32)
results = hybrid_search(keyword, query_vec, top_k=5)
for item_id, data in results:
print(f"商品 ID: {item_id}, 综合分数: {data['score']:.3f}")
向量索引的选择也很重要。FLAT 索引简单直接,适合小规模数据(几万到几十万条)。HNSW 索引适合大规模数据(百万级以上),查询速度快,但内存占用大。Redis 8.2 的 SVS-VAMANA 索引支持压缩,内存占用更小,但查询速度可能稍慢。
我的建议是,数据量小于 50 万用 FLAT,大于 50 万用 HNSW,如果内存紧张可以考虑 SVS-VAMANA。具体选哪个,还得看你的业务场景和性能要求。
Query Engine 优化,让查询飞起来
Redis 8.0 的 Query Engine 改进很大,命令延迟降低了 87%,查询处理能力提升了 16 倍。但要用好这个功能,得掌握几个技巧。
Query Engine 支持全文搜索、向量搜索、地理空间查询和聚合,功能很强大。我用它替代了 Elasticsearch 的部分功能,性能提升明显,还省了不少服务器成本。
from redis.commands.search.field import TextField, TagField, NumericField, GeoField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
# 创建全文搜索索引
def create_search_index():
"""创建全文搜索索引"""
schema = (
TextField("title", weight=5.0), # 标题字段,权重高
TextField("content"), # 内容字段
TagField("category"), # 分类标签
NumericField("views"), # 浏览量
NumericField("created_at"), # 创建时间
GeoField("location") # 地理位置
)
definition = IndexDefinition(
prefix=["article:"],
index_type=IndexType.HASH
)
r.ft("article_index").create_index(schema, definition=definition)
print("全文搜索索引创建成功")
# 添加文章
def add_article(article_id, title, content, category, views, created_at, location=None):
"""添加文章到索引"""
key = f"article:{article_id}"
mapping = {
"title": title,
"content": content,
"category": category,
"views": views,
"created_at": created_at
}
if location:
mapping["location"] = location # 格式:经度,纬度
r.hset(key, mapping=mapping)
# 全文搜索
def search_articles(keyword, category=None, min_views=0, limit=10):
"""搜索文章"""
# 构建查询条件
query_parts = []
# 关键词搜索
if keyword:
query_parts.append(f"@title:{keyword} | @content:{keyword}")
# 分类过滤
if category:
query_parts.append(f"@category:{{{category}}}")
# 浏览量过滤
if min_views > 0:
query_parts.append(f"@views:[{min_views} +inf]")
# 组合查询
query_str = " ".join(query_parts) if query_parts else "*"
# 创建查询,按浏览量排序
query = (
Query(query_str)
.sort_by("views", asc=False) # 按浏览量降序
.return_fields("title", "content", "category", "views")
.limit(0, limit) # 分页
)
results = r.ft("article_index").search(query)
return results
# 聚合查询:统计各分类的文章数和平均浏览量
def aggregate_by_category():
"""按分类聚合统计"""
from redis.commands.search.aggregation import AggregationRequest
req = AggregationRequest("*").group_by(
"@category",
reducers.count().alias("count"),
reducers.avg("@views").alias("avg_views")
).sort_by("@count", asc=False)
results = r.ft("article_index").aggregate(req)
return results
# 地理空间查询:查找附近的文章
def search_nearby_articles(longitude, latitude, radius_km=10, limit=10):
"""搜索附近的文章"""
query = (
Query(f"@location:[{longitude} {latitude} {radius_km} km]")
.return_fields("title", "location")
.limit(0, limit)
)
results = r.ft("article_index").search(query)
return results
Query Engine 的性能优化技巧:
- 合理使用索引字段:不是所有字段都要建索引,只索引需要搜索和过滤的字段
- 使用权重:给重要字段设置更高的权重,提高搜索相关性
- 避免复杂查询:查询条件太复杂会影响性能,尽量简化
- 使用聚合代替多次查询:需要统计数据时,用聚合查询比多次查询效率高
# 不好的方式:多次查询统计
def get_stats_bad():
categories = ["tech", "business", "life"]
stats = {}
for cat in categories:
results = search_articles("", category=cat)
stats[cat] = len(results.docs)
return stats
# 好的方式:使用聚合查询
def get_stats_good():
return aggregate_by_category()
Query Engine 还有个强大的功能,就是 Facet 搜索。可以同时搜索和统计,比如搜索"手机",然后统计各个品牌的数量:
def search_with_facets(keyword, facet_fields=["brand", "category"]):
"""带 Facet 的搜索"""
from redis.commands.search.aggregation import AggregationRequest, reducers
# 构建查询
query_str = f"@title:{keyword} | @content:{keyword}" if keyword else "*"
# 创建聚合请求
req = AggregationRequest(query_str)
# 添加 Facet
for field in facet_fields:
req.group_by(f"@{field}", reducers.count().alias(f"{field}_count"))
# 执行聚合
results = r.ft("item_index").aggregate(req)
return results
# 使用示例
facet_results = search_with_facets("手机", ["brand", "category"])
for row in facet_results.rows:
print(f"Facet: {row}")
Query Engine 的性能优化还有个技巧,就是合理使用索引。不是所有字段都要建索引,只索引需要搜索、过滤、排序的字段。索引字段太多会影响写入性能,索引字段太少会影响查询性能,得找个平衡点。
另外,Query Engine 支持自定义排序函数,可以根据业务需求自定义排序规则。比如商品搜索,可以按销量、价格、评分等多个维度综合排序:
def search_with_custom_sort(keyword, sort_by="popularity"):
"""自定义排序的搜索"""
query_str = f"@title:{keyword} | @content:{keyword}" if keyword else "*"
if sort_by == "popularity":
# 按销量和评分综合排序
query = (
Query(query_str)
.sort_by("@sales", asc=False) # 销量降序
.sort_by("@rating", asc=False) # 评分降序
)
elif sort_by == "price":
# 按价格排序
query = Query(query_str).sort_by("@price", asc=True)
else:
query = Query(query_str)
results = r.ft("item_index").search(query)
return results
Redis 8.0 的 Query Engine 性能提升确实很明显,命令延迟降低了 87%,查询处理能力提升了 16 倍。但前提是你得用好这些功能,别瞎用。我见过有人把所有字段都建索引,结果写入性能下降了一半,得不偿失。
总结一下
Redis 8.0 的性能提升确实很明显,但前提是你得掌握这些优化技巧。我总结的这 5 个技巧,都是生产环境踩坑踩出来的:
1、连接池优化:合理配置连接数,避免连接成为瓶颈
2、Pipeline 批量操作:减少网络往返,提升吞吐量
3、内存优化:合理使用数据结构,设置过期时间,监控内存状态
4、向量搜索优化:利用 Redis 8.0 的新特性,提升 AI 场景性能
5、Query Engine 优化:用好全文搜索和聚合查询,替代部分 Elasticsearch 功能
这些技巧配合使用,性能提升 50% 不是梦。当然,具体能提升多少,还得看你的业务场景。我建议,先在测试环境验证,然后再上生产。
最后说一句,Redis 8.0 虽然性能好,但也不是万能的。如果你的数据量特别大,或者查询特别复杂,还是得考虑 Redis Cluster 或者配合其他数据库使用。别一根筋,该用啥用啥。