hello,我也来说一下针对其中两个问题我的想法。 春龙的第一个加锁同意他的看法。他说的这两个synchronized之间并没有看出需要串行的理由。
第三个消息丢失的问题,举的场景可能不成立,因为putMessageLock存在,应该不存在两个线程同时appendMessage的情况。另外对于FlushRealTimeService因为消息是直接写入pagecache,所以只要操作系统不崩溃,内存不断电,即使不强制刷盘,还是能保证消息不丢失。但是CommitRealTimeService由于是先写到程序的内存,程序的内存再写入FileChannel,在shutdown后我觉得有极小的理论可能(当appendMessage的线程更新writePostion迟于CommitRealTimeService刷盘时的读writePostion,导致CommitRealTimeService认为已经将消息全部刷盘)导致消息没有写入文件。刚才不小心发送到开发者邮件列表了 > -----原始邮件----- > 发件人: "Gosling Von" <[email protected]> > 发送时间: 2018-12-19 18:00:07 (星期三) > 收件人: [email protected] > 抄送: "[email protected]" <[email protected]> > 主题: Re: 关于RocketMQ消息存储的几点问题 > > Hi, > > 这几个问题挺有意思的,我不清楚为什么大家没有人做出回应。Apache > Way鼓励大家多在邮件里列表里讨论与交流,我们RocketMQ社区的同学看来还是有些腼腆,好吧,我来抛砖引玉一下 :-) > > > 首先明确一点,如果对代码有疑问,建议大家去实测下,然后给出自己的测试数据这样讨论会较好。 > > 1. > 关于锁的问题,可以看到volatile变量修饰的读和写请求,通过synchronized把它们组成原子操作,保证了同步操作原语。如果换成ReentrantLock会更好一些。 > > 2. sleep的时间这块可以斟酌,但基本设计思路是希望IO一直忙,不要被CPU > slot换出,所以你会看到它的时间被设置的很小。而有可能触发问题的shutdown此时就非常讲究了,这块的代码我们都需要再测试下。另外关于sleep(0)和yield的争议一致以来都有,推荐看下OpenJDK[1],在不同平台上会有不通的表现,这个问题值得大家再深入讨论下。 > > 3. 消息丢失与CommitLog数据写满的问题,建议你再测试下,我们可以继续交流。 > > 4. RocketMQ 5.0会重构、剥离数据存储这块,也欢迎大家实测,然后把自己的一些测试结论发出来,我们可以一起交流下。 > > > if (millis == 0) { > // When ConvertSleepToYield is on, this matches the classic VM > implementation of > // JVM_Sleep. Critical for similar threading behaviour (Win32) > // It appears that in certain GUI contexts, it may be beneficial to do a > short sleep > // for SOLARIS > if (ConvertSleepToYield) { > os::yield(); > } else { > ThreadState old_state = thread->osthread()->get_state(); > thread->osthread()->set_state(SLEEPING); > os::sleep(thread, MinSleepInterval, false); > thread->osthread()->set_state(old_state); > } > } else { > ThreadState old_state = thread->osthread()->get_state(); > thread->osthread()->set_state(SLEEPING); > if (os::sleep(thread, millis, true) == OS_INTRPT) { > // An asynchronous exception (e.g., ThreadDeathException) could have > been thrown on > // us while we were sleeping. We do not overwrite those. > if (!HAS_PENDING_EXCEPTION) { > HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1); > // TODO-FIXME: THROW_MSG returns which means we will not call > set_state() > // to properly restore the thread state. That's likely wrong. > THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep > interrupted"); > } > } > thread->osthread()->set_state(old_state); > } > HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0); > JVM_END > > > > [1] > http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp > > <http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp> > > Best Regards, > Von Gosling > > > > 在 2018年12月4日,下午9:16,刘春龙 <[email protected]> 写道: > > > > 各位RocketMQ社区朋友好, > > > > 最近将RocketMQ的底层存储源码逐行看了一遍,发现了如下几个问题。希望各位能抽出时间逐个解答一下。如果是BUG的话,还请修复该问题。谢谢💕。 > > > > github的issue地址:https://github.com/apache/rocketmq/issues/574 > > <https://github.com/apache/rocketmq/issues/574> > > > > ———————————————————————————————————————————————————————— > > > > 下面我把内容贴出来: > > > > 1.关于锁synchronized的使用问题; > > > > rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java > > <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1069> > > Line 1069 in 1bedba8 > > <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9> > > public synchronized void putRequest(final GroupCommitRequest request) { > > rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java > > <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1136> > > Line 1136 in 1bedba8 > > <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9> > > synchronized (this) { > > > > 我认为上述两处的synchronized是完全没有必要的。减少没必要的锁的使用会增加性能😄。 > > > > > > 2. 睡眠时间不合理,可能会造成正在处理的消息响应失败; > > > > rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java > > <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1131> > > Line 1131 in 1bedba8 > > <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9> > > Thread.sleep(10); > > GroupCommitService此处的睡眠时间是否合理? > > > > 因为是同步刷盘策略,设置时间过短,在shutdown时,可能会造成正在处理的消息(eg,正在执行appendMessages的消息)来不及刷盘,进而响应失败(FLUSH_DISK_TIMEOUT)。 > > > > 是否可以采用其它方式?或者适当延长下时间?10毫秒是不是太短了。 > > > > > > 3. 可能会造成消息丢失; > > > > rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java > > <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L943-L947> > > Lines 943 to 947 in 1bedba8 > > <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9> > > boolean result = false; > > for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { > > result = CommitLog.this.mappedFileQueue.commit(0); > > CommitLog.log.info(this.getServiceName() + " service shutdown, retry " > > + (i + 1) + " times " + (result ? "OK" : "Not OK")); > > } > > CommitRealTimeService此处没有第2点中提到的sleep机制。这里是采用尝试10次commit。 > > > > 举个简单的场景: > > > > A 在执行appendMessages; > > B 在执行appendMessages; > > 在B执行完appendMessages,还未来得及通知CommitRealTimeService,与此同时shutdown; > > 注意,A还未执行完appendMessages; > > 按照上述代码逻辑,会尝试10次commit,在第1次时就能将B消息commit,并结束循环; > > A 执行完appendMessages; > > 问题就在于A消息不会被commit了,A消息丢失。但给客户端返回的响应就是OK💢。 > > > > 相同的,FlushRealTimeService异步刷盘也有类似问题。 > > > > > > 4. sleep(0)的问题 > > > > rocketmq/store/src/main/java/org/apache/rocketmq/store/MappedFile.java > > <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/MappedFile.java#L507> > > Line 507 in 1bedba8 > > <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9> > > Thread.sleep(0); > > -XX:+ConvertSleepToYield 这个参数默认就是 true,如果 sleep 0 的话会转换成 yield > > > > 但是 ConvertSleepToYield 设为 false 时,会进行 sleep,且 sleep 的最小时间是 1 ms,也就是说 > > ConvertSleepToYield 为 false 时,哪怕是 sleep 0 实际上跟 sleep 1 > > 是一样的。这样的话,并不会让出线程的时间片。 > > > > 我个人认为 RocketMQ 应改为 Thread.yield 处理。或许sleep(0)有其它的意图? > > > > 5.commitLog写满时,消息的处理问题 > > > > rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java > > <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1091-L1096> > > Lines 1091 to 1096 in 1bedba8 > > <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9> > > for (int i = 0; i < 2 && !flushOK; i++) { > > flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= > > req.getNextOffset(); > > > > if (!flushOK) { > > CommitLog.this.mappedFileQueue.flush(0); > > } > > 同步刷盘策略下,正常消息刷盘时,依赖第二次循环将flushOK置为true。但是如果当消息A写入时发现commitLog已经写满,则消息A会被写入下一个commitLog。当前待刷盘的commitLog会有两个。 > > > > A消息通知刷盘服务执行刷盘,第一次循环刷盘(刷入第一个commitLog的末尾数据)时flushOK = > > CommitLog.this.mappedFileQueue.getFlushedWhere() >= > > req.getNextOffset();肯定为false,第二次刷盘(刷入消息A)时flushOK = > > CommitLog.this.mappedFileQueue.getFlushedWhere() >= > > req.getNextOffset();也肯定是false,需要第三次循环来将flushOK置为true。 > > > > 所以上述场景下,flushOK为false。但实际消息A已经刷盘成功了。 > -----原始邮件----- 发件人:"Gosling Von" <[email protected]> 发送时间:2018-12-19 18:00:07 (星期三) 收件人: [email protected] 抄送: "[email protected]" <[email protected]> 主题: Re: 关于RocketMQ消息存储的几点问题 Hi, 这几个问题挺有意思的,我不清楚为什么大家没有人做出回应。Apache Way鼓励大家多在邮件里列表里讨论与交流,我们RocketMQ社区的同学看来还是有些腼腆,好吧,我来抛砖引玉一下 :-) 首先明确一点,如果对代码有疑问,建议大家去实测下,然后给出自己的测试数据这样讨论会较好。 1. 关于锁的问题,可以看到volatile变量修饰的读和写请求,通过synchronized把它们组成原子操作,保证了同步操作原语。如果换成ReentrantLock会更好一些。 2. sleep的时间这块可以斟酌,但基本设计思路是希望IO一直忙,不要被CPU slot换出,所以你会看到它的时间被设置的很小。而有可能触发问题的shutdown此时就非常讲究了,这块的代码我们都需要再测试下。另外关于sleep(0)和yield的争议一致以来都有,推荐看下OpenJDK[1],在不同平台上会有不通的表现,这个问题值得大家再深入讨论下。 3. 消息丢失与CommitLog数据写满的问题,建议你再测试下,我们可以继续交流。 4. RocketMQ 5.0会重构、剥离数据存储这块,也欢迎大家实测,然后把自己的一些测试结论发出来,我们可以一起交流下。 if (millis == 0) { // When ConvertSleepToYield is on, this matches the classic VM implementation of // JVM_Sleep. Critical for similar threading behaviour (Win32) // It appears that in certain GUI contexts, it may be beneficial to do a short sleep // for SOLARIS if (ConvertSleepToYield) { os::yield(); } else { ThreadState old_state = thread->osthread()->get_state(); thread->osthread()->set_state(SLEEPING); os::sleep(thread, MinSleepInterval, false); thread->osthread()->set_state(old_state); } } else { ThreadState old_state = thread->osthread()->get_state(); thread->osthread()->set_state(SLEEPING); if (os::sleep(thread, millis, true) == OS_INTRPT) { // An asynchronous exception (e.g., ThreadDeathException) could have been thrown on // us while we were sleeping. We do not overwrite those. if (!HAS_PENDING_EXCEPTION) { HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1); // TODO-FIXME: THROW_MSG returns which means we will not call set_state() // to properly restore the thread state. That's likely wrong. THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted"); } } thread->osthread()->set_state(old_state); } HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0); JVM_END [1] http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp Best Regards, Von Gosling 在 2018年12月4日,下午9:16,刘春龙 <[email protected]> 写道: 各位RocketMQ社区朋友好, 最近将RocketMQ的底层存储源码逐行看了一遍,发现了如下几个问题。希望各位能抽出时间逐个解答一下。如果是BUG的话,还请修复该问题。谢谢💕。 github的issue地址:https://github.com/apache/rocketmq/issues/574 ———————————————————————————————————————————————————————— 下面我把内容贴出来: 1.关于锁synchronized的使用问题; rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java Line 1069 in 1bedba8 | | publicsynchronizedvoidputRequest(finalGroupCommitRequestrequest) { | rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java Line 1136 in 1bedba8 | | synchronized (this) { | 我认为上述两处的synchronized是完全没有必要的。减少没必要的锁的使用会增加性能😄。 2. 睡眠时间不合理,可能会造成正在处理的消息响应失败; rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java Line 1131 in 1bedba8 | | Thread.sleep(10); | GroupCommitService此处的睡眠时间是否合理? 因为是同步刷盘策略,设置时间过短,在shutdown时,可能会造成正在处理的消息(eg,正在执行appendMessages的消息)来不及刷盘,进而响应失败(FLUSH_DISK_TIMEOUT)。 是否可以采用其它方式?或者适当延长下时间?10毫秒是不是太短了。 3. 可能会造成消息丢失; rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java Lines 943 to 947 in 1bedba8 | | boolean result =false; | | | for (int i =0; i <RETRY_TIMES_OVER&&!result; i++) { | | | result =CommitLog.this.mappedFileQueue.commit(0); | | | CommitLog.log.info(this.getServiceName() +" service shutdown, retry "+ (i +1) +" times "+ (result ?"OK":"Not OK")); | | | } | CommitRealTimeService此处没有第2点中提到的sleep机制。这里是采用尝试10次commit。 举个简单的场景: A 在执行appendMessages; B 在执行appendMessages; 在B执行完appendMessages,还未来得及通知CommitRealTimeService,与此同时shutdown; 注意,A还未执行完appendMessages; 按照上述代码逻辑,会尝试10次commit,在第1次时就能将B消息commit,并结束循环; A 执行完appendMessages; 问题就在于A消息不会被commit了,A消息丢失。但给客户端返回的响应就是OK💢。 相同的,FlushRealTimeService异步刷盘也有类似问题。 4. sleep(0)的问题 rocketmq/store/src/main/java/org/apache/rocketmq/store/MappedFile.java Line 507 in 1bedba8 | | Thread.sleep(0); | -XX:+ConvertSleepToYield 这个参数默认就是 true,如果 sleep 0 的话会转换成 yield 但是 ConvertSleepToYield 设为 false 时,会进行 sleep,且 sleep 的最小时间是 1 ms,也就是说 ConvertSleepToYield 为 false 时,哪怕是 sleep 0 实际上跟 sleep 1 是一样的。这样的话,并不会让出线程的时间片。 我个人认为 RocketMQ 应改为 Thread.yield 处理。或许sleep(0)有其它的意图? 5.commitLog写满时,消息的处理问题 rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java Lines 1091 to 1096 in 1bedba8 | | for (int i =0; i <2&&!flushOK; i++) { | | | flushOK =CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); | | | | | | if (!flushOK) { | | | CommitLog.this.mappedFileQueue.flush(0); | | | } | 同步刷盘策略下,正常消息刷盘时,依赖第二次循环将flushOK置为true。但是如果当消息A写入时发现commitLog已经写满,则消息A会被写入下一个commitLog。当前待刷盘的commitLog会有两个。 A消息通知刷盘服务执行刷盘,第一次循环刷盘(刷入第一个commitLog的末尾数据)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();肯定为false,第二次刷盘(刷入消息A)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();也肯定是false,需要第三次循环来将flushOK置为true。 所以上述场景下,flushOK为false。但实际消息A已经刷盘成功了。
