This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new bcc9db5cba [ISSUE #7614] Fix flaky test RocksDBMessageStoreTest (#7625) bcc9db5cba is described below commit bcc9db5cbafba6096daf6171f4d348e146bf5c17 Author: Zhanhui Li <lizhan...@apache.org> AuthorDate: Mon Dec 11 17:37:01 2023 +0800 [ISSUE #7614] Fix flaky test RocksDBMessageStoreTest (#7625) * fix #7614 Flaky test RocksDBMessageStoreTest Signed-off-by: lizhanhui <lizhan...@gmail.com> * fix: give TimerMessageStoreTest#testStateAndRecover more time for Awaitility to poll and check Signed-off-by: lizhanhui <lizhan...@gmail.com> * clean up exclude test list of bazel Signed-off-by: lizhanhui <lizhan...@gmail.com> --------- Signed-off-by: lizhanhui <lizhan...@gmail.com> --- store/BUILD.bazel | 4 +- .../rocketmq/store/RocksDBMessageStoreTest.java | 162 ++++++++++++--------- .../store/timer/TimerMessageStoreTest.java | 4 +- 3 files changed, 94 insertions(+), 76 deletions(-) diff --git a/store/BUILD.bazel b/store/BUILD.bazel index 4b046c68eb..b884503b08 100644 --- a/store/BUILD.bazel +++ b/store/BUILD.bazel @@ -69,9 +69,8 @@ java_library( GenTestRules( name = "GeneratedTestRules", exclude_tests = [ - # This test is extremely slow and flaky, exclude it. + # These tests are extremely slow and flaky, exclude them before they are properly fixed. 
"src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest", - "src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest", ], medium_tests = [ "src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest", @@ -80,6 +79,7 @@ GenTestRules( "src/test/java/org/apache/rocketmq/store/MappedFileQueueTest", "src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest", "src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest", + "src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest", ], test_files = glob(["src/test/java/**/*Test.java"]), deps = [ diff --git a/store/src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest.java index acf5edf511..2af07197a5 100644 --- a/store/src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest.java @@ -60,15 +60,18 @@ import org.apache.rocketmq.store.queue.ConsumeQueueInterface; import org.apache.rocketmq.store.queue.CqUnit; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.assertj.core.util.Strings; +import org.awaitility.Awaitility; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @RunWith(MockitoJUnitRunner.class) @@ -77,7 +80,7 @@ public class RocksDBMessageStoreTest { private final String messageTopic = "FooBar"; private final String storeType = StoreType.DEFAULT_ROCKSDB.getStoreType(); private int queueTotal = 100; - private AtomicInteger queueId = new AtomicInteger(0); + private 
final AtomicInteger queueId = new AtomicInteger(0); private SocketAddress bornHost; private SocketAddress storeHost; private byte[] messageBody; @@ -171,27 +174,27 @@ public class RocksDBMessageStoreTest { if (notExecuted()) { return; } - long ipv4HostMsgs = 10; - long ipv6HostMsgs = 10; - long totalMsgs = ipv4HostMsgs + ipv6HostMsgs; + long ipv4HostMessages = 10; + long ipv6HostMessages = 10; + long totalMessages = ipv4HostMessages + ipv6HostMessages; queueTotal = 1; messageBody = storeMessage.getBytes(); - for (long i = 0; i < ipv4HostMsgs; i++) { + for (long i = 0; i < ipv4HostMessages; i++) { messageStore.putMessage(buildMessage()); } - for (long i = 0; i < ipv6HostMsgs; i++) { + for (long i = 0; i < ipv6HostMessages; i++) { messageStore.putMessage(buildIPv6HostMessage()); } StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore); - for (long i = 0; i < totalMsgs; i++) { + for (long i = 0; i < totalMessages; i++) { GetMessageResult result = messageStore.getMessage("GROUP_A", "FooBar", 0, i, 1024 * 1024, null); assertThat(result).isNotNull(); result.release(); } - verifyThatMasterIsFunctional(totalMsgs, messageStore); + verifyThatMasterIsFunctional(totalMessages, messageStore); } @Test @@ -549,15 +552,13 @@ public class RocksDBMessageStoreTest { try { msg.setBornHost(new InetSocketAddress(InetAddress.getByName("1050:0000:0000:0000:0005:0600:300c:326b"), 0)); } catch (UnknownHostException e) { - e.printStackTrace(); - assertThat(Boolean.FALSE).isTrue(); + fail("build IPv6 host message error", e); } try { msg.setStoreHost(new InetSocketAddress(InetAddress.getByName("::1"), 0)); } catch (UnknownHostException e) { - e.printStackTrace(); - assertThat(Boolean.FALSE).isTrue(); + fail("build IPv6 host message error", e); } return msg; } @@ -582,27 +583,27 @@ public class RocksDBMessageStoreTest { } @Test - public void testGroupCommit() throws Exception { + public void testGroupCommit() { if (notExecuted()) { return; } - long totalMsgs = 10; + long 
totalMessages = 10; queueTotal = 1; messageBody = storeMessage.getBytes(); - for (long i = 0; i < totalMsgs; i++) { + for (long i = 0; i < totalMessages; i++) { messageStore.putMessage(buildMessage()); } - for (long i = 0; i < totalMsgs; i++) { + for (long i = 0; i < totalMessages; i++) { GetMessageResult result = messageStore.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); assertThat(result).isNotNull(); result.release(); } - verifyThatMasterIsFunctional(totalMsgs, messageStore); + verifyThatMasterIsFunctional(totalMessages, messageStore); } @Test - public void testMaxOffset() throws InterruptedException { + public void testMaxOffset() { if (notExecuted()) { return; } @@ -618,11 +619,11 @@ public class RocksDBMessageStoreTest { messageStore.putMessage(msg); } - while (messageStore.dispatchBehindBytes() != 0) { - TimeUnit.MILLISECONDS.sleep(1); - } - - assertThat(messageStore.getMaxOffsetInQueue(messageTopic, queueId)).isEqualTo(firstBatchMessages); + Awaitility.await() + .with() + .atMost(3, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.MILLISECONDS) + .until(() -> messageStore.getMaxOffsetInQueue(messageTopic, queueId) == firstBatchMessages); // Disable the dispatcher messageStore.getDispatcherList().clear(); @@ -644,14 +645,14 @@ public class RocksDBMessageStoreTest { return buildIPv6HostMessage(messageBody, "FooBar"); } - private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore master) { - for (long i = 0; i < totalMsgs; i++) { + private void verifyThatMasterIsFunctional(long totalMessages, MessageStore master) { + for (long i = 0; i < totalMessages; i++) { master.putMessage(buildMessage()); } StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore); - for (long i = 0; i < totalMsgs; i++) { + for (long i = 0; i < totalMessages; i++) { GetMessageResult result = master.getMessage("GROUP_A", "FooBar", 0, i, 1024 * 1024, null); assertThat(result).isNotNull(); result.release(); @@ -660,7 +661,7 @@ public class 
RocksDBMessageStoreTest { } @Test - public void testPullSize() throws Exception { + public void testPullSize() { if (notExecuted()) { return; } @@ -673,9 +674,11 @@ public class RocksDBMessageStoreTest { messageStore.putMessage(messageExtBrokerInner); } // wait for consume queue build - // the sleep time should be great than consume queue flush interval - //Thread.sleep(100); - StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore); + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .with() + .pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> messageStore.getMaxOffsetInQueue(topic, 0) >= 32); + String group = "simple"; GetMessageResult getMessageResult32 = messageStore.getMessage(group, topic, 0, 0, 32, null); assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32); @@ -705,21 +708,25 @@ public class RocksDBMessageStoreTest { messageStore.putMessage(messageExtBrokerInner); } - // Thread.sleep(100);//wait for build consumer queue - StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore); + // wait for build consumer queue + Awaitility.await() + .with() + .pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(10, TimeUnit.SECONDS) + .until(() -> messageStore.getMaxOffsetInQueue(topic, 0) >= 100); long maxPhyOffset = messageStore.getMaxPhyOffset(); long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0); //1.just reboot messageStore.shutdown(); - String storeRootDir = ((RocksDBMessageStore) messageStore).getMessageStoreConfig().getStorePathRootDir(); + String storeRootDir = messageStore.getMessageStoreConfig().getStorePathRootDir(); messageStore = buildMessageStore(storeRootDir, topic); boolean load = messageStore.load(); assertTrue(load); messageStore.start(); - assertTrue(maxPhyOffset == messageStore.getMaxPhyOffset()); - assertTrue(maxCqOffset == messageStore.getMaxOffsetInQueue(topic, 0)); + assertEquals(maxPhyOffset, messageStore.getMaxPhyOffset()); + assertEquals(maxCqOffset, 
messageStore.getMaxOffsetInQueue(topic, 0)); //2.damage commit-log and reboot normal for (int i = 0; i < 100; i++) { @@ -728,20 +735,25 @@ public class RocksDBMessageStoreTest { messageExtBrokerInner.setQueueId(0); messageStore.putMessage(messageExtBrokerInner); } - //Thread.sleep(100); - StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore); + + Awaitility.await() + .with() + .pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(10, TimeUnit.SECONDS) + .until(() -> messageStore.getMaxOffsetInQueue(topic, 0) >= 200); + long secondLastPhyOffset = messageStore.getMaxPhyOffset(); long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0); + // Append a message to corrupt MessageExtBrokerInner messageExtBrokerInner = buildMessage(); messageExtBrokerInner.setTopic(topic); messageExtBrokerInner.setQueueId(0); messageStore.putMessage(messageExtBrokerInner); - messageStore.shutdown(); - //damage last message + // Corrupt the last message damageCommitLog((RocksDBMessageStore) messageStore, secondLastPhyOffset); //reboot @@ -749,18 +761,23 @@ public class RocksDBMessageStoreTest { load = messageStore.load(); assertTrue(load); messageStore.start(); - assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset()); - assertTrue(secondLastCqOffset == messageStore.getMaxOffsetInQueue(topic, 0)); + assertEquals(secondLastPhyOffset, messageStore.getMaxPhyOffset()); + assertEquals(secondLastCqOffset, messageStore.getMaxOffsetInQueue(topic, 0)); - //3.damage commitlog and reboot abnormal + //3.Corrupt commit-log and reboot abnormal for (int i = 0; i < 100; i++) { messageExtBrokerInner = buildMessage(); messageExtBrokerInner.setTopic(topic); messageExtBrokerInner.setQueueId(0); messageStore.putMessage(messageExtBrokerInner); } - //Thread.sleep(100); - StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore); + + Awaitility.await() + .with() + .pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(10, TimeUnit.SECONDS) + .until(() -> 
messageStore.getMaxOffsetInQueue(topic, 0) >= 300); + secondLastPhyOffset = messageStore.getMaxPhyOffset(); secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0); @@ -770,20 +787,20 @@ public class RocksDBMessageStoreTest { messageStore.putMessage(messageExtBrokerInner); messageStore.shutdown(); - //damage last message + //Corrupt the last message damageCommitLog((RocksDBMessageStore) messageStore, secondLastPhyOffset); //add abort file - String fileName = StorePathConfigHelper.getAbortFile(((RocksDBMessageStore) messageStore).getMessageStoreConfig().getStorePathRootDir()); + String fileName = StorePathConfigHelper.getAbortFile(messageStore.getMessageStoreConfig().getStorePathRootDir()); File file = new File(fileName); UtilAll.ensureDirOK(file.getParent()); - file.createNewFile(); + assertTrue(file.createNewFile()); messageStore = buildMessageStore(storeRootDir, topic); load = messageStore.load(); assertTrue(load); messageStore.start(); - assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset()); - assertTrue(secondLastCqOffset == messageStore.getMaxOffsetInQueue(topic, 0)); + assertEquals(secondLastPhyOffset, messageStore.getMaxPhyOffset()); + assertEquals(secondLastCqOffset, messageStore.getMaxOffsetInQueue(topic, 0)); //message write again for (int i = 0; i < 100; i++) { @@ -860,7 +877,8 @@ public class RocksDBMessageStoreTest { MessageExtBatch msgExtBatch = buildMessageBatch(msgBatch); try { - PutMessageResult result = this.messageStore.putMessages(msgExtBatch); + this.messageStore.putMessages(msgExtBatch); + fail("Should have raised an exception"); } catch (Exception e) { assertThat(e.getMessage()).contains("message body size exceeded"); } @@ -871,7 +889,7 @@ public class RocksDBMessageStoreTest { if (notExecuted()) { return; } - MessageStoreConfig messageStoreConfig = ((RocksDBMessageStore) this.messageStore).getMessageStoreConfig(); + MessageStoreConfig messageStoreConfig = this.messageStore.getMessageStoreConfig(); 
messageStoreConfig.setBrokerRole(BrokerRole.SYNC_MASTER); messageStoreConfig.setTotalReplicas(2); messageStoreConfig.setInSyncReplicas(2); @@ -890,7 +908,7 @@ public class RocksDBMessageStoreTest { if (notExecuted()) { return; } - MessageStoreConfig messageStoreConfig = ((RocksDBMessageStore) this.messageStore).getMessageStoreConfig(); + MessageStoreConfig messageStoreConfig = this.messageStore.getMessageStoreConfig(); messageStoreConfig.setBrokerRole(BrokerRole.SYNC_MASTER); messageStoreConfig.setTotalReplicas(2); messageStoreConfig.setInSyncReplicas(2); @@ -930,13 +948,13 @@ public class RocksDBMessageStoreTest { } @Test - public void testPutLongMessage() throws Exception { + public void testPutLongMessage() { if (notExecuted()) { return; } MessageExtBrokerInner messageExtBrokerInner = buildMessage(); - CommitLog commitLog = ((RocksDBMessageStore) messageStore).getCommitLog(); - MessageStoreConfig messageStoreConfig = ((RocksDBMessageStore) messageStore).getMessageStoreConfig(); + CommitLog commitLog = messageStore.getCommitLog(); + MessageStoreConfig messageStoreConfig = messageStore.getMessageStoreConfig(); MessageExtEncoder.PutMessageThreadLocal putMessageThreadLocal = commitLog.getPutMessageThreadLocal().get(); //body size, topic size, properties size exactly equal to max size @@ -944,30 +962,30 @@ public class RocksDBMessageStoreTest { messageExtBrokerInner.setTopic(new String(new byte[127])); messageExtBrokerInner.setPropertiesString(new String(new byte[Short.MAX_VALUE])); PutMessageResult encodeResult1 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner); - assertTrue(encodeResult1 == null); + assertNull(encodeResult1); //body size exactly more than max message body size messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize() + 1]); PutMessageResult encodeResult2 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner); - assertTrue(encodeResult2.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL); + 
assertSame(encodeResult2.getPutMessageStatus(), PutMessageStatus.MESSAGE_ILLEGAL); //body size exactly equal to max message size messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize() + 64 * 1024]); PutMessageResult encodeResult3 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner); - assertTrue(encodeResult3.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL); + assertSame(encodeResult3.getPutMessageStatus(), PutMessageStatus.MESSAGE_ILLEGAL); //message properties length more than properties maxSize messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize()]); messageExtBrokerInner.setPropertiesString(new String(new byte[Short.MAX_VALUE + 1])); PutMessageResult encodeResult4 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner); - assertTrue(encodeResult4.getPutMessageStatus() == PutMessageStatus.PROPERTIES_SIZE_EXCEEDED); + assertSame(encodeResult4.getPutMessageStatus(), PutMessageStatus.PROPERTIES_SIZE_EXCEEDED); //message length more than buffer length capacity messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize()]); messageExtBrokerInner.setTopic(new String(new byte[Short.MAX_VALUE])); messageExtBrokerInner.setPropertiesString(new String(new byte[Short.MAX_VALUE])); PutMessageResult encodeResult5 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner); - assertTrue(encodeResult5.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL); + assertSame(encodeResult5.getPutMessageStatus(), PutMessageStatus.MESSAGE_ILLEGAL); } @Test @@ -976,21 +994,21 @@ public class RocksDBMessageStoreTest { return; } MessageExtBrokerInner messageExtBrokerInner = buildMessage(); - MessageStoreConfig messageStoreConfig = ((RocksDBMessageStore) messageStore).getMessageStoreConfig(); + MessageStoreConfig messageStoreConfig = messageStore.getMessageStoreConfig(); int originMaxMessageSize = messageStoreConfig.getMaxMessageSize(); messageExtBrokerInner.setBody(new 
byte[originMaxMessageSize + 10]); PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner); - assertTrue(putMessageResult.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL); + assertSame(putMessageResult.getPutMessageStatus(), PutMessageStatus.MESSAGE_ILLEGAL); int newMaxMessageSize = originMaxMessageSize + 10; messageStoreConfig.setMaxMessageSize(newMaxMessageSize); putMessageResult = messageStore.putMessage(messageExtBrokerInner); - assertTrue(putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK); + assertSame(putMessageResult.getPutMessageStatus(), PutMessageStatus.PUT_OK); messageStoreConfig.setMaxMessageSize(10); putMessageResult = messageStore.putMessage(messageExtBrokerInner); - assertTrue(putMessageResult.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL); + assertSame(putMessageResult.getPutMessageStatus(), PutMessageStatus.MESSAGE_ILLEGAL); messageStoreConfig.setMaxMessageSize(originMaxMessageSize); } @@ -1013,11 +1031,11 @@ public class RocksDBMessageStoreTest { } consumeQueueTable.put(topicName, cqTable); } - Assert.assertEquals(consumeQueueTable.size(), 10); + assertEquals(consumeQueueTable.size(), 10); HashSet<String> resultSet = Sets.newHashSet("topic-3", "topic-5"); messageStore.deleteTopics(Sets.difference(consumeQueueTable.keySet(), resultSet)); - Assert.assertEquals(consumeQueueTable.size(), 2); - Assert.assertEquals(resultSet, consumeQueueTable.keySet()); + assertEquals(consumeQueueTable.size(), 2); + assertEquals(resultSet, consumeQueueTable.keySet()); } @Test @@ -1038,14 +1056,14 @@ public class RocksDBMessageStoreTest { } consumeQueueTable.put(topicName, cqTable); } - Assert.assertEquals(consumeQueueTable.size(), 10); + assertEquals(consumeQueueTable.size(), 10); HashSet<String> resultSet = Sets.newHashSet("topic-3", "topic-5"); messageStore.cleanUnusedTopic(resultSet); - Assert.assertEquals(consumeQueueTable.size(), 2); - Assert.assertEquals(resultSet, consumeQueueTable.keySet()); + 
assertEquals(consumeQueueTable.size(), 2); + assertEquals(resultSet, consumeQueueTable.keySet()); } - private class MyMessageArrivingListener implements MessageArrivingListener { + private static class MyMessageArrivingListener implements MessageArrivingListener { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java index 02ff35681d..4ce3985f6c 100644 --- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java @@ -426,8 +426,8 @@ public class TimerMessageStoreTest { assertEquals(first.getCommitReadTimeMs(), second.getCommitReadTimeMs()); second.start(true); - // Wait until all messages have wrote back to commitLog and consumeQueue. - await().atMost(5000, TimeUnit.MILLISECONDS).until(new Callable<Boolean>() { + // Wait until all messages have been written back to commitLog and consumeQueue. + await().atMost(30000, TimeUnit.MILLISECONDS).until(new Callable<Boolean>() { @Override public Boolean call() { ConsumeQueue cq = (ConsumeQueue) messageStore.getConsumeQueue(topic, 0);