Redis Stream 轻量级消息队列实践

学习、使用Redis Stream,在某些场景下代替其他 MQ 系统

前言

本文主要内容:对 Redis Stream 进行学习和实践。

背景

最近在对接另一个系统 B时,需要 B系统推送消息给本系统 A,B 系统异步HTTP 到 A 系统,并且推送了就不管了,所以考虑加一层 MQ 增强系统的可靠性,可以获得以下收益:

  • 将消息接受逻辑与复杂的业务逻辑分离,分开部署,消息接受逻辑与 B 系统对接,接口响应速度快,接收到消息马上就返回给 B 系统应答
  • 复杂业务逻辑进行维护时,即使业务服务不在线,消息可以也正常接收
  • 消息被持久化到队列中,可以协调 A、B 系统的速度。即使接收消息后处理异常,也能重新消费,减少数据丢失的可能性

考虑到 RabbitMQ 之类的消息队列太重了,使用 Redis 实现一下更轻量。正好之前也没用过,学习一下也记录一下。

Redis命令实践

XADD

使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列

XADD key ID field value [field value ...]
xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]

key :队列名称,如果不存在就创建
ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
field value : 记录。
127.0.0.1:6379> xadd mqtest * field1 value1
"1739605679710-0"

XLEN

队列消息数

XLEN key

key :队列名称

XRANGE/XREVRANGE

使用 XRANGE 获取消息列表,会自动过滤已经删除的消息

XREVRANGE是XRANGE的倒序

XRANGE key start end [COUNT count]
XREVRANGE key end start [COUNT count]

key :队列名
start :开始值, - 表示最小值
end :结束值, + 表示最大值
count :数量
127.0.0.1:6379> xrange mqtest - +
1) 1) "1739605679710-0"
   2) 1) "field1"
      2) "value1"
2) 1) "1739605968521-0"
   2) 1) "field1"
      2) "value1"
3) 1) "1739606238341-0"
   2) 1) "field1"
      2) "value1"
4) 1) "1739606243441-0"
   2) 1) "field2"
      2) "value2"

XDEL

删除消息

XDEL key ID [ID ...]

key:队列名称
ID :消息 ID

XTRIM

XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]

key
表示目标 Stream 的键名。

MAXLEN
指定修剪策略为基于消息数量的最大长度。
Redis 会根据 threshold 的值保留最多指定数量的消息,超出的部分会被删除。
= | ~
=:精确模式。表示严格保留不超过 threshold 条消息。
~:近似模式。允许稍微超过 threshold 的消息数量,但性能更优(Redis 在内部优化了近似模式下的操作)。

threshold
指定保留的消息数量阈值。
如果使用 =, Redis 会确保保留的消息数量不超过这个值。
如果使用 ~, Redis 可能会保留略多于这个值的消息,但通常不会显著超出。

LIMIT count (可选)
控制每次删除操作的最大删除数量。
默认情况下,Redis 会尝试一次性删除所有超出阈值的消息,但如果 Stream 非常大,这可能会导致性能问题。
使用 LIMIT 参数可以分批删除消息,从而减少单次操作对性能的影响。
127.0.0.1:6379> xrange mqtest - +
1) 1) "1739605679710-0"
   2) 1) "field1"
      2) "value1"
2) 1) "1739606238341-0"
   2) 1) "field1"
      2) "value1"
3) 1) "1739606243441-0"
   2) 1) "field2"
      2) "value2"
127.0.0.1:6379> xtrim mqtest MAXLEN 2
(integer) 1
127.0.0.1:6379> xrange mqtest - +
1) 1) "1739606238341-0"
   2) 1) "field1"
      2) "value1"
2) 1) "1739606243441-0"
   2) 1) "field2"
      2) "value2"

XREAD

以阻塞或非阻塞方式获取消息列表

xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

COUNT count (可选)
指定从每个 Stream 中最多读取的消息数量。
如果不指定,默认会读取所有可用消息。

BLOCK milliseconds (可选)
阻塞模式:如果当前没有新消息可用,Redis 会在指定的毫秒数内等待新消息的到来。
如果设置为 0,表示无限期阻塞,直到有新消息到达。
如果不使用 BLOCK 参数,则是非阻塞模式,立即返回结果。

STREAMS key [key ...]
指定要读取的一个或多个 Stream 的键名。
支持同时从多个 Stream 中读取消息。

ID [ID ...]
指定从哪个消息 ID 开始读取。
每个 Stream 对应一个 ID,必须与 STREAMS 参数中的键一一对应。
特殊值:
"0-0":从 Stream 的最开始读取(包括历史消息)。
"$": 仅读取新的消息(即从当前时间点之后的新消息开始)。
127.0.0.1:6379> xread COUNT 1 STREAMS mqtest $
(nil)
127.0.0.1:6379> xread COUNT 1 BLOCK 3000 STREAMS mqtest $
(nil)
(3.04s)
127.0.0.1:6379> xread COUNT 1 STREAMS mqtest 0-0
1) 1) "mqtest"
   2) 1) 1) "1739606238341-0"
         2) 1) "field1"
            2) "value1"
127.0.0.1:6379> xread COUNT 1 BLOCK 3000 STREAMS mqtest 0-0
1) 1) "mqtest"
   2) 1) 1) "1739606238341-0"
         2) 1) "field1"
            2) "value1"

消费者组模式

  • XGROUP CREATE - 创建消费者组
  • XREADGROUP GROUP - 读取消费者组中的消息
  • XACK - 将消息标记为"已处理"
  • XGROUP SETID - 为消费者组设置新的最后递送消息ID
  • XGROUP DELCONSUMER - 删除消费者
  • XGROUP DESTROY - 删除消费者组
  • XPENDING - 显示待处理消息的相关信息
  • XCLAIM - 转移消息的归属权
  • XINFO - 查看流和消费者组的相关信息;
  • XINFO GROUPS - 打印消费者组的信息;
  • XINFO STREAM - 打印流信息

XGROUP CREATE

xgroup create key group id|$ [MKSTREAM] [ENTRIESREAD entries-read]

key :队列名称,如果不存在就创建
group :组名。
$ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。
0-0: 表示从头部开始消费
127.0.0.1:6379> xgroup create mqtest grouptest 0-0
OK

XREADGROUP

xreadgroup GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]


GROUP group consumer
指定消费者组和当前消费者的名字。
group:消费者组的名称。
consumer:当前消费者的名称。

COUNT count (可选)
限制从每个 Stream 中最多读取的消息数量。
如果不指定,默认会读取所有可用消息。

BLOCK milliseconds (可选)
阻塞模式:如果当前没有新消息可用,Redis 会在指定的毫秒数内等待新消息的到来。
如果设置为 0,表示无限期阻塞,直到有新消息到达。
如果不使用 BLOCK 参数,则是非阻塞模式,立即返回结果。

NOACK (可选)
如果设置了 NOACK,Redis 不会将消息标记为待确认状态(pending),而是直接返回消息。
这种方式适用于不需要确认机制的场景。

STREAMS key [key ...]
指定要读取的一个或多个 Stream 的键名。

ID [ID ...]
指定从哪个消息 ID 开始读取。
每个 Stream 对应一个 ID,必须与 STREAMS 参数中的键一一对应。
特殊值:
"0-0":从 Stream 的最开始读取(包括历史消息)。
"$": 仅读取新的消息(即从当前时间点之后的新消息开始)。
">": 仅读取从未被任何消费者处理过的消息(即未被分配到任何消费者的 pending entries)。

作为消息队列使用的基本流程

  • 向队列生产消息
  • 创建消费者组
  • 消费者读取消息
  • 使用 XTRIM 清理已经处理完的消息

注意

  • redis stream 作为消息队列,有可能会丢失数据;如果生产者一直生产消息,消费者一直不消费,那么会导致内存占用越来越大,甚至 OOM;

  • 消费者 ack 并不会删除消息,只有手动处理才能删除消息。

总结

所以这一通下来我倒是觉得不太好用了。问了一下 AI,也确认了我的想法。

AI 的回答RabbitMQ 与 Redis 内存占用对比

特性 RabbitMQ Redis
默认存储位置 内存 + 磁盘(持久化消息) 全部存储在内存中
内存管理机制 Erlang 垃圾回收机制 单线程内存管理,支持内存淘汰策略
消息清理机制 消费者确认后自动清理 需要手动配置截断策略(如 MAXLENXTRIM
持久化支持 支持消息持久化 支持 RDB 和 AOF 持久化
内存占用影响因素 队列数量、消息数量、消费者连接数 Stream 长度、消息大小、消费者组状态
内存限制配置 vm_memory_high_watermark maxmemorymaxmemory-policy
适用场景 复杂路由、持久化需求、高可靠性 高性能、简单队列、实时处理

AI 的总结与建议

(1)RabbitMQ 更适合以下场景

  • 需要复杂的消息路由(如 Topic、Fanout 等)。
  • 对消息持久化和高可靠性要求较高。
  • 不介意较高的内存占用,能够接受 RabbitMQ 的内存管理机制。

(2)Redis 更适合以下场景

  • 需要高性能、低延迟的消息处理。
  • 可以接受手动管理消息生命周期(如通过 MAXLENXTRIM 清理旧消息)。
  • 对内存占用敏感,需要灵活控制内存使用。

(3)如何选择

  • 如果你的业务对消息持久化和可靠性要求较高,可以选择 RabbitMQ。
  • 如果你的业务更关注性能和灵活性,并且可以接受手动管理消息生命周期,则 Redis 是更好的选择。