前言
本文主要内容:对 Redis Stream 进行学习和实践。
背景
最近在对接另一个系统 B时,需要 B系统推送消息给本系统 A,B 系统异步HTTP 到 A 系统,并且推送了就不管了,所以考虑加一层 MQ 增强系统的可靠性,可以获得以下收益:
- 将消息接受逻辑与复杂的业务逻辑分离,分开部署,消息接受逻辑与 B 系统对接,接口响应速度快,接收到消息马上就返回给 B 系统应答
- 复杂业务逻辑进行维护时,即使业务服务不在线,消息可以也正常接收
- 消息被持久化到队列中,可以协调 A、B 系统的速度。即使接收消息后处理异常,也能重新消费,减少数据丢失的可能性
考虑到 RabbitMQ 之类的消息队列太重了,使用 Redis 实现一下更轻量。正好之前也没用过,学习一下也记录一下。
Redis命令实践
XADD
使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列
XADD key ID field value [field value ...]
xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]
key :队列名称,如果不存在就创建
ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
field value : 记录。
127.0.0.1:6379> xadd mqtest * field1 value1
"1739605679710-0"
XLEN
队列消息数
XLEN key
key :队列名称
XRANGE/XREVRANGE
使用 XRANGE 获取消息列表,会自动过滤已经删除的消息
XREVRANGE是XRANGE的倒序
XRANGE key start end [COUNT count]
XREVRANGE key end start [COUNT count]
key :队列名
start :开始值, - 表示最小值
end :结束值, + 表示最大值
count :数量
127.0.0.1:6379> xrange mqtest - +
1) 1) "1739605679710-0"
2) 1) "field1"
2) "value1"
2) 1) "1739605968521-0"
2) 1) "field1"
2) "value1"
3) 1) "1739606238341-0"
2) 1) "field1"
2) "value1"
4) 1) "1739606243441-0"
2) 1) "field2"
2) "value2"
XDEL
删除消息
XDEL key ID [ID ...]
key:队列名称
ID :消息 ID
XTRIM
XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]
key
表示目标 Stream 的键名。
MAXLEN
指定修剪策略为基于消息数量的最大长度。
Redis 会根据 threshold 的值保留最多指定数量的消息,超出的部分会被删除。
= | ~
=:精确模式。表示严格保留不超过 threshold 条消息。
~:近似模式。允许稍微超过 threshold 的消息数量,但性能更优(Redis 在内部优化了近似模式下的操作)。
threshold
指定保留的消息数量阈值。
如果使用 =, Redis 会确保保留的消息数量不超过这个值。
如果使用 ~, Redis 可能会保留略多于这个值的消息,但通常不会显著超出。
LIMIT count (可选)
控制每次删除操作的最大删除数量。
默认情况下,Redis 会尝试一次性删除所有超出阈值的消息,但如果 Stream 非常大,这可能会导致性能问题。
使用 LIMIT 参数可以分批删除消息,从而减少单次操作对性能的影响。
127.0.0.1:6379> xrange mqtest - +
1) 1) "1739605679710-0"
2) 1) "field1"
2) "value1"
2) 1) "1739606238341-0"
2) 1) "field1"
2) "value1"
3) 1) "1739606243441-0"
2) 1) "field2"
2) "value2"
127.0.0.1:6379> xtrim mqtest MAXLEN 2
(integer) 1
127.0.0.1:6379> xrange mqtest - +
1) 1) "1739606238341-0"
2) 1) "field1"
2) "value1"
2) 1) "1739606243441-0"
2) 1) "field2"
2) "value2"
XREAD
以阻塞或非阻塞方式获取消息列表
xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
COUNT count (可选)
指定从每个 Stream 中最多读取的消息数量。
如果不指定,默认会读取所有可用消息。
BLOCK milliseconds (可选)
阻塞模式:如果当前没有新消息可用,Redis 会在指定的毫秒数内等待新消息的到来。
如果设置为 0,表示无限期阻塞,直到有新消息到达。
如果不使用 BLOCK 参数,则是非阻塞模式,立即返回结果。
STREAMS key [key ...]
指定要读取的一个或多个 Stream 的键名。
支持同时从多个 Stream 中读取消息。
ID [ID ...]
指定从哪个消息 ID 开始读取。
每个 Stream 对应一个 ID,必须与 STREAMS 参数中的键一一对应。
特殊值:
"0-0":从 Stream 的最开始读取(包括历史消息)。
"$": 仅读取新的消息(即从当前时间点之后的新消息开始)。
127.0.0.1:6379> xread COUNT 1 STREAMS mqtest $
(nil)
127.0.0.1:6379> xread COUNT 1 BLOCK 3000 STREAMS mqtest $
(nil)
(3.04s)
127.0.0.1:6379> xread COUNT 1 STREAMS mqtest 0-0
1) 1) "mqtest"
2) 1) 1) "1739606238341-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> xread COUNT 1 BLOCK 3000 STREAMS mqtest 0-0
1) 1) "mqtest"
2) 1) 1) "1739606238341-0"
2) 1) "field1"
2) "value1"
消费者组模式
- XGROUP CREATE - 创建消费者组
- XREADGROUP GROUP - 读取消费者组中的消息
- XACK - 将消息标记为"已处理"
- XGROUP SETID - 为消费者组设置新的最后递送消息ID
- XGROUP DELCONSUMER - 删除消费者
- XGROUP DESTROY - 删除消费者组
- XPENDING - 显示待处理消息的相关信息
- XCLAIM - 转移消息的归属权
- XINFO - 查看流和消费者组的相关信息;
- XINFO GROUPS - 打印消费者组的信息;
- XINFO STREAM - 打印流信息
XGROUP CREATE
xgroup create key group id|$ [MKSTREAM] [ENTRIESREAD entries-read]
key :队列名称,如果不存在就创建
group :组名。
$ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。
0-0: 表示从头部开始消费
127.0.0.1:6379> xgroup create mqtest grouptest 0-0
OK
XREADGROUP
xreadgroup GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
GROUP group consumer
指定消费者组和当前消费者的名字。
group:消费者组的名称。
consumer:当前消费者的名称。
COUNT count (可选)
限制从每个 Stream 中最多读取的消息数量。
如果不指定,默认会读取所有可用消息。
BLOCK milliseconds (可选)
阻塞模式:如果当前没有新消息可用,Redis 会在指定的毫秒数内等待新消息的到来。
如果设置为 0,表示无限期阻塞,直到有新消息到达。
如果不使用 BLOCK 参数,则是非阻塞模式,立即返回结果。
NOACK (可选)
如果设置了 NOACK,Redis 不会将消息标记为待确认状态(pending),而是直接返回消息。
这种方式适用于不需要确认机制的场景。
STREAMS key [key ...]
指定要读取的一个或多个 Stream 的键名。
ID [ID ...]
指定从哪个消息 ID 开始读取。
每个 Stream 对应一个 ID,必须与 STREAMS 参数中的键一一对应。
特殊值:
"0-0":从 Stream 的最开始读取(包括历史消息)。
"$": 仅读取新的消息(即从当前时间点之后的新消息开始)。
">": 仅读取从未被任何消费者处理过的消息(即未被分配到任何消费者的 pending entries)。
作为消息队列使用的基本流程
- 向队列生产消息
- 创建消费者组
- 消费者读取消息
- 使用 XTRIM 清理已经处理完的消息
注意
-
redis stream 作为消息队列,有可能会丢失数据;如果生产者一直生产消息,消费者一直不消费,那么会导致内存占用越来越大,甚至 OOM;
-
消费者 ack 并不会删除消息,只有手动处理才能删除消息。
总结
所以这一通下来我倒是觉得不太好用了。问了一下 AI,也确认了我的想法。
AI 的回答RabbitMQ 与 Redis 内存占用对比
特性 | RabbitMQ | Redis |
---|---|---|
默认存储位置 | 内存 + 磁盘(持久化消息) | 全部存储在内存中 |
内存管理机制 | Erlang 垃圾回收机制 | 单线程内存管理,支持内存淘汰策略 |
消息清理机制 | 消费者确认后自动清理 | 需要手动配置截断策略(如 MAXLEN 或 XTRIM ) |
持久化支持 | 支持消息持久化 | 支持 RDB 和 AOF 持久化 |
内存占用影响因素 | 队列数量、消息数量、消费者连接数 | Stream 长度、消息大小、消费者组状态 |
内存限制配置 | vm_memory_high_watermark |
maxmemory 和 maxmemory-policy |
适用场景 | 复杂路由、持久化需求、高可靠性 | 高性能、简单队列、实时处理 |
AI 的总结与建议
(1)RabbitMQ 更适合以下场景
- 需要复杂的消息路由(如 Topic、Fanout 等)。
- 对消息持久化和高可靠性要求较高。
- 不介意较高的内存占用,能够接受 RabbitMQ 的内存管理机制。
(2)Redis 更适合以下场景
- 需要高性能、低延迟的消息处理。
- 可以接受手动管理消息生命周期(如通过
MAXLEN
或XTRIM
清理旧消息)。 - 对内存占用敏感,需要灵活控制内存使用。
(3)如何选择
- 如果你的业务对消息持久化和可靠性要求较高,可以选择 RabbitMQ。
- 如果你的业务更关注性能和灵活性,并且可以接受手动管理消息生命周期,则 Redis 是更好的选择。