Repository: incubator-rocketmq Updated Branches: refs/heads/develop 845830865 -> 254d43249
[ROCKETMQ-284] ExpressionMessageFilter will pass some message. Author: vsair <liuxue...@gmail.com> Closes #160 from vsair/ROCKETMQ-284. Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/254d4324 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/254d4324 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/254d4324 Branch: refs/heads/develop Commit: 254d432496c424717d45b8dc9e44ae3bd78ab466 Parents: 8458308 Author: vsair <liuxue...@gmail.com> Authored: Wed Aug 30 00:14:27 2017 +0800 Committer: vsair <vs...@apache.org> Committed: Wed Aug 30 00:14:27 2017 +0800 ---------------------------------------------------------------------- .../broker/filter/ExpressionMessageFilter.java | 2 +- .../filter/MessageStoreWithFilterTest.java | 85 ++++++++++---------- .../org/apache/rocketmq/store/ConsumeQueue.java | 2 +- .../apache/rocketmq/store/ConsumeQueueExt.java | 2 +- .../rocketmq/store/DefaultMessageStore.java | 40 ++++----- 5 files changed, 67 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/254d4324/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java index 2f94de2..64c28ec 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java @@ -70,7 +70,7 @@ public class ExpressionMessageFilter implements MessageFilter { // by tags code. if (ExpressionType.isTagType(subscriptionData.getExpressionType())) { - if (tagsCode == null || tagsCode < 0L) { + if (tagsCode == null) { return true; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/254d4324/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java index 7978942..e544d90 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java @@ -24,12 +24,14 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.store.CommitLogDispatcher; +import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.MessageArrivingListener; import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; @@ -77,24 +79,17 @@ public class MessageStoreWithFilterTest { try { StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); } catch (UnknownHostException e) { - e.printStackTrace(); } try { BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); } catch (UnknownHostException e) { - e.printStackTrace(); } } @Before - public void init() { + public void init() throws Exception { filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic); - try { - master = gen(filterManager); - } catch (Exception e) { - e.printStackTrace(); - assertThat(true).isFalse(); - } + master = gen(filterManager); } @After @@ -107,7 +102,7 @@ public class MessageStoreWithFilterTest { public MessageExtBrokerInner buildMessage() { MessageExtBrokerInner msg = new MessageExtBrokerInner(); msg.setTopic(topic); - msg.setTags("TAG1"); + msg.setTags(System.currentTimeMillis() + "TAG"); msg.setKeys("Hello"); msg.setBody(msgBody); msg.setKeys(String.valueOf(System.currentTimeMillis())); @@ -125,7 +120,7 @@ public class MessageStoreWithFilterTest { } public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize, - boolean enableCqExt, int cqExtFileSize) { + boolean enableCqExt, int cqExtFileSize) { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize); messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize); @@ -155,9 +150,7 @@ public class MessageStoreWithFilterTest { new MessageArrivingListener() { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, - long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { -// System.out.println(String.format("Msg coming: %s, %d, %d, %d", -// topic, queueId, logicOffset, tagsCode)); + long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { } } , brokerConfig); @@ -166,8 +159,6 @@ public class MessageStoreWithFilterTest { @Override public void dispatch(DispatchRequest request) { try { -// System.out.println(String.format("offset:%d, bitMap:%s", request.getCommitLogOffset(), -// BitsArray.create(request.getBitMap()).toString())); } catch (Throwable e) { e.printStackTrace(); } @@ -183,7 +174,7 @@ public class MessageStoreWithFilterTest { } protected List<MessageExtBrokerInner> putMsg(DefaultMessageStore master, int topicCount, - int msgCountPerTopic) throws Exception { + int msgCountPerTopic) throws Exception { List<MessageExtBrokerInner> msgs = new ArrayList<MessageExtBrokerInner>(); for (int i = 0; i < topicCount; i++) { String realTopic = topic + i; @@ -229,22 +220,10 @@ public class MessageStoreWithFilterTest { } @Test - public void testGetMessage_withFilterBitMapAndConsumerChanged() { - List<MessageExtBrokerInner> msgs = null; - try { - msgs = putMsg(master, topicCount, msgPerTopic); - } catch (Exception e) { - e.printStackTrace(); - assertThat(true).isFalse(); - } + public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception { + List<MessageExtBrokerInner> msgs = putMsg(master, topicCount, msgPerTopic); - // sleep to wait for consume queue has been constructed. - try { - Thread.sleep(200); - } catch (InterruptedException e) { - e.printStackTrace(); - assertThat(true).isFalse(); - } + Thread.sleep(200); // reset consumer; String topic = "topic" + 0; @@ -303,16 +282,10 @@ public class MessageStoreWithFilterTest { } @Test - public void testGetMessage_withFilterBitMap() { - List<MessageExtBrokerInner> msgs = null; - try { - msgs = putMsg(master, topicCount, msgPerTopic); - // sleep to wait for consume queue has been constructed. - Thread.sleep(200); - } catch (Exception e) { - e.printStackTrace(); - assertThat(true).isFalse(); - } + public void testGetMessage_withFilterBitMap() throws Exception { + List<MessageExtBrokerInner> msgs = putMsg(master, topicCount, msgPerTopic); + + Thread.sleep(100); for (int i = 0; i < topicCount; i++) { String realTopic = topic + i; @@ -369,4 +342,32 @@ public class MessageStoreWithFilterTest { } } } + + @Test + public void testGetMessage_withFilter_checkTagsCode() throws Exception { + putMsg(master, topicCount, msgPerTopic); + + Thread.sleep(200); + + for (int i = 0; i < topicCount; i++) { + String realTopic = topic + i; + + GetMessageResult getMessageResult = master.getMessage("test", realTopic, queueId, 0, 10000, + new MessageFilter() { + @Override + public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) { + if (tagsCode != null && tagsCode <= ConsumeQueueExt.MAX_ADDR) { + return false; + } + return true; + } + + @Override + public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) { + return true; + } + }); + assertThat(getMessageResult.getMessageCount()).isEqualTo(msgPerTopic); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/254d4324/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 379162d..0bf0aa9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -569,6 +569,6 @@ public class ConsumeQueue { * Check {@code tagsCode} is address of extend file or tags code. */ public boolean isExtAddr(long tagsCode) { - return isExtReadEnable() && this.consumeQueueExt.isExtAddr(tagsCode); + return ConsumeQueueExt.isExtAddr(tagsCode); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/254d4324/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java index a118cde..aeb2803 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java @@ -95,7 +95,7 @@ public class ConsumeQueueExt { * Just test {@code address} is less than 0. * </p> */ - public boolean isExtAddr(final long address) { + public static boolean isExtAddr(final long address) { return address <= MAX_ADDR; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/254d4324/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 95a017a..ffa8dbc 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -16,23 +16,6 @@ */ package org.apache.rocketmq.store; -import java.io.File; -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceThread; @@ -56,6 +39,24 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + import static org.apache.rocketmq.store.config.BrokerRole.SLAVE; public class DefaultMessageStore implements MessageStore { @@ -487,7 +488,7 @@ public class DefaultMessageStore implements MessageStore { break; } - boolean extRet = false; + boolean extRet = false, isTagsCodeLegal = true; if (consumeQueue.isExtAddr(tagsCode)) { extRet = consumeQueue.getExt(tagsCode, cqExtUnit); if (extRet) { @@ -496,11 +497,12 @@ public class DefaultMessageStore implements MessageStore { // can't find ext content.Client will filter messages by tag also. log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}", tagsCode, offsetPy, sizePy, topic, group); + isTagsCodeLegal = false; } } if (messageFilter != null - && !messageFilter.isMatchedByConsumeQueue(tagsCode, extRet ? cqExtUnit : null)) { + && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; }