This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1128a5a [RIP-10]Add test cases for DefaultMessageStore which methods
read from CommitLog/ConsumeQueue (#778)
1128a5a is described below
commit 1128a5ad3757906d81fe6f5de710106245091a75
Author: Frank Lin <[email protected]>
AuthorDate: Wed Feb 20 16:25:24 2019 +0800
[RIP-10]Add test cases for DefaultMessageStore which methods read from
CommitLog/ConsumeQueue (#778)
[RIP-10]Add test cases for DefaultMessageStore which methods read from
CommitLog/ConsumeQueue
---
.../rocketmq/store/DefaultMessageStoreTest.java | 283 ++++++++++++++++++++-
1 file changed, 276 insertions(+), 7 deletions(-)
diff --git
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 57b6999..ad4ca91 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -19,28 +19,36 @@ package org.apache.rocketmq.store;
import java.io.File;
import java.io.RandomAccessFile;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
-import org.junit.After;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+@RunWith(MockitoJUnitRunner.class)
public class DefaultMessageStoreTest {
private final String StoreMessage = "Once, there was a chance for me!";
private int QUEUE_TOTAL = 100;
@@ -62,7 +70,7 @@ public class DefaultMessageStoreTest {
}
@Test(expected = OverlappingFileLockException.class)
- public void test_repate_restart() throws Exception {
+ public void test_repeat_restart() throws Exception {
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
@@ -86,7 +94,7 @@ public class DefaultMessageStoreTest {
}
@After
- public void destory() {
+ public void destroy() {
messageStore.shutdown();
messageStore.destroy();
@@ -123,12 +131,269 @@ public class DefaultMessageStoreTest {
verifyThatMasterIsFunctional(totalMsgs, messageStore);
}
- private MessageExtBrokerInner buildMessage() {
+ @Test
+ public void should_look_message_successfully_when_offset_is_first() {
+ final int totalCount = 10;
+ int queueId = new Random().nextInt(10);
+ String topic = "FooBar";
+ int firstOffset = 0;
+ AppendMessageResult[] appendMessageResultArray =
putMessages(totalCount, topic, queueId);
+ AppendMessageResult firstResult = appendMessageResultArray[0];
+
+ MessageExt messageExt =
messageStore.lookMessageByOffset(firstResult.getWroteOffset());
+ MessageExt messageExt1 =
getDefaultMessageStore().lookMessageByOffset(firstResult.getWroteOffset(),
firstResult.getWroteBytes());
+
+ assertThat(new
String(messageExt.getBody())).isEqualTo(buildMessageBodyByOffset(StoreMessage,
firstOffset));
+ assertThat(new
String(messageExt1.getBody())).isEqualTo(buildMessageBodyByOffset(StoreMessage,
firstOffset));
+ }
+
+ @Test
+ public void should_look_message_successfully_when_offset_is_last() {
+ final int totalCount = 10;
+ int queueId = new Random().nextInt(10);
+ String topic = "FooBar";
+ AppendMessageResult[] appendMessageResultArray =
putMessages(totalCount, topic, queueId);
+ int lastIndex = totalCount - 1;
+ AppendMessageResult lastResult = appendMessageResultArray[lastIndex];
+
+ MessageExt messageExt =
getDefaultMessageStore().lookMessageByOffset(lastResult.getWroteOffset(),
lastResult.getWroteBytes());
+
+ assertThat(new
String(messageExt.getBody())).isEqualTo(buildMessageBodyByOffset(StoreMessage,
lastIndex));
+ }
+
+ @Test
+ public void
should_look_message_failed_and_return_null_when_offset_is_out_of_bound() {
+ final int totalCount = 10;
+ int queueId = new Random().nextInt(10);
+ String topic = "FooBar";
+ AppendMessageResult[] appendMessageResultArray =
putMessages(totalCount, topic, queueId);
+ long lastOffset = getMaxOffset(appendMessageResultArray);
+
+ MessageExt messageExt =
getDefaultMessageStore().lookMessageByOffset(lastOffset);
+
+ assertThat(messageExt).isNull();
+ }
+
+ @Test
+ public void
should_get_consume_queue_offset_successfully_when_incomming_by_timestamp()
throws InterruptedException {
+ final int totalCount = 10;
+ int queueId = 0;
+ String topic = "FooBar";
+ AppendMessageResult[] appendMessageResults = putMessages(totalCount,
topic, queueId, true);
+ Thread.sleep(10);
+
+ ConsumeQueue consumeQueue =
getDefaultMessageStore().findConsumeQueue(topic, queueId);
+ for (AppendMessageResult appendMessageResult : appendMessageResults) {
+ long offset = messageStore.getOffsetInQueueByTime(topic, queueId,
appendMessageResult.getStoreTimestamp());
+ SelectMappedBufferResult indexBuffer =
consumeQueue.getIndexBuffer(offset);
+
assertThat(indexBuffer.getByteBuffer().getLong()).isEqualTo(appendMessageResult.getWroteOffset());
+
assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResult.getWroteBytes());
+ indexBuffer.release();
+ }
+ }
+
+ @Test
+ public void
should_get_consume_queue_offset_successfully_when_timestamp_is_skewing() throws
InterruptedException {
+ final int totalCount = 10;
+ int queueId = 0;
+ String topic = "FooBar";
+ AppendMessageResult[] appendMessageResults = putMessages(totalCount,
topic, queueId, true);
+ Thread.sleep(10);
+ int skewing = 2;
+
+ ConsumeQueue consumeQueue =
getDefaultMessageStore().findConsumeQueue(topic, queueId);
+ for (AppendMessageResult appendMessageResult : appendMessageResults) {
+ long offset = messageStore.getOffsetInQueueByTime(topic, queueId,
appendMessageResult.getStoreTimestamp() + skewing);
+ long offset2 = messageStore.getOffsetInQueueByTime(topic, queueId,
appendMessageResult.getStoreTimestamp() - skewing);
+ SelectMappedBufferResult indexBuffer =
consumeQueue.getIndexBuffer(offset);
+ SelectMappedBufferResult indexBuffer2 =
consumeQueue.getIndexBuffer(offset2);
+
assertThat(indexBuffer.getByteBuffer().getLong()).isEqualTo(appendMessageResult.getWroteOffset());
+
assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResult.getWroteBytes());
+
assertThat(indexBuffer2.getByteBuffer().getLong()).isEqualTo(appendMessageResult.getWroteOffset());
+
assertThat(indexBuffer2.getByteBuffer().getInt()).isEqualTo(appendMessageResult.getWroteBytes());
+ indexBuffer.release();
+ indexBuffer2.release();
+ }
+ }
+
+ @Test
+ public void
should_get_min_of_max_consume_queue_offset_when_timestamp_s_skewing_is_large()
throws InterruptedException {
+ final int totalCount = 10;
+ int queueId = 0;
+ String topic = "FooBar";
+ AppendMessageResult[] appendMessageResults = putMessages(totalCount,
topic, queueId, true);
+ Thread.sleep(10);
+ int skewing = 20000;
+
+ ConsumeQueue consumeQueue =
getDefaultMessageStore().findConsumeQueue(topic, queueId);
+ for (AppendMessageResult appendMessageResult : appendMessageResults) {
+ long offset = messageStore.getOffsetInQueueByTime(topic, queueId,
appendMessageResult.getStoreTimestamp() + skewing);
+ long offset2 = messageStore.getOffsetInQueueByTime(topic, queueId,
appendMessageResult.getStoreTimestamp() - skewing);
+ SelectMappedBufferResult indexBuffer =
consumeQueue.getIndexBuffer(offset);
+ SelectMappedBufferResult indexBuffer2 =
consumeQueue.getIndexBuffer(offset2);
+
assertThat(indexBuffer.getByteBuffer().getLong()).isEqualTo(appendMessageResults[totalCount
- 1].getWroteOffset());
+
assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResults[totalCount
- 1].getWroteBytes());
+
assertThat(indexBuffer2.getByteBuffer().getLong()).isEqualTo(appendMessageResults[0].getWroteOffset());
+
assertThat(indexBuffer2.getByteBuffer().getInt()).isEqualTo(appendMessageResults[0].getWroteBytes());
+ }
+ }
+
+ @Test
+ public void should_return_zero_when_consume_queue_not_found() throws
InterruptedException {
+ final int totalCount = 10;
+ int queueId = 0;
+ int wrongQueueId = 1;
+ String topic = "FooBar";
+ AppendMessageResult[] appendMessageResults = putMessages(totalCount,
topic, queueId, false);
+ Thread.sleep(10);
+
+ long offset = messageStore.getOffsetInQueueByTime(topic, wrongQueueId,
appendMessageResults[0].getStoreTimestamp());
+
+ assertThat(offset).isEqualTo(0);
+ }
+
+ @Test
+ public void
should_return_negative_one_when_invoke_getMessageStoreTimeStamp_if_consume_queue_not_found()
throws InterruptedException {
+ final int totalCount = 10;
+ int queueId = 0;
+ int wrongQueueId = 1;
+ String topic = "FooBar";
+ putMessages(totalCount, topic, queueId, false);
+ Thread.sleep(10);
+
+ long messageStoreTimeStamp =
messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, 0);
+
+ assertThat(messageStoreTimeStamp).isEqualTo(-1);
+ }
+
+ @Test
+ public void
should_return_negative_one_when_invoke_getMessageStoreTimeStamp_if_consumeQueueOffset_not_exist()
throws InterruptedException {
+ final int totalCount = 10;
+ int queueId = 0;
+ int wrongQueueId = 1;
+ String topic = "FooBar";
+ putMessages(totalCount, topic, queueId, true);
+ Thread.sleep(10);
+
+ long messageStoreTimeStamp =
messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, -1);
+
+ assertThat(messageStoreTimeStamp).isEqualTo(-1);
+ }
+
+
+ @Test
+ public void
should_get_message_store_timestamp_successfully_when_incomming_by_topic_queueId_and_consumeQueueOffset()
throws InterruptedException {
+ final int totalCount = 10;
+ int queueId = 0;
+ String topic = "FooBar";
+ AppendMessageResult[] appendMessageResults = putMessages(totalCount,
topic, queueId, false);
+ Thread.sleep(10);
+
+ ConsumeQueue consumeQueue =
getDefaultMessageStore().findConsumeQueue(topic, queueId);
+ int minOffsetInQueue = (int)consumeQueue.getMinOffsetInQueue();
+ for (int i = minOffsetInQueue; i < consumeQueue.getMaxOffsetInQueue();
i++) {
+ long messageStoreTimeStamp =
messageStore.getMessageStoreTimeStamp(topic, queueId, i);
+
assertThat(messageStoreTimeStamp).isEqualTo(appendMessageResults[i].getStoreTimestamp());
+ }
+ }
+
+ @Test
+ public void
should_return_negative_one_when_invoke_getStoreTime_if_incomming_param_is_null()
{
+ long storeTime = getStoreTime(null);
+
+ assertThat(storeTime).isEqualTo(-1);
+ }
+
+ @Test
+ public void
should_get_store_time_successfully_when_invoke_getStoreTime_if_everything_is_ok()
throws InterruptedException {
+ final int totalCount = 10;
+ int queueId = 0;
+ String topic = "FooBar";
+ AppendMessageResult[] appendMessageResults = putMessages(totalCount,
topic, queueId, false);
+ Thread.sleep(10);
+ ConsumeQueue consumeQueue = messageStore.getConsumeQueue(topic,
queueId);
+
+ for (int i = 0; i < totalCount; i++) {
+ SelectMappedBufferResult indexBuffer =
consumeQueue.getIndexBuffer(i);
+ long storeTime = getStoreTime(indexBuffer);
+
assertThat(storeTime).isEqualTo(appendMessageResults[i].getStoreTimestamp());
+ indexBuffer.release();
+ }
+ }
+
+ @Test
+ public void
should_return_negative_one_when_invoke_getStoreTime_if_phyOffset_is_less_than_commitLog_s_minOffset()
{
+ long phyOffset = -10;
+ int size = 138;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(100);
+ byteBuffer.putLong(phyOffset);
+ byteBuffer.putInt(size);
+ byteBuffer.flip();
+ MappedFile mappedFile = mock(MappedFile.class);
+ SelectMappedBufferResult result = new SelectMappedBufferResult(0,
byteBuffer, size, mappedFile);
+
+ long storeTime = getStoreTime(result);
+ result.release();
+
+ assertThat(storeTime).isEqualTo(-1);
+ }
+
+ private DefaultMessageStore getDefaultMessageStore() {
+ return (DefaultMessageStore)this.messageStore;
+ }
+
+ private AppendMessageResult[] putMessages(int totalCount, String topic,
int queueId) {
+ return putMessages(totalCount, topic, queueId, false);
+ }
+
+ private AppendMessageResult[] putMessages(int totalCount, String topic,
int queueId, boolean interval) {
+ AppendMessageResult[] appendMessageResultArray = new
AppendMessageResult[totalCount];
+ for (int i = 0; i < totalCount; i++) {
+ String messageBody = buildMessageBodyByOffset(StoreMessage, i);
+ MessageExtBrokerInner msgInner =
buildMessage(messageBody.getBytes(), topic);
+ msgInner.setQueueId(queueId);
+ PutMessageResult result = messageStore.putMessage(msgInner);
+ appendMessageResultArray[i] = result.getAppendMessageResult();
+
assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK);
+ if (interval) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Thread sleep ERROR");
+ }
+ }
+ }
+ return appendMessageResultArray;
+ }
+
+ private long getMaxOffset(AppendMessageResult[] appendMessageResultArray) {
+ if (appendMessageResultArray == null) {
+ return 0;
+ }
+ AppendMessageResult last =
appendMessageResultArray[appendMessageResultArray.length - 1];
+ return last.getWroteOffset() + last.getWroteBytes();
+ }
+
+ private String buildMessageBodyByOffset(String message, long i) {
+ return String.format("%s offset %d", message, i);
+ }
+
+ private long getStoreTime(SelectMappedBufferResult result) {
+ try {
+ Method getStoreTime =
getDefaultMessageStore().getClass().getDeclaredMethod("getStoreTime",
SelectMappedBufferResult.class);
+ getStoreTime.setAccessible(true);
+ return (long)getStoreTime.invoke(getDefaultMessageStore(), result);
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private MessageExtBrokerInner buildMessage(byte[] messageBody, String
topic) {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
- msg.setTopic("FooBar");
+ msg.setTopic(topic);
msg.setTags("TAG1");
msg.setKeys("Hello");
- msg.setBody(MessageBody);
+ msg.setBody(messageBody);
msg.setKeys(String.valueOf(System.currentTimeMillis()));
msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
msg.setSysFlag(0);
@@ -138,6 +403,10 @@ public class DefaultMessageStoreTest {
return msg;
}
+ private MessageExtBrokerInner buildMessage() {
+ return buildMessage(MessageBody, "FooBar");
+ }
+
private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore
master) {
for (long i = 0; i < totalMsgs; i++) {
master.putMessage(buildMessage());