This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1128a5a  [RIP-10]Add test cases for DefaultMessageStore which methods 
read from CommitLog/ConsumeQueue (#778)
1128a5a is described below

commit 1128a5ad3757906d81fe6f5de710106245091a75
Author: Frank Lin <[email protected]>
AuthorDate: Wed Feb 20 16:25:24 2019 +0800

    [RIP-10]Add test cases for DefaultMessageStore which methods read from 
CommitLog/ConsumeQueue (#778)
    
    [RIP-10]Add test cases for DefaultMessageStore which methods read from 
CommitLog/ConsumeQueue
---
 .../rocketmq/store/DefaultMessageStoreTest.java    | 283 ++++++++++++++++++++-
 1 file changed, 276 insertions(+), 7 deletions(-)

diff --git 
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java 
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 57b6999..ad4ca91 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -19,28 +19,36 @@ package org.apache.rocketmq.store;
 
 import java.io.File;
 import java.io.RandomAccessFile;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.OverlappingFileLockException;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
-import org.junit.After;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
+@RunWith(MockitoJUnitRunner.class)
 public class DefaultMessageStoreTest {
     private final String StoreMessage = "Once, there was a chance for me!";
     private int QUEUE_TOTAL = 100;
@@ -62,7 +70,7 @@ public class DefaultMessageStoreTest {
     }
 
     @Test(expected = OverlappingFileLockException.class)
-    public void test_repate_restart() throws Exception {
+    public void test_repeat_restart() throws Exception {
         QUEUE_TOTAL = 1;
         MessageBody = StoreMessage.getBytes();
 
@@ -86,7 +94,7 @@ public class DefaultMessageStoreTest {
     }
 
     @After
-    public void destory() {
+    public void destroy() {
         messageStore.shutdown();
         messageStore.destroy();
 
@@ -123,12 +131,269 @@ public class DefaultMessageStoreTest {
         verifyThatMasterIsFunctional(totalMsgs, messageStore);
     }
 
-    private MessageExtBrokerInner buildMessage() {
+    @Test
+    public void should_look_message_successfully_when_offset_is_first() {
+        final int totalCount = 10;
+        int queueId = new Random().nextInt(10);
+        String topic = "FooBar";
+        int firstOffset = 0;
+        AppendMessageResult[] appendMessageResultArray = 
putMessages(totalCount, topic, queueId);
+        AppendMessageResult firstResult = appendMessageResultArray[0];
+
+        MessageExt messageExt = 
messageStore.lookMessageByOffset(firstResult.getWroteOffset());
+        MessageExt messageExt1 = 
getDefaultMessageStore().lookMessageByOffset(firstResult.getWroteOffset(), 
firstResult.getWroteBytes());
+
+        assertThat(new 
String(messageExt.getBody())).isEqualTo(buildMessageBodyByOffset(StoreMessage, 
firstOffset));
+        assertThat(new 
String(messageExt1.getBody())).isEqualTo(buildMessageBodyByOffset(StoreMessage, 
firstOffset));
+    }
+
+    @Test
+    public void should_look_message_successfully_when_offset_is_last() {
+        final int totalCount = 10;
+        int queueId = new Random().nextInt(10);
+        String topic = "FooBar";
+        AppendMessageResult[] appendMessageResultArray = 
putMessages(totalCount, topic, queueId);
+        int lastIndex = totalCount - 1;
+        AppendMessageResult lastResult = appendMessageResultArray[lastIndex];
+
+        MessageExt messageExt = 
getDefaultMessageStore().lookMessageByOffset(lastResult.getWroteOffset(), 
lastResult.getWroteBytes());
+
+        assertThat(new 
String(messageExt.getBody())).isEqualTo(buildMessageBodyByOffset(StoreMessage, 
lastIndex));
+    }
+
+    @Test
+    public void 
should_look_message_failed_and_return_null_when_offset_is_out_of_bound() {
+        final int totalCount = 10;
+        int queueId = new Random().nextInt(10);
+        String topic = "FooBar";
+        AppendMessageResult[] appendMessageResultArray = 
putMessages(totalCount, topic, queueId);
+        long lastOffset = getMaxOffset(appendMessageResultArray);
+
+        MessageExt messageExt = 
getDefaultMessageStore().lookMessageByOffset(lastOffset);
+
+        assertThat(messageExt).isNull();
+    }
+
+    @Test
+    public void 
should_get_consume_queue_offset_successfully_when_incomming_by_timestamp() 
throws InterruptedException {
+        final int totalCount = 10;
+        int queueId = 0;
+        String topic = "FooBar";
+        AppendMessageResult[] appendMessageResults = putMessages(totalCount, 
topic, queueId, true);
+        Thread.sleep(10);
+
+        ConsumeQueue consumeQueue = 
getDefaultMessageStore().findConsumeQueue(topic, queueId);
+        for (AppendMessageResult appendMessageResult : appendMessageResults) {
+            long offset = messageStore.getOffsetInQueueByTime(topic, queueId, 
appendMessageResult.getStoreTimestamp());
+            SelectMappedBufferResult indexBuffer = 
consumeQueue.getIndexBuffer(offset);
+            
assertThat(indexBuffer.getByteBuffer().getLong()).isEqualTo(appendMessageResult.getWroteOffset());
+            
assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResult.getWroteBytes());
+            indexBuffer.release();
+        }
+    }
+
+    @Test
+    public void 
should_get_consume_queue_offset_successfully_when_timestamp_is_skewing() throws 
InterruptedException {
+        final int totalCount = 10;
+        int queueId = 0;
+        String topic = "FooBar";
+        AppendMessageResult[] appendMessageResults = putMessages(totalCount, 
topic, queueId, true);
+        Thread.sleep(10);
+        int skewing = 2;
+
+        ConsumeQueue consumeQueue = 
getDefaultMessageStore().findConsumeQueue(topic, queueId);
+        for (AppendMessageResult appendMessageResult : appendMessageResults) {
+            long offset = messageStore.getOffsetInQueueByTime(topic, queueId, 
appendMessageResult.getStoreTimestamp() + skewing);
+            long offset2 = messageStore.getOffsetInQueueByTime(topic, queueId, 
appendMessageResult.getStoreTimestamp() - skewing);
+            SelectMappedBufferResult indexBuffer = 
consumeQueue.getIndexBuffer(offset);
+            SelectMappedBufferResult indexBuffer2 = 
consumeQueue.getIndexBuffer(offset2);
+            
assertThat(indexBuffer.getByteBuffer().getLong()).isEqualTo(appendMessageResult.getWroteOffset());
+            
assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResult.getWroteBytes());
+            
assertThat(indexBuffer2.getByteBuffer().getLong()).isEqualTo(appendMessageResult.getWroteOffset());
+            
assertThat(indexBuffer2.getByteBuffer().getInt()).isEqualTo(appendMessageResult.getWroteBytes());
+            indexBuffer.release();
+            indexBuffer2.release();
+        }
+    }
+
+    @Test
+    public void 
should_get_min_of_max_consume_queue_offset_when_timestamp_s_skewing_is_large() 
throws InterruptedException {
+        final int totalCount = 10;
+        int queueId = 0;
+        String topic = "FooBar";
+        AppendMessageResult[] appendMessageResults = putMessages(totalCount, 
topic, queueId, true);
+        Thread.sleep(10);
+        int skewing = 20000;
+
+        ConsumeQueue consumeQueue = 
getDefaultMessageStore().findConsumeQueue(topic, queueId);
+        for (AppendMessageResult appendMessageResult : appendMessageResults) {
+            long offset = messageStore.getOffsetInQueueByTime(topic, queueId, 
appendMessageResult.getStoreTimestamp() + skewing);
+            long offset2 = messageStore.getOffsetInQueueByTime(topic, queueId, 
appendMessageResult.getStoreTimestamp() - skewing);
+            SelectMappedBufferResult indexBuffer = 
consumeQueue.getIndexBuffer(offset);
+            SelectMappedBufferResult indexBuffer2 = 
consumeQueue.getIndexBuffer(offset2);
+            
assertThat(indexBuffer.getByteBuffer().getLong()).isEqualTo(appendMessageResults[totalCount
 - 1].getWroteOffset());
+            
assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResults[totalCount
 - 1].getWroteBytes());
+            
assertThat(indexBuffer2.getByteBuffer().getLong()).isEqualTo(appendMessageResults[0].getWroteOffset());
+            
assertThat(indexBuffer2.getByteBuffer().getInt()).isEqualTo(appendMessageResults[0].getWroteBytes());
+        }
+    }
+
+    @Test
+    public void should_return_zero_when_consume_queue_not_found() throws 
InterruptedException {
+        final int totalCount = 10;
+        int queueId = 0;
+        int wrongQueueId = 1;
+        String topic = "FooBar";
+        AppendMessageResult[] appendMessageResults = putMessages(totalCount, 
topic, queueId, false);
+        Thread.sleep(10);
+
+        long offset = messageStore.getOffsetInQueueByTime(topic, wrongQueueId, 
appendMessageResults[0].getStoreTimestamp());
+
+        assertThat(offset).isEqualTo(0);
+    }
+
+    @Test
+    public void 
should_return_negative_one_when_invoke_getMessageStoreTimeStamp_if_consume_queue_not_found()
 throws InterruptedException {
+        final int totalCount = 10;
+        int queueId = 0;
+        int wrongQueueId = 1;
+        String topic = "FooBar";
+        putMessages(totalCount, topic, queueId, false);
+        Thread.sleep(10);
+
+        long messageStoreTimeStamp = 
messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, 0);
+
+        assertThat(messageStoreTimeStamp).isEqualTo(-1);
+    }
+
+    @Test
+    public void 
should_return_negative_one_when_invoke_getMessageStoreTimeStamp_if_consumeQueueOffset_not_exist()
 throws InterruptedException {
+        final int totalCount = 10;
+        int queueId = 0;
+        int wrongQueueId = 1;
+        String topic = "FooBar";
+        putMessages(totalCount, topic, queueId, true);
+        Thread.sleep(10);
+
+        long messageStoreTimeStamp = 
messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, -1);
+
+        assertThat(messageStoreTimeStamp).isEqualTo(-1);
+    }
+
+
+    @Test
+    public void 
should_get_message_store_timestamp_successfully_when_incomming_by_topic_queueId_and_consumeQueueOffset()
 throws InterruptedException {
+        final int totalCount = 10;
+        int queueId = 0;
+        String topic = "FooBar";
+        AppendMessageResult[] appendMessageResults = putMessages(totalCount, 
topic, queueId, false);
+        Thread.sleep(10);
+
+        ConsumeQueue consumeQueue = 
getDefaultMessageStore().findConsumeQueue(topic, queueId);
+        int minOffsetInQueue = (int)consumeQueue.getMinOffsetInQueue();
+        for (int i = minOffsetInQueue; i < consumeQueue.getMaxOffsetInQueue(); 
i++) {
+            long messageStoreTimeStamp = 
messageStore.getMessageStoreTimeStamp(topic, queueId, i);
+            
assertThat(messageStoreTimeStamp).isEqualTo(appendMessageResults[i].getStoreTimestamp());
+        }
+    }
+
+    @Test
+    public void 
should_return_negative_one_when_invoke_getStoreTime_if_incomming_param_is_null()
 {
+        long storeTime = getStoreTime(null);
+
+        assertThat(storeTime).isEqualTo(-1);
+    }
+
+    @Test
+    public void 
should_get_store_time_successfully_when_invoke_getStoreTime_if_everything_is_ok()
 throws InterruptedException {
+        final int totalCount = 10;
+        int queueId = 0;
+        String topic = "FooBar";
+        AppendMessageResult[] appendMessageResults = putMessages(totalCount, 
topic, queueId, false);
+        Thread.sleep(10);
+        ConsumeQueue consumeQueue = messageStore.getConsumeQueue(topic, 
queueId);
+
+        for (int i = 0; i < totalCount; i++) {
+            SelectMappedBufferResult indexBuffer = 
consumeQueue.getIndexBuffer(i);
+            long storeTime = getStoreTime(indexBuffer);
+            
assertThat(storeTime).isEqualTo(appendMessageResults[i].getStoreTimestamp());
+            indexBuffer.release();
+        }
+    }
+
+    @Test
+    public void 
should_return_negative_one_when_invoke_getStoreTime_if_phyOffset_is_less_than_commitLog_s_minOffset()
 {
+        long phyOffset = -10;
+        int size = 138;
+        ByteBuffer byteBuffer = ByteBuffer.allocate(100);
+        byteBuffer.putLong(phyOffset);
+        byteBuffer.putInt(size);
+        byteBuffer.flip();
+        MappedFile mappedFile = mock(MappedFile.class);
+        SelectMappedBufferResult result = new SelectMappedBufferResult(0, 
byteBuffer, size, mappedFile);
+
+        long storeTime = getStoreTime(result);
+        result.release();
+
+        assertThat(storeTime).isEqualTo(-1);
+    }
+
+    private DefaultMessageStore getDefaultMessageStore() {
+        return (DefaultMessageStore)this.messageStore;
+    }
+
+    private AppendMessageResult[] putMessages(int totalCount, String topic, 
int queueId) {
+        return putMessages(totalCount, topic, queueId, false);
+    }
+
+    private AppendMessageResult[] putMessages(int totalCount, String topic, 
int queueId, boolean interval) {
+        AppendMessageResult[] appendMessageResultArray = new 
AppendMessageResult[totalCount];
+        for (int i = 0; i < totalCount; i++) {
+            String messageBody = buildMessageBodyByOffset(StoreMessage, i);
+            MessageExtBrokerInner msgInner = 
buildMessage(messageBody.getBytes(), topic);
+            msgInner.setQueueId(queueId);
+            PutMessageResult result = messageStore.putMessage(msgInner);
+            appendMessageResultArray[i] = result.getAppendMessageResult();
+            
assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK);
+            if (interval) {
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    throw  new RuntimeException("Thread sleep ERROR");
+                }
+            }
+        }
+        return appendMessageResultArray;
+    }
+
+    private long getMaxOffset(AppendMessageResult[] appendMessageResultArray) {
+        if (appendMessageResultArray == null) {
+            return 0;
+        }
+        AppendMessageResult last = 
appendMessageResultArray[appendMessageResultArray.length - 1];
+        return last.getWroteOffset() + last.getWroteBytes();
+    }
+
+    private String buildMessageBodyByOffset(String message, long i) {
+        return String.format("%s offset %d", message, i);
+    }
+
+    private long getStoreTime(SelectMappedBufferResult result) {
+        try {
+            Method getStoreTime = 
getDefaultMessageStore().getClass().getDeclaredMethod("getStoreTime", 
SelectMappedBufferResult.class);
+            getStoreTime.setAccessible(true);
+            return (long)getStoreTime.invoke(getDefaultMessageStore(), result);
+        } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private MessageExtBrokerInner buildMessage(byte[] messageBody, String 
topic) {
         MessageExtBrokerInner msg = new MessageExtBrokerInner();
-        msg.setTopic("FooBar");
+        msg.setTopic(topic);
         msg.setTags("TAG1");
         msg.setKeys("Hello");
-        msg.setBody(MessageBody);
+        msg.setBody(messageBody);
         msg.setKeys(String.valueOf(System.currentTimeMillis()));
         msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
         msg.setSysFlag(0);
@@ -138,6 +403,10 @@ public class DefaultMessageStoreTest {
         return msg;
     }
 
+    private MessageExtBrokerInner buildMessage() {
+        return buildMessage(MessageBody, "FooBar");
+    }
+
     private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore 
master) {
         for (long i = 0; i < totalMsgs; i++) {
             master.putMessage(buildMessage());

Reply via email to