把RocketMQ的文件过期删除机制弄明白了,至少可以拯救一半的头发

大家好! 今天聊一下RocketMQ的文件过期删除机制 本章概括 源码定位 Broker是Roc…

大家好!

今天聊一下RocketMQ的文件过期删除机制

本章概括

源码定位

Broker是RocketMQ的核心,提供了消息的接收,存储,拉取等功能

我们可以先从Broker服务入手。从源码可以得知。RocketMQ启用了一个 BrokerController 的 start 函数

下列是start 函数启动的异步线程,他启动了一个 messageStore

从 messageStore.start() 函数进入后会有一个消息存储的第三方接口。

继续围绕 start 函数展开实现类查找,可以看到最终由 DefaultMessageStore 实现类实现

定位到具体问题之后,可以看到 start 调用了一个 addScheduleTask 函数

这个函数主要处理的就是清除过期日志服务。

这篇文件聊的就是这个 addScheduleTask 函数。言归正传,步入正题!

流程图

过期删除机制

文件过期删除

首次执行时间是60000毫秒=60秒。其余间隔执行都是每10秒执行一次删除。

对于删除过期的时机包括以下3种:

  1. 默认凌晨4点。这个也比较好理解,这个时候用的人也比较少,删除对系统的影响就降到最小。
  2. 磁盘空间不足。当磁盘空间不足的时候,就要删除过期文件以提供更多的空间出来接收消息。
  3. 人工触发,指人为的介入去删除。

删除的文件是过期文件,那哪些文件是过期的呢?

首先是保留时间,默认72小时,也就是3天,超过3天的数据,是需要删除的。

deleteExpiredFiles 是用于删除过期文件。执行步骤如下:

  1. 首先是需要判断是否需要删除文件,通过两个方法的调用isTimeToDelete和isSpaceToDelete判断是否达到定时删除时间以及是否磁盘已满需要删除,以及判断属性DefaultMessageStore.CleanCommitLogService#manualDeleteFileSeveralTimes是否大于0意味着需要手动删除。如果这三个条件任意为真,意味着需要执行删除,那就继续后续的流程。否则结束当前方法。
  2. 如果是手动删除,则属性DefaultMessageStore.CleanCommitLogService#manualDeleteFileSeveralTimes减1.
  3. 如果属性MessageStoreConfig#cleanFileForciblyEnable和DefaultMessageStore.CleanCommitLogService#cleanImmediately为真,声明cleanAtOnece为true,否则为false。
  4. 调用方法 CommitLog#deleteExpiredFile 进行文件删除。方法需要4个入参,分别是: expiredTime:过期时间或者说文件删除前的保留时间,默认为72小时。 deleteFilesInterval:文件删除间隔,这里取值为100. intervalForcibly:该参数用于强制文件强制释放时间间隔,单位是毫秒。这里取值为120*1000, cleanImmediately:是否立即执行删除,这边使用的就是步骤3中的数据。

如果这个文件被其他线程引用了,此时就不会进行删除,记录第一次删除的时间戳,退出本次任务,等120s后,就会把文件的引用减1000,再强制删除。

在删除的过程中,会存在删除多个文件的情况,每个文件之间,还有一个时间间隔,比如第一个文件删除完后,需要等100ms再删除第二个文件。

120s可以通过 destroyMapedFileIntervalForcibly 得知

100ms可以通过 deletePhysicFilesInterval 得知

如果当前删除的文件数量,已经超过了可以删除的最大批量数,则退出本次任务。可以通过上述代码中的 spacefull 得出

由 timeup 变量我们可以引申出 isTimeToDelete函数

RocketMQ会配置执行删除工作的时间,默认是早上四点。如果当前时间在04:00~04:59之间,就返回true。

由 spacefull 变量我们可以引申出 isSpaceToDelete函数

判断磁盘空间是否满足删除的条件,判断要求如下:

  1. 使用提交日志的路径,检查其所在的磁盘空间的使用率。默认情况下,使用率超过90%,设置磁盘不可用标志位,并且设置属性DefaultMessageStore.CleanCommitLogService#cleanImmediately为true。使用率超过85%,设置属性DefaultMessageStore.CleanCommitLogService#cleanImmediately为true。其他情况,设置运行状态位为磁盘可用。
  2. 磁盘使用率小于0或者大于属性MessageStoreConfig#diskMaxUsedSpaceRatio的要求,默认是75%,则返回true给调用。
  3. 针对消费队列的文件路径,上述步骤重复一次。
  4. 如果步骤1~3都没有返回true,则返回false给调用者。意味着此时磁盘空间有剩余,不要求删除。

消费队列过期删除

CLeanConsumeQueueService的run方法就是直接委托这个方法来实现。这个方法的作用就是删除无效的消费队列条目内容或者文件本身。其代码逻辑如下:

  1. 通过方法CommitLog#getMinOffset获取提交日志最小的偏移量,声明为minOffset。
  2. 如果minOffset大于类属性lastPhysicalMinOffset,那么意味着当前提交日志的最小偏移量对比上一次查询的值发生了变化,也就是说必然是最少一个提交日志文件被删除,那么相应的在消费队列中的过期数据也可以被删除,就执行后面的流程。反之,则意味着不需要执行任何操作,结束方法即可。
  3. 将minOffset赋值给lastPhysicalMinOffset。
  4. 对属性consumeQueueTable进行遍历,遍历其中每一个ConsumeQueue对象。使用本次的minOffset作为入参,调用方法ConsumeQueue#deleteExpiredFile删除过期的消费队列文件以及更新消费队列的最小偏移量。如果有删除到文件,则休眠MessageStoreConfig#deleteConsumeQueueFilesInterval配置的时间,继续对下一个消费队列执行删除。
  5. 当循环执行完毕,使用参数minOffset作为入参,调用方法IndexService#deleteExpiredFile(long)来删除索引文件中已经完全无效的索引文件。

索引文件删除

索引文件的删除是在消费队列删除完成后,调用方法 deleteExpiredFile 完成的。

该方法是用于删除索引文件中的无效文件。执行流程如下:

  1. 首先需要确认,索引文件中是否存在无效文件。获取第一个索引文件,获取其endPhyOffset属性,判断该属性的值是否小于入参的offset。如果是的话,至少意味着有一个文件是无效的,则执行后续流程。否则没有无效文件,则直接结束整个方法。
  2. 声明一个局部变量fileList,遍历索引文件IndexFile对象,如果其endPhyOffset小于入参的offset,说明该文件是无效的,添加到fileList中。
  3. 使用第二步的fileList作为入参,调用方法IndexService#deleteExpiredFile(List)。该方法内部调用了IndexFile#destory方法,内部也是委托了MappedFile#destory方法实现的文件销毁。并且删除成功的IndexFile还会从属性indexFileList列表中删除对应的对象。

文件恢复机制

从源码定位中,我们可以看到执行 ./mqbroker 命令后,会启动main函数的 createBrokerController函数。

在函数中调用了一个 initialize 初始化 ,我们在初始化函数中找到了 this.messageStore.load

这里的 load 和上面的代码一样,都是接口实现类。统一由 DefaultMessageStore 实现。

所以文件恢复函数 recover 从 Broker启动之后,就会随之启动。启动之后

  1. 检查当前文件是否损坏(异常关闭)或者存不存在 (检查依据已在下列代码的尾部贴出)
  2. 加载Commit Log 和 Consume Queue文件。加载成功之后进行 recover 文件恢复

recover 函数的实现逻辑

从 ConsumeQueue文件的集合中取出,从倒数第三个文件开始,逐条遍历消息,如果取出的物理点位大于0并且message的size大于0,说明数据有效。

恢复commitlog分正常退出和非正常退出。

正常退出的commitlog所有数据都是flush完成的,所以只要从倒数第三个文件开始恢复即可,遍历每一个message,并校验其CRC。

非正常退出则从最后一个文件开始恢复,一般出现问题的都是最后一个文件,然后获取文件中的第一个message,其存储时间是否小于checkpoint时间点中的最小的一个,如果是,表示其就是需要恢复的起始文件。然后检验每一个message的CRC,并将通过校验的数据dispatch到consumelog和index文件中。

往期推荐

2022年文章目录整理

RocketMQ性能提升

RocketMQ刷盘机制

结尾

本篇文件介绍的就是RocketMQ的过期删除机制,与恢复机制。

文件过期删除机制 触发主要有三点

  1. 默认凌晨4点。这个也比较好理解,这个时候用的人也比较少,删除对系统的影响就降到最小。
  2. 磁盘空间不足。当磁盘空间不足的时候,就要删除过期文件以提供更多的空间出来接收消息。
  3. 人工触发,指人为的介入去删除。

由上述三种情况展开聊了一些文件过大,被占用,文件损坏的一些安全性处理。

恢复机制 没有硬性条件,主要有以下2点

  1. 检查当前文件是否损坏(异常关闭)或者存不存在
  2. 加载Commit Log 和 Consume Queue文件。加载成功之后才执行

消息队列过期删除

取出commitlog中第一个文件的起始物理offset位置,与末次最小物理坐标offset做对比。如果发现上次的下标已经变小了,说明commitlog已经发生过删除操作了

索引过期删除

执行完消息队列的过期删除,根据坐标直接删掉对应的索引

原文链接:https://juejin.cn/post/7094512628575256607

本文来自网络,不代表软粉网立场,转载请注明出处:https://www.rfff.net/p/4442.html

作者: HUI

发表评论

您的电子邮箱地址不会被公开。

返回顶部