http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java index 33529da..3d33eaf 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java +++ b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.store; +import java.util.Map; + public class DispatchRequest { private final String topic; private final int queueId; @@ -30,6 +32,8 @@ public class DispatchRequest { private final int sysFlag; private final long preparedTransactionOffset; + private final Map<String, String> propertiesMap; + private byte[] bitMap; public DispatchRequest( final String topic, @@ -42,7 +46,8 @@ public class DispatchRequest { final String keys, final String uniqKey, final int sysFlag, - final long preparedTransactionOffset + final long preparedTransactionOffset, + final Map<String, String> propertiesMap ) { this.topic = topic; this.queueId = queueId; @@ -57,6 +62,7 @@ public class DispatchRequest { this.sysFlag = sysFlag; this.preparedTransactionOffset = preparedTransactionOffset; this.success = true; + this.propertiesMap = propertiesMap; } public DispatchRequest(int size) { @@ -81,6 +87,7 @@ public class DispatchRequest { this.sysFlag = 0; this.preparedTransactionOffset = 0; this.success = false; + this.propertiesMap = null; } public DispatchRequest(int size, boolean success) { @@ -105,6 +112,7 @@ public class DispatchRequest { this.sysFlag = 0; this.preparedTransactionOffset = 0; this.success = success; + this.propertiesMap = null; } public String getTopic() { @@ -155,4 +163,15 @@ public class DispatchRequest { return uniqKey; } + public Map<String, String> getPropertiesMap() { + return propertiesMap; + } + + 
/** Returns the stored bit map array itself, not a copy. */ public byte[] getBitMap() { + return bitMap; + } + + /** Stores a reference to the given array; no defensive copy is made. */ public void setBitMap(byte[] bitMap) { + this.bitMap = bitMap; + } }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java index 550e578..a9a00a8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java @@ -245,6 +245,31 @@ public class MappedFile extends ReferenceResource { } /** + * Content of data from offset to offset + length will be wrote to file. + * + * @param data + * @param offset The offset of the subarray to be used. + * @param length The length of the subarray to be used. + * @return + */ + public boolean appendMessage(final byte[] data, final int offset, final int length) { + int currentPos = this.wrotePosition.get(); + + if ((currentPos + length) <= this.fileSize) { + try { + this.fileChannel.position(currentPos); + this.fileChannel.write(ByteBuffer.wrap(data, offset, length)); + } catch (Throwable e) { + log.error("Error occurred when append message to mappedFile.", e); + } + this.wrotePosition.addAndGet(length); + return true; + } + + return false; + } + + /** * @param flushLeastPages * @return The current flushed position */ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index 5c6c62c..a8fa364 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -121,7 +121,7 @@ public class MappedFileQueue { this.deleteExpiredFile(willRemoveFiles); } - 
private void deleteExpiredFile(List<MappedFile> files) { + void deleteExpiredFile(List<MappedFile> files) { if (!files.isEmpty()) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java index 2523c1a..dee1bc7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java @@ -17,6 +17,9 @@ package org.apache.rocketmq.store; +import java.util.Map; + public interface MessageArrivingListener { - void arriving(String topic, int queueId, long logicOffset, long tagsCode); + void arriving(String topic, int queueId, long logicOffset, long tagsCode, + long msgStoreTime, byte[] filterBitMap, Map<String, String> properties); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java index 859ce99..6b34758 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java @@ -16,8 +16,30 @@ */ package org.apache.rocketmq.store; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import java.nio.ByteBuffer; +import java.util.Map; public interface MessageFilter { - boolean isMessageMatched(final SubscriptionData subscriptionData, final Long tagsCode); + /** + * match by tags code or filter bit map which is calculated when message received + * and stored in 
consume queue ext. + * + * @param tagsCode tagsCode + * @param cqExtUnit extend unit of consume queue + * @return + */ + boolean isMatchedByConsumeQueue(final Long tagsCode, + final ConsumeQueueExt.CqExtUnit cqExtUnit); + + /** + * Match by the message content that is stored in the commit log. + * <br>{@code msgBuffer} and {@code properties} are never both null. If invoked in store, + * {@code properties} is null; if invoked in {@code PullRequestHoldService}, {@code msgBuffer} is null. + * + * @param msgBuffer message buffer in commit log, may be null if not invoked in store. + * @param properties message properties; if null, the implementation should decode them from the buffer itself. + * @return + */ + boolean isMatchedByCommitLog(final ByteBuffer msgBuffer, + final Map<String, String> properties); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 65c546b..e841c08 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -17,10 +17,10 @@ package org.apache.rocketmq.store; import java.util.HashMap; +import java.util.LinkedList; import java.util.Set; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; public interface MessageStore { @@ -37,7 +37,7 @@ public interface MessageStore { PutMessageResult putMessages(final MessageExtBatch messageExtBatch); GetMessageResult getMessage(final String group, final String topic, final int queueId, - final long offset, final int maxMsgNums, final 
MessageFilter messageFilter); long getMaxOffsetInQuque(final String topic, final int queueId); @@ -105,4 +105,8 @@ public interface MessageStore { long lockTimeMills(); boolean isTransientStorePoolDeficient(); + + LinkedList<CommitLogDispatcher> getDispatcherList(); + + ConsumeQueue getConsumeQueue(String topic, int queueId); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 7ae2ab5..29f800c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -34,6 +34,13 @@ public class MessageStoreConfig { private int mapedFileSizeCommitLog = 1024 * 1024 * 1024; // ConsumeQueue file size,default is 30W private int mapedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE; + // enable consume queue ext + private boolean enableConsumeQueueExt = false; + // ConsumeQueue extend file size, 48M + private int mappedFileSizeConsumeQueueExt = 48 * 1024 * 1024; + // Bit count of filter bit map. + // this will be set by pipe of calculate filter bit map. 
+ private int bitMapLengthConsumeQueueExt = 64; // CommitLog flush interval // flush data to disk @@ -191,6 +198,30 @@ public class MessageStoreConfig { this.mapedFileSizeConsumeQueue = mapedFileSizeConsumeQueue; } + public boolean isEnableConsumeQueueExt() { + return enableConsumeQueueExt; + } + + public void setEnableConsumeQueueExt(boolean enableConsumeQueueExt) { + this.enableConsumeQueueExt = enableConsumeQueueExt; + } + + public int getMappedFileSizeConsumeQueueExt() { + return mappedFileSizeConsumeQueueExt; + } + + public void setMappedFileSizeConsumeQueueExt(int mappedFileSizeConsumeQueueExt) { + this.mappedFileSizeConsumeQueueExt = mappedFileSizeConsumeQueueExt; + } + + public int getBitMapLengthConsumeQueueExt() { + return bitMapLengthConsumeQueueExt; + } + + public void setBitMapLengthConsumeQueueExt(int bitMapLengthConsumeQueueExt) { + this.bitMapLengthConsumeQueueExt = bitMapLengthConsumeQueueExt; + } + public int getFlushIntervalCommitLog() { return flushIntervalCommitLog; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java index aebebaf..ef1d670 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java @@ -24,6 +24,10 @@ public class StorePathConfigHelper { return rootDir + File.separator + "consumequeue"; } + public static String getStorePathConsumeQueueExt(final String rootDir) { + return rootDir + File.separator + "consumequeue_ext"; + } + public static String getStorePathIndex(final String rootDir) { return rootDir + File.separator + "index"; } 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index e08a6f5..d45b994 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.running.RunningStats; import org.apache.rocketmq.store.ConsumeQueue; +import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; @@ -248,11 +249,24 @@ public class ScheduleMessageService extends ConfigManager { try { long nextOffset = offset; int i = 0; + ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferCQ.getByteBuffer().getLong(); int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); + if (cq.isExtAddr(tagsCode)) { + if (cq.getExt(tagsCode, cqExtUnit)) { + tagsCode = cqExtUnit.getTagsCode(); + } else { + //can't find ext content.So re compute tags code. 
+ log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}", + tagsCode, offsetPy, sizePy); + long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy); + tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime); + } + } + long now = System.currentTimeMillis(); long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java new file mode 100644 index 0000000..5dbc584 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.store; + +import org.junit.Test; + +import java.io.File; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ConsumeQueueExtTest { + + private static final String topic = "abc"; + private static final int queueId = 0; + private static final String storePath = "." + File.separator + "unit_test_store"; + private static final int bitMapLength = 64; + private static final int unitSizeWithBitMap = ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE + bitMapLength / Byte.SIZE; + private static final int cqExtFileSize = 10 * unitSizeWithBitMap; + private static final int unitCount = 20; + + + protected ConsumeQueueExt genExt() { + return new ConsumeQueueExt( + topic, queueId, storePath, cqExtFileSize, bitMapLength + ); + } + + protected byte[] genBitMap(int bitMapLength) { + byte[] bytes = new byte[bitMapLength / Byte.SIZE]; + + Random random = new Random(System.currentTimeMillis()); + random.nextBytes(bytes); + + return bytes; + } + + protected ConsumeQueueExt.CqExtUnit genUnit(boolean hasBitMap) { + ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); + + cqExtUnit.setTagsCode(Math.abs((new Random(System.currentTimeMillis())).nextInt())); + cqExtUnit.setMsgStoreTime(System.currentTimeMillis()); + if (hasBitMap) { + cqExtUnit.setFilterBitMap(genBitMap(bitMapLength)); + } + + return cqExtUnit; + } + + protected void deleteDirectory(String rootPath) { + File file = new File(rootPath); + deleteFile(file); + } + + protected void deleteFile(File file) { + File[] subFiles = file.listFiles(); + if (subFiles != null) { + for (File sub : subFiles) { + deleteFile(sub); + } + } + + file.delete(); + } + + protected void putSth(ConsumeQueueExt consumeQueueExt, boolean getAfterPut, + boolean unitSameSize, int unitCount) { + for (int i = 0; i < unitCount; i++) { + ConsumeQueueExt.CqExtUnit putUnit = + unitSameSize ? 
genUnit(true) : genUnit(i % 2 == 0); + + long addr = consumeQueueExt.put(putUnit); + assertThat(addr).isLessThan(0); + + if (getAfterPut) { + ConsumeQueueExt.CqExtUnit getUnit = consumeQueueExt.get(addr); + + assertThat(getUnit).isNotNull(); + assertThat(putUnit).isEqualTo(getUnit); + } + + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + assertThat(false).isTrue(); + } + } + } + + @Test + public void testPut() { + ConsumeQueueExt consumeQueueExt = genExt(); + + try { + putSth(consumeQueueExt, true, false, unitCount); + } finally { + consumeQueueExt.destroy(); + deleteDirectory(storePath); + } + } + + @Test + public void testGet() { + ConsumeQueueExt consumeQueueExt = genExt(); + + putSth(consumeQueueExt, false, false, unitCount); + + try { + // from start. + long addr = consumeQueueExt.decorate(0); + + ConsumeQueueExt.CqExtUnit unit = new ConsumeQueueExt.CqExtUnit(); + while (true) { + boolean ret = consumeQueueExt.get(addr, unit); + + if (!ret) { + break; + } + + assertThat(unit.getSize()).isGreaterThanOrEqualTo(ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE); + + addr += unit.getSize(); + } + } finally { + consumeQueueExt.destroy(); + deleteDirectory(storePath); + } + } + + @Test + public void testGet_invalidAddress() { + ConsumeQueueExt consumeQueueExt = genExt(); + + putSth(consumeQueueExt, false, true, unitCount); + + try { + ConsumeQueueExt.CqExtUnit unit = consumeQueueExt.get(0); + + assertThat(unit).isNull(); + + long addr = (cqExtFileSize / unitSizeWithBitMap) * unitSizeWithBitMap; + addr += unitSizeWithBitMap; + + unit = consumeQueueExt.get(addr); + assertThat(unit).isNull(); + } finally { + consumeQueueExt.destroy(); + deleteDirectory(storePath); + } + } + + @Test + public void testRecovery() { + ConsumeQueueExt putCqExt = genExt(); + + putSth(putCqExt, false, true, unitCount); + + ConsumeQueueExt loadCqExt = genExt(); + + loadCqExt.load(); + + loadCqExt.recover(); + + try { + 
assertThat(loadCqExt.getMinAddress()).isEqualTo(Long.MIN_VALUE); + + // same unit size. + int countPerFile = (cqExtFileSize - ConsumeQueueExt.END_BLANK_DATA_LENGTH) / unitSizeWithBitMap; + + int lastFileUnitCount = unitCount % countPerFile; + + int fileCount = unitCount / countPerFile + 1; + if (lastFileUnitCount == 0) { + fileCount -= 1; + } + + if (lastFileUnitCount == 0) { + assertThat(loadCqExt.unDecorate(loadCqExt.getMaxAddress()) % cqExtFileSize).isEqualTo(0); + } else { + assertThat(loadCqExt.unDecorate(loadCqExt.getMaxAddress())) + .isEqualTo(lastFileUnitCount * unitSizeWithBitMap + (fileCount - 1) * cqExtFileSize); + } + } finally { + putCqExt.destroy(); + loadCqExt.destroy(); + deleteDirectory(storePath); + } + } + + @Test + public void testTruncateByMinOffset() { + ConsumeQueueExt consumeQueueExt = genExt(); + + putSth(consumeQueueExt, false, true, unitCount * 2); + + try { + // truncate first one file. + long address = consumeQueueExt.decorate((long) (cqExtFileSize * 1.5)); + + long expectMinAddress = consumeQueueExt.decorate(cqExtFileSize); + + consumeQueueExt.truncateByMinAddress(address); + + long minAddress = consumeQueueExt.getMinAddress(); + + assertThat(expectMinAddress).isEqualTo(minAddress); + } finally { + consumeQueueExt.destroy(); + deleteDirectory(storePath); + } + } + + @Test + public void testTruncateByMaxOffset() { + ConsumeQueueExt consumeQueueExt = genExt(); + + putSth(consumeQueueExt, false, true, unitCount * 2); + + try { + // truncate, only first 3 files exist. 
+ long address = consumeQueueExt.decorate(cqExtFileSize * 2 + unitSizeWithBitMap); + + long expectMaxAddress = address + unitSizeWithBitMap; + + consumeQueueExt.truncateByMaxAddress(address); + + long maxAddress = consumeQueueExt.getMaxAddress(); + + assertThat(expectMaxAddress).isEqualTo(maxAddress); + } finally { + consumeQueueExt.destroy(); + deleteDirectory(storePath); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java new file mode 100644 index 0000000..9c42fb9 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.store; + +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.junit.Test; + +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ConsumeQueueTest { + + private static final String msg = "Once, there was a chance for me!"; + private static final byte[] msgBody = msg.getBytes(); + + private static final String topic = "abc"; + private static final int queueId = 0; + private static final String storePath = "." + File.separator + "unit_test_store"; + private static final int commitLogFileSize = 1024 * 8; + private static final int cqFileSize = 10 * 20; + private static final int cqExtFileSize = 10 * (ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE + 64); + + private static SocketAddress BornHost; + + private static SocketAddress StoreHost; + + static { + try { + StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + try { + BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + } + + public MessageExtBrokerInner buildMessage() { + MessageExtBrokerInner msg = new MessageExtBrokerInner(); + msg.setTopic(topic); + msg.setTags("TAG1"); + msg.setKeys("Hello"); + msg.setBody(msgBody); + msg.setKeys(String.valueOf(System.currentTimeMillis())); + msg.setQueueId(queueId); + msg.setSysFlag(0); + msg.setBornTimestamp(System.currentTimeMillis()); + msg.setStoreHost(StoreHost); + msg.setBornHost(BornHost); + for (int i = 0; i < 1; i++) { + 
msg.putUserProperty(String.valueOf(i), "imagoodperson" + i); + } + msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); + + return msg; + } + + + public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize, + boolean enableCqExt, int cqExtFileSize) { + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize); + messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize); + messageStoreConfig.setMappedFileSizeConsumeQueueExt(cqExtFileSize); + messageStoreConfig.setMessageIndexEnable(false); + messageStoreConfig.setEnableConsumeQueueExt(enableCqExt); + + messageStoreConfig.setStorePathRootDir(storePath); + messageStoreConfig.setStorePathCommitLog(storePath + File.separator + "commitlog"); + + return messageStoreConfig; + } + + protected DefaultMessageStore gen() throws Exception { + MessageStoreConfig messageStoreConfig = buildStoreConfig( + commitLogFileSize, cqFileSize, true, cqExtFileSize + ); + + BrokerConfig brokerConfig = new BrokerConfig(); + + DefaultMessageStore master = new DefaultMessageStore( + messageStoreConfig, + new BrokerStatsManager(brokerConfig.getBrokerClusterName()), + new MessageArrivingListener() { + @Override + public void arriving(String topic, int queueId, long logicOffset, long tagsCode, + long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { + } + } + , brokerConfig); + + assertThat(master.load()).isTrue(); + + master.start(); + + return master; + } + + protected void putMsg(DefaultMessageStore master) throws Exception { + long totalMsgs = 200; + + for (long i = 0; i < totalMsgs; i++) { + master.putMessage(buildMessage()); + } + } + + protected void deleteDirectory(String rootPath) { + File file = new File(rootPath); + deleteFile(file); + } + + protected void deleteFile(File file) { + File[] subFiles = file.listFiles(); + if (subFiles != null) { + for (File sub : subFiles) { + 
deleteFile(sub); + } + } + + file.delete(); + } + + @Test + public void testConsumeQueueWithExtendData() { + DefaultMessageStore master = null; + try { + master = gen(); + } catch (Exception e) { + e.printStackTrace(); + assertThat(Boolean.FALSE).isTrue(); + } + + master.getDispatcherList().addFirst(new CommitLogDispatcher() { + + @Override + public void dispatch(DispatchRequest request) { + runCount++; + } + + private int runCount = 0; + }); + + try { + try { + putMsg(master); + // wait build consume queue + Thread.sleep(1000); + } catch (Exception e) { + e.printStackTrace(); + assertThat(Boolean.FALSE).isTrue(); + } + + ConsumeQueue cq = master.getConsumeQueueTable().get(topic).get(queueId); + + assertThat(cq).isNotNull(); + + long index = 0; + + while (index < cq.getMaxOffsetInQueue()) { + SelectMappedBufferResult bufferResult = cq.getIndexBuffer(index); + + assertThat(bufferResult).isNotNull(); + + ByteBuffer buffer = bufferResult.getByteBuffer(); + + assertThat(buffer).isNotNull(); + try { + ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); + for (int i = 0; i < bufferResult.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { + long phyOffset = buffer.getLong(); + int size = buffer.getInt(); + long tagsCode = buffer.getLong(); + + assertThat(phyOffset).isGreaterThanOrEqualTo(0); + assertThat(size).isGreaterThan(0); + assertThat(tagsCode).isLessThan(0); + + boolean ret = cq.getExt(tagsCode, cqExtUnit); + + assertThat(ret).isTrue(); + assertThat(cqExtUnit).isNotNull(); + assertThat(cqExtUnit.getSize()).isGreaterThan((short) 0); + assertThat(cqExtUnit.getMsgStoreTime()).isGreaterThan(0); + assertThat(cqExtUnit.getTagsCode()).isGreaterThan(0); + } + + } finally { + bufferResult.release(); + } + + index += cqFileSize / ConsumeQueue.CQ_STORE_UNIT_SIZE; + } + } finally { + master.shutdown(); + master.destroy(); + deleteDirectory(storePath); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 5c9c46f..75f1de9 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.store; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.store.config.FlushDiskType; @@ -124,7 +125,8 @@ public class DefaultMessageStoreTest { private class MyMessageArrivingListener implements MessageArrivingListener { @Override - public void arriving(String topic, int queueId, long logicOffset, long tagsCode) { + public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, + byte[] filterBitMap, Map<String, String> properties) { } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 19bff89..409ea33 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -41,6 +41,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import 
org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; @@ -469,4 +470,12 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { UnsupportedEncodingException { return this.defaultMQAdminExtImpl.getNameServerConfig(nameServers); } + + @Override + public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index, int count, String consumerGroup) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { + return this.defaultMQAdminExtImpl.queryConsumeQueue( + brokerAddr, topic, queueId, index, count, consumerGroup + ); + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index a31b69d..157ae21 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -63,6 +63,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import 
org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; @@ -955,4 +956,11 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { return this.mqClientInstance.getMQClientAPIImpl().getNameServerConfig(nameServers, timeoutMillis); } + @Override + public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index, int count, String consumerGroup) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { + return this.mqClientInstance.getMQClientAPIImpl().queryConsumeQueue( + brokerAddr, topic, queueId, index, count, consumerGroup, timeoutMillis + ); + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 493cf54..82add92 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -39,6 +39,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import 
org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
@@ -241,4 +242,25 @@ public interface MQAdminExt extends MQAdmin {
     Map<String, Properties> getNameServerConfig(final List<String> nameServers) throws InterruptedException,
         RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException,
         UnsupportedEncodingException;
+
+    /**
+     * Query consume queue data.
+     *
+     * @param brokerAddr address of the broker to query
+     * @param topic topic name
+     * @param queueId id of the queue
+     * @param index queue index to start from
+     * @param count how many entries to query
+     * @param consumerGroup consumer group
+     * @return response body of the consume queue query
+     * @throws InterruptedException if the calling thread is interrupted
+     * @throws RemotingTimeoutException on a remoting timeout
+     * @throws RemotingSendRequestException on a remoting send-request failure
+     * @throws RemotingConnectException on a remoting connect failure
+     * @throws MQClientException on a client-side failure
+     */
+    QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr,
+        final String topic, final int queueId,
+        final long index, final int count, final String consumerGroup)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException;
 }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 9bd37e8..6398291 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -59,6 +59,7 @@ import org.apache.rocketmq.tools.command.namesrv.UpdateNamesrvConfigCommand;
 import org.apache.rocketmq.tools.command.namesrv.WipeWritePermSubCommand;
 import org.apache.rocketmq.tools.command.offset.CloneGroupOffsetCommand;
 import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
+import
org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand; import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand; import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand; import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand; @@ -189,6 +190,8 @@ public class MQAdminStartup { initCommand(new GetNamesrvConfigCommand()); initCommand(new UpdateNamesrvConfigCommand()); initCommand(new GetBrokerConfigCommand()); + + initCommand(new QueryConsumeQueueCommand()); } private static void initLogback() throws JoranException { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/QueryConsumeQueueCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/QueryConsumeQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/QueryConsumeQueueCommand.java new file mode 100644 index 0000000..611addd --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/QueryConsumeQueueCommand.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.rocketmq.tools.command.queue;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.common.protocol.body.ConsumeQueueData;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+
+/**
+ * Sub-command {@code queryCq}: queries consume queue entries of one topic queue from a broker
+ * and prints the subscription data, filter data, min/max queue index and each entry to
+ * standard output.
+ */
+public class QueryConsumeQueueCommand implements SubCommand {
+
+    /**
+     * Manual entry point that runs the command once with hard-coded sample arguments.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        QueryConsumeQueueCommand cmd = new QueryConsumeQueueCommand();
+
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[]{"-t TopicTest", "-q 0", "-i 6447", "-b 100.81.165.119:10911"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
+                new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+
+    @Override
+    public String commandName() {
+        return "queryCq";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Query cq command.";
+    }
+
+    /**
+     * Registers the command's options: {@code -t}, {@code -q} and {@code -i} are required,
+     * {@code -c}, {@code -b} and {@code -g} are optional.
+     *
+     * @param options option set to extend
+     * @return the same {@code options} instance with this command's options added
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("t", "topic", true, "topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("q", "queue", true, "queue num, ie. 1");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("i", "index", true, "start queue index.");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("c", "count", true, "how many.");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("b", "broker", true, "broker addr.");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("g", "consumer", true, "consumer group.");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Runs the query and prints the result.
+     *
+     * <p>When no broker address is supplied with {@code -b} (or it is blank), the address is
+     * taken from the topic route: the first broker data entry, broker id {@code 0}. Any
+     * exception is printed via {@code printStackTrace()}; the admin client is always shut down.
+     *
+     * @param commandLine parsed command line holding the option values
+     * @param options not used by this command
+     * @param rpcHook hook handed to the {@link DefaultMQAdminExt} constructor
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+
+            String topic = commandLine.getOptionValue("t").trim();
+            // parseInt/parseLong yield primitives directly instead of boxing and unboxing.
+            int queueId = Integer.parseInt(commandLine.getOptionValue("q").trim());
+            long index = Long.parseLong(commandLine.getOptionValue("i").trim());
+            int count = Integer.parseInt(commandLine.getOptionValue("c", "10").trim());
+            String broker = null;
+            if (commandLine.hasOption("b")) {
+                broker = commandLine.getOptionValue("b").trim();
+            }
+            String consumerGroup = null;
+            if (commandLine.hasOption("g")) {
+                consumerGroup = commandLine.getOptionValue("g").trim();
+            }
+
+            // Compare by content: a trimmed runtime string is not the same object as the ""
+            // literal, so a reference comparison with == would never detect a blank address.
+            if (broker == null || broker.isEmpty()) {
+                TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+
+                if (topicRouteData == null || topicRouteData.getBrokerDatas() == null
+                    || topicRouteData.getBrokerDatas().isEmpty()) {
+                    throw new Exception("No topic route data!");
+                }
+
+                broker = topicRouteData.getBrokerDatas().get(0).getBrokerAddrs().get(0L);
+            }
+
+            QueryConsumeQueueResponseBody queryConsumeQueueResponseBody = defaultMQAdminExt.queryConsumeQueue(
+                broker, topic, queueId, index, count, consumerGroup
+            );
+
+            if (queryConsumeQueueResponseBody.getSubscriptionData() != null) {
+                System.out.printf("Subscription data: \n%s\n", JSON.toJSONString(queryConsumeQueueResponseBody.getSubscriptionData(), true));
+                System.out.print("======================================\n");
+            }
+
+            if (queryConsumeQueueResponseBody.getFilterData() != null) {
+                System.out.printf("Filter data: \n%s\n", queryConsumeQueueResponseBody.getFilterData());
+                System.out.print("======================================\n");
+            }
+
+            System.out.printf("Queue data: \nmax: %d, min: %d\n", queryConsumeQueueResponseBody.getMaxQueueIndex(),
+                queryConsumeQueueResponseBody.getMinQueueIndex());
+            System.out.print("======================================\n");
+
+            if (queryConsumeQueueResponseBody.getQueueData() != null) {
+
+                // Entries are labelled with consecutive indexes starting at the requested one.
+                long i = index;
+                for (ConsumeQueueData queueData : queryConsumeQueueResponseBody.getQueueData()) {
+                    StringBuilder stringBuilder = new StringBuilder();
+                    stringBuilder.append("idx: ").append(i).append("\n");
+
+                    stringBuilder.append(queueData.toString()).append("\n");
+
+                    stringBuilder.append("======================================\n");
+
+                    System.out.print(stringBuilder.toString());
+                    i++;
+                }
+
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}