消息队列
消息队列的作用
通过异步处理提高系统性能,减少响应时间
削峰填谷,缓解瞬时系统压力、提高系统资源利用
降低系统耦合性
消息队列会带来的问题
系统复杂度提升(处理消息丢失、重复消费问题)
系统可用性降低(需要考虑消息丢失,MQ挂掉等问题)
一致性问题(异步处理问题)
RocketMQ
模式
队列、主题、消息
一个 Topic
分布在多个 Broker
上,一个 Broker
可以配置多个 Topic
,它们是多对多的关系。
一个主题有多个Message Queue
一个Message Queue只能对应一个消费者组中的消费者
角色
NameServer:邮局,Broker 管理 和 路由信息管理,每十秒检查一次,时间戳超过两分钟则认为broker失效(zookeeper功能太复杂了,nameserver无需选举、去中心化设计、更加轻量级)
Broker:暂存者,生产者生产消息到 Broker
,消费者从 Broker
拉取消息并消费。
Consumer:收信者
Producer:发信者
broker 做了集群和主从,slave定时从master同步数据,从节点只读不写入消息
NameServer去中心化,在
RocketMQ
中是通过 单个 Broker 和所有 NameServer 保持长连接 ,并且在每隔 30 秒Broker
会向所有Nameserver
发送心跳生产者从nameserver获取Broker信息,通过轮询的方式生产数据
消费者通过
NameServer
获取所有Broker
的路由信息后,向Broker发送pull来获取数据。可以用两种模式启动 广播(全部都消费者发)和集群(其中一个消费者)
顺序消费
生产者:
使用Hash 取模法保证消息发送到一个队列当中就可以。
如果要求全局顺序消息,那么只能设置一个读写队列
消费者:
防止并发消费,MessageListenerOrderly ,会为每个consumerqueue加锁
重复消费
实现幂等,使用redis或者数据库唯一键来保证不会重复。
消息堆积
判断是不是生产的太快,限流降级,增加消费者或者增加消费速度
判断是不是消费的太慢,可能是消费错误,排查下日志
分布式事务
- 发送half消息
- 返回确认消息
- 系统执行事务
- 发送commit或者Rollback消息
- 如果因为网络原因未收到,则会发送事务反查
- 检查事务状态
- 发送commit或者Rollback消息
同步刷盘和异步刷盘
不同点:同步刷盘需要等待刷盘完成然后才能发送成功状态,异步刷盘需要开启另一个线程去刷盘,写线程可以直接返回,提高吞吐量,可以积累一定量的消息统一触发写磁盘任务。异步刷盘会在broker意外宕机的时候才会丢失部分数据。
内存一般指的是页缓存
当用户在读写数据的时候,内核会申请一个内存页与文件中的数据块进行绑定
用户对文件的读写实际上是对页缓存的读写,如果读的数据已经存在页缓存当中,就不用在进行读取,否则就会申请一个新的内存页,进行读取数据然后复制给用户;写操作也相同。
同步复制和异步复制
Borker
主从模式
同步复制是等 Master 和 Slave 均写成功后才反馈给客户端写成功状态;
异步复制是只要 Master 写成功即可反馈给客户端写成功状态 。
异步复制并不会影响消息的可靠性,主节点宕机,从节点只是会缺少部分消息,在主节点下次重启的时候消息就会重新发送过来。
MQ存储机制
commitLog:消息主体以及元数据保存主体
ConsumeQueue:消费队列,主要是为了提高消费性能,保存了指定topic下的offset、size、tag的哈希值,存储路径$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
IndexFile:索引文件,可以通过 key 或时间区间来查询消息
先将消息发到全部发送到CommitLog,并通过topic和queueId将commitLog中的offset、消息大小和tag的hashcode一起发送到consumerQueue中,消费者只需要获取consumerOffset获取对应位置的commitLogOffset即可获取到原始信息。
消息丢失
在以下情况会发生消息丢失
- 生产者使用事务消息发送,保证不会丢失
- 开启了异步刷盘机制,在broker宕机的时候,就会丢失部分消息(开启同步刷盘)
- mq发生收到消息后,磁盘发生损坏,导致消息无法读出来(开启多Master多从开启同步复制)