This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new bcc9db5cba [ISSUE #7614] Fix  flaky test RocksDBMessageStoreTest 
(#7625)
bcc9db5cba is described below

commit bcc9db5cbafba6096daf6171f4d348e146bf5c17
Author: Zhanhui Li <lizhan...@apache.org>
AuthorDate: Mon Dec 11 17:37:01 2023 +0800

    [ISSUE #7614] Fix  flaky test RocksDBMessageStoreTest (#7625)
    
    * fix #7614 Flaky test RocksDBMessageStoreTest
    
    Signed-off-by: lizhanhui <lizhan...@gmail.com>
    
    * fix: give TimerMessageStoreTest#testStateAndRecover more time for 
Awaitability to poll and check
    
    Signed-off-by: lizhanhui <lizhan...@gmail.com>
    
    * clean up exclude test list of bazel
    
    Signed-off-by: lizhanhui <lizhan...@gmail.com>
    
    ---------
    
    Signed-off-by: lizhanhui <lizhan...@gmail.com>
---
 store/BUILD.bazel                                  |   4 +-
 .../rocketmq/store/RocksDBMessageStoreTest.java    | 162 ++++++++++++---------
 .../store/timer/TimerMessageStoreTest.java         |   4 +-
 3 files changed, 94 insertions(+), 76 deletions(-)

diff --git a/store/BUILD.bazel b/store/BUILD.bazel
index 4b046c68eb..b884503b08 100644
--- a/store/BUILD.bazel
+++ b/store/BUILD.bazel
@@ -69,9 +69,8 @@ java_library(
 GenTestRules(
     name = "GeneratedTestRules",
     exclude_tests = [
-        # This test is extremely slow and flaky, exclude it.
+        # These tests are extremely slow and flaky, exclude them before they 
are properly fixed.
         
"src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest",
-        "src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest",
     ],
     medium_tests = [
         "src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest",
@@ -80,6 +79,7 @@ GenTestRules(
         "src/test/java/org/apache/rocketmq/store/MappedFileQueueTest",
         
"src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest",
         "src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest",
+        "src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest",
     ],
     test_files = glob(["src/test/java/**/*Test.java"]),
     deps = [
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest.java 
b/store/src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest.java
index acf5edf511..2af07197a5 100644
--- a/store/src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest.java
@@ -60,15 +60,18 @@ import 
org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.assertj.core.util.Strings;
+import org.awaitility.Awaitility;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -77,7 +80,7 @@ public class RocksDBMessageStoreTest {
     private final String messageTopic = "FooBar";
     private final String storeType = StoreType.DEFAULT_ROCKSDB.getStoreType();
     private int queueTotal = 100;
-    private AtomicInteger queueId = new AtomicInteger(0);
+    private final AtomicInteger queueId = new AtomicInteger(0);
     private SocketAddress bornHost;
     private SocketAddress storeHost;
     private byte[] messageBody;
@@ -171,27 +174,27 @@ public class RocksDBMessageStoreTest {
         if (notExecuted()) {
             return;
         }
-        long ipv4HostMsgs = 10;
-        long ipv6HostMsgs = 10;
-        long totalMsgs = ipv4HostMsgs + ipv6HostMsgs;
+        long ipv4HostMessages = 10;
+        long ipv6HostMessages = 10;
+        long totalMessages = ipv4HostMessages + ipv6HostMessages;
         queueTotal = 1;
         messageBody = storeMessage.getBytes();
-        for (long i = 0; i < ipv4HostMsgs; i++) {
+        for (long i = 0; i < ipv4HostMessages; i++) {
             messageStore.putMessage(buildMessage());
         }
 
-        for (long i = 0; i < ipv6HostMsgs; i++) {
+        for (long i = 0; i < ipv6HostMessages; i++) {
             messageStore.putMessage(buildIPv6HostMessage());
         }
 
         StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore);
 
-        for (long i = 0; i < totalMsgs; i++) {
+        for (long i = 0; i < totalMessages; i++) {
             GetMessageResult result = messageStore.getMessage("GROUP_A", 
"FooBar", 0, i, 1024 * 1024, null);
             assertThat(result).isNotNull();
             result.release();
         }
-        verifyThatMasterIsFunctional(totalMsgs, messageStore);
+        verifyThatMasterIsFunctional(totalMessages, messageStore);
     }
 
     @Test
@@ -549,15 +552,13 @@ public class RocksDBMessageStoreTest {
         try {
             msg.setBornHost(new 
InetSocketAddress(InetAddress.getByName("1050:0000:0000:0000:0005:0600:300c:326b"),
 0));
         } catch (UnknownHostException e) {
-            e.printStackTrace();
-            assertThat(Boolean.FALSE).isTrue();
+            fail("build IPv6 host message error", e);
         }
 
         try {
             msg.setStoreHost(new 
InetSocketAddress(InetAddress.getByName("::1"), 0));
         } catch (UnknownHostException e) {
-            e.printStackTrace();
-            assertThat(Boolean.FALSE).isTrue();
+            fail("build IPv6 host message error", e);
         }
         return msg;
     }
@@ -582,27 +583,27 @@ public class RocksDBMessageStoreTest {
     }
 
     @Test
-    public void testGroupCommit() throws Exception {
+    public void testGroupCommit() {
         if (notExecuted()) {
             return;
         }
-        long totalMsgs = 10;
+        long totalMessages = 10;
         queueTotal = 1;
         messageBody = storeMessage.getBytes();
-        for (long i = 0; i < totalMsgs; i++) {
+        for (long i = 0; i < totalMessages; i++) {
             messageStore.putMessage(buildMessage());
         }
 
-        for (long i = 0; i < totalMsgs; i++) {
+        for (long i = 0; i < totalMessages; i++) {
             GetMessageResult result = messageStore.getMessage("GROUP_A", 
"TOPIC_A", 0, i, 1024 * 1024, null);
             assertThat(result).isNotNull();
             result.release();
         }
-        verifyThatMasterIsFunctional(totalMsgs, messageStore);
+        verifyThatMasterIsFunctional(totalMessages, messageStore);
     }
 
     @Test
-    public void testMaxOffset() throws InterruptedException {
+    public void testMaxOffset() {
         if (notExecuted()) {
             return;
         }
@@ -618,11 +619,11 @@ public class RocksDBMessageStoreTest {
             messageStore.putMessage(msg);
         }
 
-        while (messageStore.dispatchBehindBytes() != 0) {
-            TimeUnit.MILLISECONDS.sleep(1);
-        }
-
-        assertThat(messageStore.getMaxOffsetInQueue(messageTopic, 
queueId)).isEqualTo(firstBatchMessages);
+        Awaitility.await()
+                .with()
+                .atMost(3, TimeUnit.SECONDS)
+                .pollInterval(1, TimeUnit.MILLISECONDS)
+                .until(() -> messageStore.getMaxOffsetInQueue(messageTopic, 
queueId) == firstBatchMessages);
 
         // Disable the dispatcher
         messageStore.getDispatcherList().clear();
@@ -644,14 +645,14 @@ public class RocksDBMessageStoreTest {
         return buildIPv6HostMessage(messageBody, "FooBar");
     }
 
-    private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore 
master) {
-        for (long i = 0; i < totalMsgs; i++) {
+    private void verifyThatMasterIsFunctional(long totalMessages, MessageStore 
master) {
+        for (long i = 0; i < totalMessages; i++) {
             master.putMessage(buildMessage());
         }
 
         StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore);
 
-        for (long i = 0; i < totalMsgs; i++) {
+        for (long i = 0; i < totalMessages; i++) {
             GetMessageResult result = master.getMessage("GROUP_A", "FooBar", 
0, i, 1024 * 1024, null);
             assertThat(result).isNotNull();
             result.release();
@@ -660,7 +661,7 @@ public class RocksDBMessageStoreTest {
     }
 
     @Test
-    public void testPullSize() throws Exception {
+    public void testPullSize() {
         if (notExecuted()) {
             return;
         }
@@ -673,9 +674,11 @@ public class RocksDBMessageStoreTest {
             messageStore.putMessage(messageExtBrokerInner);
         }
         // wait for consume queue build
-        // the sleep time should be great than consume queue flush interval
-        //Thread.sleep(100);
-        StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore);
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .with()
+                .pollInterval(10, TimeUnit.MILLISECONDS)
+                .until(() -> messageStore.getMaxOffsetInQueue(topic, 0) >= 32);
+
         String group = "simple";
         GetMessageResult getMessageResult32 = messageStore.getMessage(group, 
topic, 0, 0, 32, null);
         
assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32);
@@ -705,21 +708,25 @@ public class RocksDBMessageStoreTest {
             messageStore.putMessage(messageExtBrokerInner);
         }
 
-        // Thread.sleep(100);//wait for build consumer queue
-        StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore);
+        // wait for build consumer queue
+        Awaitility.await()
+                .with()
+                .pollInterval(100, TimeUnit.MILLISECONDS)
+                .atMost(10, TimeUnit.SECONDS)
+                .until(() -> messageStore.getMaxOffsetInQueue(topic, 0) >= 
100);
 
         long maxPhyOffset = messageStore.getMaxPhyOffset();
         long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
 
         //1.just reboot
         messageStore.shutdown();
-        String storeRootDir = ((RocksDBMessageStore) 
messageStore).getMessageStoreConfig().getStorePathRootDir();
+        String storeRootDir = 
messageStore.getMessageStoreConfig().getStorePathRootDir();
         messageStore = buildMessageStore(storeRootDir, topic);
         boolean load = messageStore.load();
         assertTrue(load);
         messageStore.start();
-        assertTrue(maxPhyOffset == messageStore.getMaxPhyOffset());
-        assertTrue(maxCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));
+        assertEquals(maxPhyOffset, messageStore.getMaxPhyOffset());
+        assertEquals(maxCqOffset, messageStore.getMaxOffsetInQueue(topic, 0));
 
         //2.damage commit-log and reboot normal
         for (int i = 0; i < 100; i++) {
@@ -728,20 +735,25 @@ public class RocksDBMessageStoreTest {
             messageExtBrokerInner.setQueueId(0);
             messageStore.putMessage(messageExtBrokerInner);
         }
-        //Thread.sleep(100);
-        StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore);
+
+        Awaitility.await()
+                .with()
+                .pollInterval(100, TimeUnit.MILLISECONDS)
+                .atMost(10, TimeUnit.SECONDS)
+                .until(() -> messageStore.getMaxOffsetInQueue(topic, 0) >= 
200);
+
         long secondLastPhyOffset = messageStore.getMaxPhyOffset();
         long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
 
+        // Append a message to corrupt
         MessageExtBrokerInner messageExtBrokerInner = buildMessage();
         messageExtBrokerInner.setTopic(topic);
         messageExtBrokerInner.setQueueId(0);
         messageStore.putMessage(messageExtBrokerInner);
 
-
         messageStore.shutdown();
 
-        //damage last message
+        // Corrupt the last message
         damageCommitLog((RocksDBMessageStore) messageStore, 
secondLastPhyOffset);
 
         //reboot
@@ -749,18 +761,23 @@ public class RocksDBMessageStoreTest {
         load = messageStore.load();
         assertTrue(load);
         messageStore.start();
-        assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset());
-        assertTrue(secondLastCqOffset == 
messageStore.getMaxOffsetInQueue(topic, 0));
+        assertEquals(secondLastPhyOffset, messageStore.getMaxPhyOffset());
+        assertEquals(secondLastCqOffset, 
messageStore.getMaxOffsetInQueue(topic, 0));
 
-        //3.damage commitlog and reboot abnormal
+        //3.Corrupt commit-log and reboot abnormal
         for (int i = 0; i < 100; i++) {
             messageExtBrokerInner = buildMessage();
             messageExtBrokerInner.setTopic(topic);
             messageExtBrokerInner.setQueueId(0);
             messageStore.putMessage(messageExtBrokerInner);
         }
-        //Thread.sleep(100);
-        StoreTestUtil.waitCommitLogReput((RocksDBMessageStore) messageStore);
+
+        Awaitility.await()
+                .with()
+                .pollInterval(100, TimeUnit.MILLISECONDS)
+                .atMost(10, TimeUnit.SECONDS)
+                .until(() -> messageStore.getMaxOffsetInQueue(topic, 0) >= 
300);
+
         secondLastPhyOffset = messageStore.getMaxPhyOffset();
         secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
 
@@ -770,20 +787,20 @@ public class RocksDBMessageStoreTest {
         messageStore.putMessage(messageExtBrokerInner);
         messageStore.shutdown();
 
-        //damage last message
+        //Corrupt the last message
         damageCommitLog((RocksDBMessageStore) messageStore, 
secondLastPhyOffset);
         //add abort file
-        String fileName = 
StorePathConfigHelper.getAbortFile(((RocksDBMessageStore) 
messageStore).getMessageStoreConfig().getStorePathRootDir());
+        String fileName = 
StorePathConfigHelper.getAbortFile(messageStore.getMessageStoreConfig().getStorePathRootDir());
         File file = new File(fileName);
         UtilAll.ensureDirOK(file.getParent());
-        file.createNewFile();
+        assertTrue(file.createNewFile());
 
         messageStore = buildMessageStore(storeRootDir, topic);
         load = messageStore.load();
         assertTrue(load);
         messageStore.start();
-        assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset());
-        assertTrue(secondLastCqOffset == 
messageStore.getMaxOffsetInQueue(topic, 0));
+        assertEquals(secondLastPhyOffset, messageStore.getMaxPhyOffset());
+        assertEquals(secondLastCqOffset, 
messageStore.getMaxOffsetInQueue(topic, 0));
 
         //message write again
         for (int i = 0; i < 100; i++) {
@@ -860,7 +877,8 @@ public class RocksDBMessageStoreTest {
         MessageExtBatch msgExtBatch = buildMessageBatch(msgBatch);
 
         try {
-            PutMessageResult result = 
this.messageStore.putMessages(msgExtBatch);
+            this.messageStore.putMessages(msgExtBatch);
+            fail("Should have raised an exception");
         } catch (Exception e) {
             assertThat(e.getMessage()).contains("message body size exceeded");
         }
@@ -871,7 +889,7 @@ public class RocksDBMessageStoreTest {
         if (notExecuted()) {
             return;
         }
-        MessageStoreConfig messageStoreConfig = ((RocksDBMessageStore) 
this.messageStore).getMessageStoreConfig();
+        MessageStoreConfig messageStoreConfig = 
this.messageStore.getMessageStoreConfig();
         messageStoreConfig.setBrokerRole(BrokerRole.SYNC_MASTER);
         messageStoreConfig.setTotalReplicas(2);
         messageStoreConfig.setInSyncReplicas(2);
@@ -890,7 +908,7 @@ public class RocksDBMessageStoreTest {
         if (notExecuted()) {
             return;
         }
-        MessageStoreConfig messageStoreConfig = ((RocksDBMessageStore) 
this.messageStore).getMessageStoreConfig();
+        MessageStoreConfig messageStoreConfig = 
this.messageStore.getMessageStoreConfig();
         messageStoreConfig.setBrokerRole(BrokerRole.SYNC_MASTER);
         messageStoreConfig.setTotalReplicas(2);
         messageStoreConfig.setInSyncReplicas(2);
@@ -930,13 +948,13 @@ public class RocksDBMessageStoreTest {
     }
 
     @Test
-    public void testPutLongMessage() throws Exception {
+    public void testPutLongMessage() {
         if (notExecuted()) {
             return;
         }
         MessageExtBrokerInner messageExtBrokerInner = buildMessage();
-        CommitLog commitLog = ((RocksDBMessageStore) 
messageStore).getCommitLog();
-        MessageStoreConfig messageStoreConfig = ((RocksDBMessageStore) 
messageStore).getMessageStoreConfig();
+        CommitLog commitLog = messageStore.getCommitLog();
+        MessageStoreConfig messageStoreConfig = 
messageStore.getMessageStoreConfig();
         MessageExtEncoder.PutMessageThreadLocal putMessageThreadLocal = 
commitLog.getPutMessageThreadLocal().get();
 
         //body size, topic size, properties size exactly equal to max size
@@ -944,30 +962,30 @@ public class RocksDBMessageStoreTest {
         messageExtBrokerInner.setTopic(new String(new byte[127]));
         messageExtBrokerInner.setPropertiesString(new String(new 
byte[Short.MAX_VALUE]));
         PutMessageResult encodeResult1 = 
putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
-        assertTrue(encodeResult1 == null);
+        assertNull(encodeResult1);
 
         //body size exactly more than max message body size
         messageExtBrokerInner.setBody(new 
byte[messageStoreConfig.getMaxMessageSize() + 1]);
         PutMessageResult encodeResult2 = 
putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
-        assertTrue(encodeResult2.getPutMessageStatus() == 
PutMessageStatus.MESSAGE_ILLEGAL);
+        assertSame(encodeResult2.getPutMessageStatus(), 
PutMessageStatus.MESSAGE_ILLEGAL);
 
         //body size exactly equal to max message size
         messageExtBrokerInner.setBody(new 
byte[messageStoreConfig.getMaxMessageSize() + 64 * 1024]);
         PutMessageResult encodeResult3 = 
putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
-        assertTrue(encodeResult3.getPutMessageStatus() == 
PutMessageStatus.MESSAGE_ILLEGAL);
+        assertSame(encodeResult3.getPutMessageStatus(), 
PutMessageStatus.MESSAGE_ILLEGAL);
 
         //message properties length more than properties maxSize
         messageExtBrokerInner.setBody(new 
byte[messageStoreConfig.getMaxMessageSize()]);
         messageExtBrokerInner.setPropertiesString(new String(new 
byte[Short.MAX_VALUE + 1]));
         PutMessageResult encodeResult4 = 
putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
-        assertTrue(encodeResult4.getPutMessageStatus() == 
PutMessageStatus.PROPERTIES_SIZE_EXCEEDED);
+        assertSame(encodeResult4.getPutMessageStatus(), 
PutMessageStatus.PROPERTIES_SIZE_EXCEEDED);
 
         //message length more than buffer length capacity
         messageExtBrokerInner.setBody(new 
byte[messageStoreConfig.getMaxMessageSize()]);
         messageExtBrokerInner.setTopic(new String(new byte[Short.MAX_VALUE]));
         messageExtBrokerInner.setPropertiesString(new String(new 
byte[Short.MAX_VALUE]));
         PutMessageResult encodeResult5 = 
putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
-        assertTrue(encodeResult5.getPutMessageStatus() == 
PutMessageStatus.MESSAGE_ILLEGAL);
+        assertSame(encodeResult5.getPutMessageStatus(), 
PutMessageStatus.MESSAGE_ILLEGAL);
     }
 
     @Test
@@ -976,21 +994,21 @@ public class RocksDBMessageStoreTest {
             return;
         }
         MessageExtBrokerInner messageExtBrokerInner = buildMessage();
-        MessageStoreConfig messageStoreConfig = ((RocksDBMessageStore) 
messageStore).getMessageStoreConfig();
+        MessageStoreConfig messageStoreConfig = 
messageStore.getMessageStoreConfig();
         int originMaxMessageSize = messageStoreConfig.getMaxMessageSize();
 
         messageExtBrokerInner.setBody(new byte[originMaxMessageSize + 10]);
         PutMessageResult putMessageResult = 
messageStore.putMessage(messageExtBrokerInner);
-        assertTrue(putMessageResult.getPutMessageStatus() == 
PutMessageStatus.MESSAGE_ILLEGAL);
+        assertSame(putMessageResult.getPutMessageStatus(), 
PutMessageStatus.MESSAGE_ILLEGAL);
 
         int newMaxMessageSize = originMaxMessageSize + 10;
         messageStoreConfig.setMaxMessageSize(newMaxMessageSize);
         putMessageResult = messageStore.putMessage(messageExtBrokerInner);
-        assertTrue(putMessageResult.getPutMessageStatus() == 
PutMessageStatus.PUT_OK);
+        assertSame(putMessageResult.getPutMessageStatus(), 
PutMessageStatus.PUT_OK);
 
         messageStoreConfig.setMaxMessageSize(10);
         putMessageResult = messageStore.putMessage(messageExtBrokerInner);
-        assertTrue(putMessageResult.getPutMessageStatus() == 
PutMessageStatus.MESSAGE_ILLEGAL);
+        assertSame(putMessageResult.getPutMessageStatus(), 
PutMessageStatus.MESSAGE_ILLEGAL);
 
         messageStoreConfig.setMaxMessageSize(originMaxMessageSize);
     }
@@ -1013,11 +1031,11 @@ public class RocksDBMessageStoreTest {
             }
             consumeQueueTable.put(topicName, cqTable);
         }
-        Assert.assertEquals(consumeQueueTable.size(), 10);
+        assertEquals(consumeQueueTable.size(), 10);
         HashSet<String> resultSet = Sets.newHashSet("topic-3", "topic-5");
         messageStore.deleteTopics(Sets.difference(consumeQueueTable.keySet(), 
resultSet));
-        Assert.assertEquals(consumeQueueTable.size(), 2);
-        Assert.assertEquals(resultSet, consumeQueueTable.keySet());
+        assertEquals(consumeQueueTable.size(), 2);
+        assertEquals(resultSet, consumeQueueTable.keySet());
     }
 
     @Test
@@ -1038,14 +1056,14 @@ public class RocksDBMessageStoreTest {
             }
             consumeQueueTable.put(topicName, cqTable);
         }
-        Assert.assertEquals(consumeQueueTable.size(), 10);
+        assertEquals(consumeQueueTable.size(), 10);
         HashSet<String> resultSet = Sets.newHashSet("topic-3", "topic-5");
         messageStore.cleanUnusedTopic(resultSet);
-        Assert.assertEquals(consumeQueueTable.size(), 2);
-        Assert.assertEquals(resultSet, consumeQueueTable.keySet());
+        assertEquals(consumeQueueTable.size(), 2);
+        assertEquals(resultSet, consumeQueueTable.keySet());
     }
 
-    private class MyMessageArrivingListener implements MessageArrivingListener 
{
+    private static class MyMessageArrivingListener implements 
MessageArrivingListener {
         @Override
         public void arriving(String topic, int queueId, long logicOffset, long 
tagsCode, long msgStoreTime,
             byte[] filterBitMap, Map<String, String> properties) {
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
index 02ff35681d..4ce3985f6c 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
@@ -426,8 +426,8 @@ public class TimerMessageStoreTest {
         assertEquals(first.getCommitReadTimeMs(), 
second.getCommitReadTimeMs());
         second.start(true);
 
-        // Wait until all messages have wrote back to commitLog and 
consumeQueue.
-        await().atMost(5000, TimeUnit.MILLISECONDS).until(new 
Callable<Boolean>() {
+        // Wait until all messages have been written back to commitLog and 
consumeQueue.
+        await().atMost(30000, TimeUnit.MILLISECONDS).until(new 
Callable<Boolean>() {
             @Override
             public Boolean call() {
                 ConsumeQueue cq = (ConsumeQueue) 
messageStore.getConsumeQueue(topic, 0);

Reply via email to