Repository: incubator-rocketmq Updated Branches: refs/heads/develop 6a97d2884 -> 368e7c86a
[ROCKETMQ-265] fix consume queue's data maybe repeat bug Author: 傅冲 <yubao....@alibaba-inc.com> Author: fuyou001 <fuyou...@gmail.com> Closes #146 from fuyou001/ROCKETMQ-265. Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/368e7c86 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/368e7c86 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/368e7c86 Branch: refs/heads/develop Commit: 368e7c86a0b06099f336c81672112dcb5143cf9e Parents: 6a97d28 Author: 傅冲 <yubao....@alibaba-inc.com> Authored: Tue Sep 5 20:28:23 2017 +0800 Committer: yukon <yu...@apache.org> Committed: Tue Sep 5 20:28:23 2017 +0800 ---------------------------------------------------------------------- .../org/apache/rocketmq/store/ConsumeQueue.java | 7 ++ .../apache/rocketmq/store/ConsumeQueueTest.java | 74 +++++++++++++++++--- 2 files changed, 73 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/368e7c86/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 0bf0aa9..4922e3d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -446,6 +446,13 @@ public class ConsumeQueue { if (cqOffset != 0) { long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset(); + + if (expectLogicOffset < currentLogicOffset) { + log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", + expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - 
currentLogicOffset); + return true; + } + if (expectLogicOffset != currentLogicOffset) { LOG_ERROR.warn( "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/368e7c86/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java index b03f2fc..b7d38f8 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java @@ -17,22 +17,21 @@ package org.apache.rocketmq.store; -import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.apache.rocketmq.store.stats.BrokerStatsManager; -import org.junit.Test; - import java.io.File; +import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Map; - +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.stats.BrokerStatsManager; import static org.assertj.core.api.Assertions.assertThat; +import org.junit.Test; public class ConsumeQueueTest { @@ -131,6 +130,65 @@ public class ConsumeQueueTest { } } + protected void deleteDirectory(String rootPath) { + File file = new File(rootPath); + deleteFile(file); + } + + protected void deleteFile(File file) { + File[] subFiles = file.listFiles(); + if (subFiles 
!= null) { + for (File sub : subFiles) { + deleteFile(sub); + } + } + + file.delete(); + } + + @Test + public void testPutMessagePositionInfo_buildCQRepeatedly() throws Exception { + DefaultMessageStore messageStore = null; + try { + + messageStore = gen(); + + int totalMessages = 10; + + for (int i = 0; i < totalMessages; i++) { + putMsg(messageStore); + } + Thread.sleep(5); + + ConsumeQueue cq = messageStore.getConsumeQueueTable().get(topic).get(queueId); + Method method = cq.getClass().getDeclaredMethod("putMessagePositionInfo", long.class, int.class, long.class, long.class); + + assertThat(method).isNotNull(); + + method.setAccessible(true); + + SelectMappedBufferResult result = messageStore.getCommitLog().getData(0); + assertThat(result != null).isTrue(); + + DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(), false, false); + + assertThat(cq).isNotNull(); + + Object dispatchResult = method.invoke(cq, dispatchRequest.getCommitLogOffset(), + dispatchRequest.getMsgSize(), dispatchRequest.getTagsCode(), dispatchRequest.getConsumeQueueOffset()); + + assertThat(Boolean.parseBoolean(dispatchResult.toString())).isTrue(); + + } finally { + if (messageStore != null) { + messageStore.shutdown(); + messageStore.destroy(); + } + deleteDirectory(storePath); + } + + } + @Test public void testConsumeQueueWithExtendData() { DefaultMessageStore master = null;