[ROCKETMQ-121]Support message filtering based on SQL92 closes apache/incubator-rocketmq#82
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/58f1574b Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/58f1574b Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/58f1574b Branch: refs/heads/release-4.1.0-incubating Commit: 58f1574b28bf8bf18a795036545c7a700437ed0b Parents: 42f78c2 Author: vsair <liuxue...@gmail.com> Authored: Fri Apr 21 18:17:58 2017 +0800 Committer: dongeforever <zhendongli...@yeah.net> Committed: Fri Apr 21 18:17:58 2017 +0800 ---------------------------------------------------------------------- broker/pom.xml | 4 + .../rocketmq/broker/BrokerController.java | 27 + .../rocketmq/broker/BrokerPathConfigHelper.java | 3 + .../broker/client/ConsumerGroupEvent.java | 33 + .../client/ConsumerIdsChangeListener.java | 6 +- .../rocketmq/broker/client/ConsumerManager.java | 11 +- .../DefaultConsumerIdsChangeListener.java | 37 +- .../filter/CommitLogDispatcherCalcBitMap.java | 110 ++ .../broker/filter/ConsumerFilterData.java | 151 ++ .../broker/filter/ConsumerFilterManager.java | 471 ++++++ .../filter/ExpressionForRetryMessageFilter.java | 97 ++ .../broker/filter/ExpressionMessageFilter.java | 162 +++ .../broker/filter/MessageEvaluationContext.java | 58 + .../NotifyMessageArrivingListener.java | 8 +- .../broker/longpolling/PullRequest.java | 10 +- .../longpolling/PullRequestHoldService.java | 19 +- .../rocketmq/broker/out/BrokerOuterAPI.java | 2 +- .../plugin/AbstractPluginMessageStore.java | 18 +- .../broker/processor/AdminBrokerProcessor.java | 91 ++ .../broker/processor/ClientManageProcessor.java | 44 + .../broker/processor/PullMessageProcessor.java | 59 +- .../CommitLogDispatcherCalcBitMapTest.java | 192 +++ .../filter/ConsumerFilterManagerTest.java | 291 ++++ .../filter/MessageStoreWithFilterTest.java | 392 +++++ .../processor/PullMessageProcessorTest.java | 9 +- .../client/consumer/DefaultMQPushConsumer.java | 15 + .../client/consumer/MQPushConsumer.java | 21 + .../client/consumer/MessageSelector.java | 77 + .../rocketmq/client/impl/FindBrokerResult.java | 12 + .../rocketmq/client/impl/MQClientAPIImpl.java | 57 +- .../consumer/DefaultMQPushConsumerImpl.java | 40 +- .../client/impl/consumer/PullAPIWrapper.java | 40 + .../client/impl/factory/MQClientInstance.java | 60 +- .../apache/rocketmq/common/BrokerConfig.java | 67 + .../java/org/apache/rocketmq/common/MixAll.java | 14 +- .../rocketmq/common/constant/LoggerName.java | 1 + .../rocketmq/common/filter/ExpressionType.java | 67 + .../rocketmq/common/filter/FilterAPI.java | 18 + .../apache/rocketmq/common/message/Message.java | 6 + .../rocketmq/common/message/MessageDecoder.java | 39 + .../rocketmq/common/namesrv/TopAddressing.java | 2 +- .../rocketmq/common/protocol/RequestCode.java | 4 + .../rocketmq/common/protocol/ResponseCode.java | 4 + .../protocol/body/CheckClientRequestBody.java | 52 + .../common/protocol/body/ConsumeQueueData.java | 98 ++ .../body/QueryConsumeQueueResponseBody.java | 72 + .../header/PullMessageRequestHeader.java | 9 + .../header/QueryConsumeQueueRequestHeader.java | 75 + .../protocol/heartbeat/SubscriptionData.java | 17 +- .../rocketmq/common/filter/FilterAPITest.java | 49 + .../common/message/MessageDecoderTest.java | 80 ++ distribution/conf/logback_broker.xml | 28 + distribution/release.xml | 1 + .../rocketmq/example/benchmark/Consumer.java | 31 +- .../rocketmq/example/benchmark/Producer.java | 34 +- .../rocketmq/example/filter/SqlConsumer.java | 62 + .../rocketmq/example/filter/SqlProducer.java | 67 + filter/pom.xml | 43 + .../apache/rocketmq/filter/FilterFactory.java | 72 + .../org/apache/rocketmq/filter/FilterSpi.java | 43 + .../org/apache/rocketmq/filter/SqlFilter.java | 43 + .../rocketmq/filter/constant/UnaryType.java | 26 + .../filter/expression/BinaryExpression.java | 91 ++ .../filter/expression/BooleanExpression.java | 39 + .../filter/expression/ComparisonExpression.java | 413 ++++++ .../filter/expression/ConstantExpression.java | 156 ++ .../expression/EmptyEvaluationContext.java | 35 + .../filter/expression/EvaluationContext.java | 43 + .../rocketmq/filter/expression/Expression.java | 38 + .../filter/expression/LogicExpression.java | 94 ++ .../filter/expression/MQFilterException.java | 46 + .../filter/expression/NowExpression.java | 36 + .../filter/expression/PropertyExpression.java | 70 + .../filter/expression/UnaryExpression.java | 267 ++++ .../filter/expression/UnaryInExpression.java | 61 + .../rocketmq/filter/parser/ParseException.java | 204 +++ .../rocketmq/filter/parser/SelectorParser.java | 1354 ++++++++++++++++++ .../rocketmq/filter/parser/SelectorParser.jj | 524 +++++++ .../filter/parser/SelectorParserConstants.java | 140 ++ .../parser/SelectorParserTokenManager.java | 919 ++++++++++++ .../filter/parser/SimpleCharStream.java | 502 +++++++ .../apache/rocketmq/filter/parser/Token.java | 152 ++ .../rocketmq/filter/parser/TokenMgrError.java | 174 +++ .../apache/rocketmq/filter/util/BitsArray.java | 260 ++++ .../rocketmq/filter/util/BloomFilter.java | 338 +++++ .../rocketmq/filter/util/BloomFilterData.java | 83 ++ .../apache/rocketmq/filter/BitsArrayTest.java | 123 ++ .../apache/rocketmq/filter/BloomFilterTest.java | 172 +++ .../apache/rocketmq/filter/ExpressionTest.java | 594 ++++++++ .../apache/rocketmq/filter/FilterSpiTest.java | 84 ++ .../org/apache/rocketmq/filter/ParserTest.java | 129 ++ pom.xml | 11 + srvutil/pom.xml | 4 + .../org/apache/rocketmq/store/CommitLog.java | 8 +- .../rocketmq/store/CommitLogDispatcher.java | 26 + .../org/apache/rocketmq/store/ConsumeQueue.java | 122 +- .../apache/rocketmq/store/ConsumeQueueExt.java | 638 +++++++++ .../rocketmq/store/DefaultMessageFilter.java | 29 +- .../rocketmq/store/DefaultMessageStore.java | 132 +- .../apache/rocketmq/store/DispatchRequest.java | 21 +- .../org/apache/rocketmq/store/MappedFile.java | 25 + .../apache/rocketmq/store/MappedFileQueue.java | 2 +- .../rocketmq/store/MessageArrivingListener.java | 5 +- .../apache/rocketmq/store/MessageFilter.java | 26 +- .../org/apache/rocketmq/store/MessageStore.java | 8 +- .../store/config/MessageStoreConfig.java | 31 + .../store/config/StorePathConfigHelper.java | 4 + .../store/schedule/ScheduleMessageService.java | 14 + .../rocketmq/store/ConsumeQueueExtTest.java | 251 ++++ .../apache/rocketmq/store/ConsumeQueueTest.java | 226 +++ .../rocketmq/store/DefaultMessageStoreTest.java | 4 +- .../rocketmq/tools/admin/DefaultMQAdminExt.java | 9 + .../tools/admin/DefaultMQAdminExtImpl.java | 8 + .../apache/rocketmq/tools/admin/MQAdminExt.java | 22 + .../rocketmq/tools/command/MQAdminStartup.java | 3 + .../command/queue/QueryConsumeQueueCommand.java | 159 ++ 116 files changed, 12552 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/pom.xml ---------------------------------------------------------------------- diff --git a/broker/pom.xml b/broker/pom.xml index 8cdafea..0f8ad0a 100644 --- a/broker/pom.xml +++ b/broker/pom.xml @@ -49,6 +49,10 @@ <artifactId>rocketmq-srvutil</artifactId> </dependency> <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-filter</artifactId> + </dependency> + <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 6acd40c..bacd25c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -37,6 +37,8 @@ import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ProducerManager; import org.apache.rocketmq.broker.client.net.Broker2Client; import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager; +import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap; +import org.apache.rocketmq.broker.filter.ConsumerFilterManager; import org.apache.rocketmq.broker.filtersrv.FilterServerManager; import org.apache.rocketmq.broker.latency.BrokerFastFailure; import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor; @@ -96,6 +98,7 @@ public class BrokerController { private final MessageStoreConfig messageStoreConfig; private final ConsumerOffsetManager consumerOffsetManager; private final ConsumerManager consumerManager; + private final ConsumerFilterManager consumerFilterManager; private final ProducerManager producerManager; private final ClientHousekeepingService clientHousekeepingService; private final PullMessageProcessor pullMessageProcessor; @@ -149,6 +152,7 @@ public class BrokerController { this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService); this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this); this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener); + this.consumerFilterManager = new ConsumerFilterManager(this); this.producerManager = new ProducerManager(); this.clientHousekeepingService = new ClientHousekeepingService(this); this.broker2Client = new Broker2Client(this); @@ -192,6 +196,7 @@ public class BrokerController { result = result && this.consumerOffsetManager.load(); result = result && this.subscriptionGroupManager.load(); + result = result && this.consumerFilterManager.load(); if (result) { try { @@ -202,6 +207,7 @@ public class BrokerController { //load plugin MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); this.messageStore = MessageStoreFactory.build(context, this.messageStore); + this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager)); } catch (IOException e) { result = false; e.printStackTrace(); @@ -278,6 +284,17 @@ public class BrokerController { @Override public void run() { try { + BrokerController.this.consumerFilterManager.persist(); + } catch (Throwable e) { + log.error("schedule persist consumer filter error.", e); + } + } + }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { BrokerController.this.protectBroker(); } catch (Exception e) { log.error("protectBroker error.", e); @@ -400,9 +417,11 @@ public class BrokerController { ClientManageProcessor clientProcessor = new ClientManageProcessor(this); this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor); this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); + this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); + this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); /** * ConsumerManageProcessor @@ -504,6 +523,10 @@ public class BrokerController { return consumerManager; } + public ConsumerFilterManager getConsumerFilterManager() { + return consumerFilterManager; + } + public ConsumerOffsetManager getConsumerOffsetManager() { return consumerOffsetManager; } @@ -590,6 +613,10 @@ public class BrokerController { if (this.brokerFastFailure != null) { this.brokerFastFailure.shutdown(); } + + if (this.consumerFilterManager != null) { + this.consumerFilterManager.persist(); + } } private void unregisterBrokerAll() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java index 24876df..0a323ee 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java @@ -44,4 +44,7 @@ public class BrokerPathConfigHelper { return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json"; } + public static String getConsumerFilterPath(final String rootDir) { + return rootDir + File.separator + "config" + File.separator + "consumerFilter.json"; + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupEvent.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupEvent.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupEvent.java new file mode 100644 index 0000000..717fb70 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupEvent.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.client; + +public enum ConsumerGroupEvent { + + /** + * Some consumers in the group are changed. + */ + CHANGE, + /** + * The group of consumer is unregistered. + */ + UNREGISTER, + /** + * The group of consumer is registered. + */ + REGISTER +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java index 07d28dc..831e293 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java @@ -16,9 +16,7 @@ */ package org.apache.rocketmq.broker.client; -import io.netty.channel.Channel; -import java.util.List; - public interface ConsumerIdsChangeListener { - void consumerIdsChanged(final String group, final List<Channel> channels); + + void handle(ConsumerGroupEvent event, String group, Object... args); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index a2d88d5..a5ddec8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -85,10 +85,11 @@ public class ConsumerManager { if (remove != null) { log.info("unregister consumer ok, no any connection, and remove consumer group, {}", next.getKey()); + this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey()); } } - this.consumerIdsChangeListener.consumerIdsChanged(next.getKey(), info.getAllChannel()); + this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel()); } } } @@ -111,10 +112,12 @@ public class ConsumerManager { if (r1 || r2) { if (isNotifyConsumerIdsChangedEnable) { - this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel()); + this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } + this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList); + return r1 || r2; } @@ -126,10 +129,12 @@ public class ConsumerManager { ConsumerGroupInfo remove = this.consumerTable.remove(group); if (remove != null) { log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group); + + this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group); } } if (isNotifyConsumerIdsChangedEnable) { - this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel()); + this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java index a1b2d8a..d716a33 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java @@ -17,8 +17,12 @@ package org.apache.rocketmq.broker.client; import io.netty.channel.Channel; + +import java.util.Collection; import java.util.List; + import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener { private final BrokerController brokerController; @@ -28,11 +32,34 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen } @Override - public void consumerIdsChanged(String group, List<Channel> channels) { - if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) { - for (Channel chl : channels) { - this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group); - } + public void handle(ConsumerGroupEvent event, String group, Object... args) { + if (event == null) { + return; + } + switch (event) { + case CHANGE: + if (args == null || args.length < 1) { + return; + } + List<Channel> channels = (List<Channel>) args[0]; + if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) { + for (Channel chl : channels) { + this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group); + } + } + break; + case UNREGISTER: + this.brokerController.getConsumerFilterManager().unRegister(group); + break; + case REGISTER: + if (args == null || args.length < 1) { + return; + } + Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0]; + this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList); + break; + default: + throw new RuntimeException("Unknown event " + event); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMap.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMap.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMap.java new file mode 100644 index 0000000..85415d6 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMap.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.filter; + +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.filter.util.BitsArray; +import org.apache.rocketmq.store.CommitLogDispatcher; +import org.apache.rocketmq.store.DispatchRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Iterator; + +/** + * Calculate bit map of filter. + */ +public class CommitLogDispatcherCalcBitMap implements CommitLogDispatcher { + + private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME); + + protected final BrokerConfig brokerConfig; + protected final ConsumerFilterManager consumerFilterManager; + + public CommitLogDispatcherCalcBitMap(BrokerConfig brokerConfig, ConsumerFilterManager consumerFilterManager) { + this.brokerConfig = brokerConfig; + this.consumerFilterManager = consumerFilterManager; + } + + @Override + public void dispatch(DispatchRequest request) { + if (!this.brokerConfig.isEnableCalcFilterBitMap()) { + return; + } + + try { + + Collection<ConsumerFilterData> filterDatas = consumerFilterManager.get(request.getTopic()); + + if (filterDatas == null || filterDatas.isEmpty()) { + return; + } + + Iterator<ConsumerFilterData> iterator = filterDatas.iterator(); + BitsArray filterBitMap = BitsArray.create( + this.consumerFilterManager.getBloomFilter().getM() + ); + + long startTime = System.currentTimeMillis(); + while (iterator.hasNext()) { + ConsumerFilterData filterData = iterator.next(); + + if (filterData.getCompiledExpression() == null) { + log.error("[BUG] Consumer in filter manager has no compiled expression! {}", filterData); + continue; + } + + if (filterData.getBloomFilterData() == null) { + log.error("[BUG] Consumer in filter manager has no bloom data! {}", filterData); + continue; + } + + Object ret = null; + try { + MessageEvaluationContext context = new MessageEvaluationContext(request.getPropertiesMap()); + + ret = filterData.getCompiledExpression().evaluate(context); + } catch (Throwable e) { + log.error("Calc filter bit map error!commitLogOffset={}, consumer={}, {}", request.getCommitLogOffset(), filterData, e); + } + + log.debug("Result of Calc bit map:ret={}, data={}, props={}, offset={}", ret, filterData, request.getPropertiesMap(), request.getCommitLogOffset()); + + // eval true + if (ret != null && ret instanceof Boolean && (Boolean) ret) { + consumerFilterManager.getBloomFilter().hashTo( + filterData.getBloomFilterData(), + filterBitMap + ); + } + } + + request.setBitMap(filterBitMap.bytes()); + + long eclipseTime = System.currentTimeMillis() - startTime; + // 1ms + if (eclipseTime >= 1) { + log.warn("Spend {} ms to calc bit map, consumerNum={}, topic={}", eclipseTime, filterDatas.size(), request.getTopic()); + } + } catch (Throwable e) { + log.error("Calc bit map error! topic={}, offset={}, queueId={}, {}", request.getTopic(), request.getCommitLogOffset(), request.getQueueId(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java new file mode 100644 index 0000000..4db02e2 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.filter; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ReflectionToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.rocketmq.filter.expression.Expression; +import org.apache.rocketmq.filter.util.BloomFilterData; + +import java.util.Collections; + +/** + * Filter data of consumer. + */ +public class ConsumerFilterData { + + private String consumerGroup; + private String topic; + private String expression; + private String expressionType; + private transient Expression compiledExpression; + private long bornTime; + private long deadTime = 0; + private BloomFilterData bloomFilterData; + private long clientVersion; + + public boolean isDead() { + return this.deadTime >= this.bornTime; + } + + public long howLongAfterDeath() { + if (isDead()) { + return System.currentTimeMillis() - getDeadTime(); + } + return -1; + } + + /** + * Check this filter data has been used to calculate bit map when msg was stored in server. + * + * @param msgStoreTime + * @return + */ + public boolean isMsgInLive(long msgStoreTime) { + return msgStoreTime > getBornTime(); + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(final String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public String getTopic() { + return topic; + } + + public void setTopic(final String topic) { + this.topic = topic; + } + + public String getExpression() { + return expression; + } + + public void setExpression(final String expression) { + this.expression = expression; + } + + public String getExpressionType() { + return expressionType; + } + + public void setExpressionType(final String expressionType) { + this.expressionType = expressionType; + } + + public Expression getCompiledExpression() { + return compiledExpression; + } + + public void setCompiledExpression(final Expression compiledExpression) { + this.compiledExpression = compiledExpression; + } + + public long getBornTime() { + return bornTime; + } + + public void setBornTime(final long bornTime) { + this.bornTime = bornTime; + } + + public long getDeadTime() { + return deadTime; + } + + public void setDeadTime(final long deadTime) { + this.deadTime = deadTime; + } + + public BloomFilterData getBloomFilterData() { + return bloomFilterData; + } + + public void setBloomFilterData(final BloomFilterData bloomFilterData) { + this.bloomFilterData = bloomFilterData; + } + + public long getClientVersion() { + return clientVersion; + } + + public void setClientVersion(long clientVersion) { + this.clientVersion = clientVersion; + } + + @Override + public boolean equals(Object o) { + return EqualsBuilder.reflectionEquals(this, o, Collections.<String>emptyList()); + } + + @Override + public int hashCode() { + return HashCodeBuilder.reflectionHashCode(this, Collections.<String>emptyList()); + } + + @Override + public String toString() { + return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java new file mode 100644 index 0000000..7f790af --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.filter; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.BrokerPathConfigHelper; +import org.apache.rocketmq.common.ConfigManager; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.filter.FilterFactory; +import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.filter.util.BloomFilter; +import org.apache.rocketmq.filter.util.BloomFilterData; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Consumer filter data manager.Just manage the consumers use expression filter. + */ +public class ConsumerFilterManager extends ConfigManager { + + private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME); + + private static final long MS_24_HOUR = 24 * 3600 * 1000; + + private ConcurrentHashMap<String/*Topic*/, FilterDataMapByTopic> + filterDataByTopic = new ConcurrentHashMap<String/*consumer group*/, FilterDataMapByTopic>(256); + + private transient BrokerController brokerController; + private transient BloomFilter bloomFilter; + + public ConsumerFilterManager() { + // just for test + this.bloomFilter = BloomFilter.createByFn(20, 64); + } + + public ConsumerFilterManager(BrokerController brokerController) { + this.brokerController = brokerController; + this.bloomFilter = BloomFilter.createByFn( + brokerController.getBrokerConfig().getMaxErrorRateOfBloomFilter(), + brokerController.getBrokerConfig().getExpectConsumerNumUseFilter() + ); + // then set bit map length of store config. + brokerController.getMessageStoreConfig().setBitMapLengthConsumeQueueExt( + this.bloomFilter.getM() + ); + } + + /** + * Build consumer filter data.Be care, bloom filter data is not included. + * + * @param topic + * @param consumerGroup + * @param expression + * @param type + * @param clientVersion + * @return maybe null + */ + public static ConsumerFilterData build(final String topic, final String consumerGroup, + final String expression, final String type, + final long clientVersion) { + if (ExpressionType.isTagType(type)) { + return null; + } + + ConsumerFilterData consumerFilterData = new ConsumerFilterData(); + consumerFilterData.setTopic(topic); + consumerFilterData.setConsumerGroup(consumerGroup); + consumerFilterData.setBornTime(System.currentTimeMillis()); + consumerFilterData.setDeadTime(0); + consumerFilterData.setExpression(expression); + consumerFilterData.setExpressionType(type); + consumerFilterData.setClientVersion(clientVersion); + try { + consumerFilterData.setCompiledExpression( + FilterFactory.INSTANCE.get(type).compile(expression) + ); + } catch (Throwable e) { + log.error("parse error: expr={}, topic={}, group={}, error={}", expression, topic, consumerGroup, e.getMessage()); + return null; + } + + return consumerFilterData; + } + + public void register(final String consumerGroup, final Collection<SubscriptionData> subList) { + for (SubscriptionData subscriptionData : subList) { + register( + subscriptionData.getTopic(), + consumerGroup, + subscriptionData.getSubString(), + subscriptionData.getExpressionType(), + subscriptionData.getSubVersion() + ); + } + + // make illegal topic dead. + Collection<ConsumerFilterData> groupFilterData = getByGroup(consumerGroup); + + Iterator<ConsumerFilterData> iterator = groupFilterData.iterator(); + while (iterator.hasNext()) { + ConsumerFilterData filterData = iterator.next(); + + boolean exist = false; + for (SubscriptionData subscriptionData : subList) { + if (subscriptionData.getTopic().equals(filterData.getTopic())) { + exist = true; + break; + } + } + + if (!exist && !filterData.isDead()) { + filterData.setDeadTime(System.currentTimeMillis()); + log.info("Consumer filter changed: {}, make illegal topic dead:{}", consumerGroup, filterData); + } + } + } + + public boolean register(final String topic, final String consumerGroup, final String expression, + final String type, final long clientVersion) { + if (ExpressionType.isTagType(type)) { + return false; + } + + if (expression == null || expression.length() == 0) { + return false; + } + + FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic); + + if (filterDataMapByTopic == null) { + FilterDataMapByTopic temp = new FilterDataMapByTopic(topic); + FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp); + filterDataMapByTopic = prev != null ? prev : temp; + } + + BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic); + + return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion); + } + + public void unRegister(final String consumerGroup) { + for (String topic : filterDataByTopic.keySet()) { + this.filterDataByTopic.get(topic).unRegister(consumerGroup); + } + } + + public ConsumerFilterData get(final String topic, final String consumerGroup) { + if (!this.filterDataByTopic.containsKey(topic)) { + return null; + } + if (this.filterDataByTopic.get(topic).getGroupFilterData().isEmpty()) { + return null; + } + + return this.filterDataByTopic.get(topic).getGroupFilterData().get(consumerGroup); + } + + public Collection<ConsumerFilterData> getByGroup(final String consumerGroup) { + Collection<ConsumerFilterData> ret = new HashSet<ConsumerFilterData>(); + + Iterator<FilterDataMapByTopic> topicIterator = this.filterDataByTopic.values().iterator(); + while (topicIterator.hasNext()) { + FilterDataMapByTopic filterDataMapByTopic = topicIterator.next(); + + Iterator<ConsumerFilterData> filterDataIterator = filterDataMapByTopic.getGroupFilterData().values().iterator(); + + while (filterDataIterator.hasNext()) { + ConsumerFilterData filterData = filterDataIterator.next(); + + if (filterData.getConsumerGroup().equals(consumerGroup)) { + ret.add(filterData); + } + } + } + + return ret; + } + + public final Collection<ConsumerFilterData> get(final String topic) { + if (!this.filterDataByTopic.containsKey(topic)) { + return null; + } + if (this.filterDataByTopic.get(topic).getGroupFilterData().isEmpty()) { + return null; + } + + return this.filterDataByTopic.get(topic).getGroupFilterData().values(); + } + + public BloomFilter getBloomFilter() { + return bloomFilter; + } + + @Override + public String encode() { + return encode(false); + } + + @Override + public String configFilePath() { + if (this.brokerController != null) { + return BrokerPathConfigHelper.getConsumerFilterPath( + this.brokerController.getMessageStoreConfig().getStorePathRootDir() + ); + } + return BrokerPathConfigHelper.getConsumerFilterPath("./unit_test"); + } + + @Override + public void decode(final String jsonString) { + ConsumerFilterManager load = RemotingSerializable.fromJson(jsonString, ConsumerFilterManager.class); + if (load != null && load.filterDataByTopic != null) { + boolean bloomChanged = false; + for (String topic : load.filterDataByTopic.keySet()) { + FilterDataMapByTopic dataMapByTopic = load.filterDataByTopic.get(topic); + if (dataMapByTopic == null) { + continue; + } + + for (String group : dataMapByTopic.getGroupFilterData().keySet()) { + + ConsumerFilterData filterData = dataMapByTopic.getGroupFilterData().get(group); + + if (filterData == null) { + continue; + } + + try { + filterData.setCompiledExpression( + FilterFactory.INSTANCE.get(filterData.getExpressionType()).compile(filterData.getExpression()) + ); + } catch (Exception e) { + log.error("load filter data error, " + filterData, e); + } + + // check whether bloom filter is changed + // if changed, ignore the bit map calculated before. + if (!this.bloomFilter.isValid(filterData.getBloomFilterData())) { + bloomChanged = true; + log.info("Bloom filter is changed!So ignore all filter data persisted! {}, {}", this.bloomFilter, filterData.getBloomFilterData()); + break; + } + + log.info("load exist consumer filter data: {}", filterData); + + if (filterData.getDeadTime() == 0) { + // we think all consumers are dead when load + long deadTime = System.currentTimeMillis() - 30 * 1000; + filterData.setDeadTime( + deadTime <= filterData.getBornTime() ? filterData.getBornTime() : deadTime + ); + } + } + } + + if (!bloomChanged) { + this.filterDataByTopic = load.filterDataByTopic; + } + } + } + + @Override + public String encode(final boolean prettyFormat) { + // clean + { + clean(); + } + return RemotingSerializable.toJson(this, prettyFormat); + } + + public void clean() { + Iterator<Map.Entry<String, FilterDataMapByTopic>> topicIterator = this.filterDataByTopic.entrySet().iterator(); + while (topicIterator.hasNext()) { + Map.Entry<String, FilterDataMapByTopic> filterDataMapByTopic = topicIterator.next(); + + Iterator<Map.Entry<String, ConsumerFilterData>> filterDataIterator + = filterDataMapByTopic.getValue().getGroupFilterData().entrySet().iterator(); + + while (filterDataIterator.hasNext()) { + Map.Entry<String, ConsumerFilterData> filterDataByGroup = filterDataIterator.next(); + + ConsumerFilterData filterData = filterDataByGroup.getValue(); + if (filterData.howLongAfterDeath() >= (this.brokerController == null ? MS_24_HOUR : this.brokerController.getBrokerConfig().getFilterDataCleanTimeSpan())) { + log.info("Remove filter consumer {}, died too long!", filterDataByGroup.getValue()); + filterDataIterator.remove(); + } + } + + if (filterDataMapByTopic.getValue().getGroupFilterData().isEmpty()) { + log.info("Topic has no consumer, remove it! {}", filterDataMapByTopic.getKey()); + topicIterator.remove(); + } + } + } + + public ConcurrentHashMap<String, FilterDataMapByTopic> getFilterDataByTopic() { + return filterDataByTopic; + } + + public void setFilterDataByTopic(final ConcurrentHashMap<String, FilterDataMapByTopic> filterDataByTopic) { + this.filterDataByTopic = filterDataByTopic; + } + + public static class FilterDataMapByTopic { + + private ConcurrentHashMap<String/*consumer group*/, ConsumerFilterData> + groupFilterData = new ConcurrentHashMap<String, ConsumerFilterData>(); + + private String topic; + + public FilterDataMapByTopic() { + } + + public FilterDataMapByTopic(String topic) { + this.topic = topic; + } + + public void unRegister(String consumerGroup) { + if (!this.groupFilterData.containsKey(consumerGroup)) { + return; + } + + ConsumerFilterData data = this.groupFilterData.get(consumerGroup); + + if (data == null || data.isDead()) { + return; + } + + long now = System.currentTimeMillis(); + + log.info("Unregister consumer filter: {}, deadTime: {}", data, now); + + data.setDeadTime(now); + } + + public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData, long clientVersion) { + ConsumerFilterData old = this.groupFilterData.get(consumerGroup); + + if (old == null) { + ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion); + if (consumerFilterData == null) { + return false; + } + consumerFilterData.setBloomFilterData(bloomFilterData); + + old = this.groupFilterData.putIfAbsent(consumerGroup, consumerFilterData); + if (old == null) { + log.info("New consumer filter registered: {}", consumerFilterData); + return true; + } else { + if (clientVersion <= old.getClientVersion()) { + if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) { + log.warn("Ignore consumer({} : {}) filter(concurrent), because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}", + consumerGroup, topic, + clientVersion, old.getClientVersion(), + old.getExpressionType(), old.getExpression(), + type, expression); + } + if (clientVersion == old.getClientVersion() && old.isDead()) { + reAlive(old); + return true; + } + + return false; + } else { + this.groupFilterData.put(consumerGroup, consumerFilterData); + log.info("New consumer filter registered(concurrent): {}, old: {}", consumerFilterData, old); + return true; + } + } + } else { + if (clientVersion <= old.getClientVersion()) { + if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) { + log.info("Ignore consumer({}:{}) filter, because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}", + consumerGroup, topic, + clientVersion, old.getClientVersion(), + old.getExpressionType(), old.getExpression(), + type, expression); + } + if (clientVersion == old.getClientVersion() && old.isDead()) { + reAlive(old); + return true; + } + + return false; + } + + boolean change = !old.getExpression().equals(expression) || !old.getExpressionType().equals(type); + if (old.getBloomFilterData() == null && bloomFilterData != null) { + change = true; + } + if (old.getBloomFilterData() != null && !old.getBloomFilterData().equals(bloomFilterData)) { + change = true; + } + + // if subscribe data is changed, or consumer is died too long. + if (change) { + ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion); + if (consumerFilterData == null) { + // new expression compile error, remove old, let client report error. + this.groupFilterData.remove(consumerGroup); + return false; + } + consumerFilterData.setBloomFilterData(bloomFilterData); + + this.groupFilterData.put(consumerGroup, consumerFilterData); + + log.info("Consumer filter info change, old: {}, new: {}, change: {}", + old, consumerFilterData, change); + + return true; + } else { + old.setClientVersion(clientVersion); + if (old.isDead()) { + reAlive(old); + } + return true; + } + } + } + + protected void reAlive(ConsumerFilterData filterData) { + long oldDeadTime = filterData.getDeadTime(); + filterData.setDeadTime(0); + log.info("Re alive consumer filter: {}, oldDeadTime: {}", filterData, oldDeadTime); + } + + public final ConsumerFilterData get(String consumerGroup) { + return this.groupFilterData.get(consumerGroup); + } + + public final ConcurrentHashMap<String, ConsumerFilterData> getGroupFilterData() { + return this.groupFilterData; + } + + public void setGroupFilterData(final ConcurrentHashMap<String, ConsumerFilterData> groupFilterData) { + this.groupFilterData = groupFilterData; + } + + public String getTopic() { + return topic; + } + + public void setTopic(final String topic) { + this.topic = topic; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java new file mode 100644 index 0000000..9518178 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.filter; + + +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; + +import java.nio.ByteBuffer; +import java.util.Map; + +/** + * Support filter to retry topic. + * <br>It will decode properties first in order to get real topic. + */ +public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter { + public ExpressionForRetryMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData, ConsumerFilterManager consumerFilterManager) { + super(subscriptionData, consumerFilterData, consumerFilterManager); + } + + @Override + public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) { + if (subscriptionData == null) { + return true; + } + + if (subscriptionData.isClassFilterMode()) { + return true; + } + + boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); + + if (!isRetryTopic && ExpressionType.isTagType(subscriptionData.getExpressionType())) { + return true; + } + + ConsumerFilterData realFilterData = this.consumerFilterData; + Map<String, String> tempProperties = properties; + boolean decoded = false; + if (isRetryTopic) { + // retry topic, use original filter data. + // poor performance to support retry filter. + if (tempProperties == null && msgBuffer != null) { + decoded = true; + tempProperties = MessageDecoder.decodeProperties(msgBuffer); + } + String realTopic = tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC); + String group = subscriptionData.getTopic().substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); + realFilterData = this.consumerFilterManager.get(realTopic, group); + } + + // no expression + if (realFilterData == null || realFilterData.getExpression() == null + || realFilterData.getCompiledExpression() == null) { + return true; + } + + if (!decoded && tempProperties == null && msgBuffer != null) { + tempProperties = MessageDecoder.decodeProperties(msgBuffer); + } + + Object ret = null; + try { + MessageEvaluationContext context = new MessageEvaluationContext(tempProperties); + + ret = realFilterData.getCompiledExpression().evaluate(context); + } catch (Throwable e) { + log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e); + } + + log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties); + + if (ret == null || !(ret instanceof Boolean)) { + return false; + } + + return (Boolean) ret; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java new file mode 100644 index 0000000..893df0d --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.filter; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.filter.util.BitsArray; +import org.apache.rocketmq.filter.util.BloomFilter; +import org.apache.rocketmq.store.ConsumeQueueExt; +import org.apache.rocketmq.store.MessageFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.Map; + +public class ExpressionMessageFilter implements MessageFilter { + + protected static final Logger log = LoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME); + + protected final SubscriptionData subscriptionData; + protected final ConsumerFilterData consumerFilterData; + protected final ConsumerFilterManager consumerFilterManager; + protected final boolean bloomDataValid; + + public ExpressionMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData, + ConsumerFilterManager consumerFilterManager) { + this.subscriptionData = subscriptionData; + this.consumerFilterData = consumerFilterData; + this.consumerFilterManager = consumerFilterManager; + if (consumerFilterData == null) { + bloomDataValid = false; + return; + } + BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter(); + if (bloomFilter != null && bloomFilter.isValid(consumerFilterData.getBloomFilterData())) { + bloomDataValid = true; + } else { + bloomDataValid = false; + } + } + + @Override + public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) { + if (null == subscriptionData) { + return true; + } + + if (subscriptionData.isClassFilterMode()) { + return true; + } + + // by tags code. + if (ExpressionType.isTagType(subscriptionData.getExpressionType())) { + + if (tagsCode == null || tagsCode < 0L) { + return true; + } + + if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) { + return true; + } + + return subscriptionData.getCodeSet().contains(tagsCode.intValue()); + } else { + // no expression or no bloom + if (consumerFilterData == null || consumerFilterData.getExpression() == null + || consumerFilterData.getCompiledExpression() == null || consumerFilterData.getBloomFilterData() == null) { + return true; + } + + // message is before consumer + if (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) { + log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit); + return true; + } + + byte[] filterBitMap = cqExtUnit.getFilterBitMap(); + BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter(); + if (filterBitMap == null || !this.bloomDataValid + || filterBitMap.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum()) { + return true; + } + + BitsArray bitsArray = null; + try { + bitsArray = BitsArray.create(filterBitMap); + boolean ret = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray); + log.debug("Pull {} by bit map:{}, {}, {}", ret, consumerFilterData, bitsArray, cqExtUnit); + return ret; + } catch (Throwable e) { + log.error("bloom filter error, sub=" + subscriptionData + + ", filter=" + consumerFilterData + ", bitMap=" + bitsArray, e); + } + } + + return true; + } + + @Override + public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) { + if (subscriptionData == null) { + return true; + } + + if (subscriptionData.isClassFilterMode()) { + return true; + } + + if (ExpressionType.isTagType(subscriptionData.getExpressionType())) { + return true; + } + + ConsumerFilterData realFilterData = this.consumerFilterData; + Map<String, String> tempProperties = properties; + + // no expression + if (realFilterData == null || realFilterData.getExpression() == null + || realFilterData.getCompiledExpression() == null) { + return true; + } + + if (tempProperties == null && msgBuffer != null) { + tempProperties = MessageDecoder.decodeProperties(msgBuffer); + } + + Object ret = null; + try { + MessageEvaluationContext context = new MessageEvaluationContext(tempProperties); + + ret = realFilterData.getCompiledExpression().evaluate(context); + } catch (Throwable e) { + log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e); + } + + log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties); + + if (ret == null || !(ret instanceof Boolean)) { + return false; + } + + return (Boolean) ret; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/filter/MessageEvaluationContext.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/MessageEvaluationContext.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/MessageEvaluationContext.java new file mode 100644 index 0000000..879d179 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/MessageEvaluationContext.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.filter; + +import org.apache.rocketmq.filter.expression.EvaluationContext; + +import java.util.HashMap; +import java.util.Map; + +/** + * Evaluation context from message. + */ +public class MessageEvaluationContext implements EvaluationContext { + + private Map<String, String> properties; + + public MessageEvaluationContext(Map<String, String> properties) { + this.properties = properties; + } + + @Override + public Object get(final String name) { + if (this.properties == null) { + return null; + } + return this.properties.get(name); + } + + @Override + public Map<String, Object> keyValues() { + if (properties == null) { + return null; + } + + Map<String, Object> copy = new HashMap<String, Object>(properties.size(), 1); + + for (String key : properties.keySet()) { + copy.put(key, properties.get(key)); + } + + return copy; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java index 2dec9f7..fd38c4f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java @@ -19,6 +19,8 @@ package org.apache.rocketmq.broker.longpolling; import org.apache.rocketmq.store.MessageArrivingListener; +import java.util.Map; + public class NotifyMessageArrivingListener implements MessageArrivingListener { private final PullRequestHoldService pullRequestHoldService; @@ -27,7 +29,9 @@ public class NotifyMessageArrivingListener implements MessageArrivingListener { } @Override - public void arriving(String topic, int queueId, long logicOffset, long tagsCode) { - this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode); + public void arriving(String topic, int queueId, long logicOffset, long tagsCode, + long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { + this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode, + msgStoreTime, filterBitMap, properties); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java index b66344f..045ab9b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.longpolling; import io.netty.channel.Channel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.MessageFilter; public class PullRequest { private final RemotingCommand requestCommand; @@ -27,15 +28,18 @@ public class PullRequest { private final long suspendTimestamp; private final long pullFromThisOffset; private final SubscriptionData subscriptionData; + private final MessageFilter messageFilter; public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp, - long pullFromThisOffset, SubscriptionData subscriptionData) { + long pullFromThisOffset, SubscriptionData subscriptionData, + MessageFilter messageFilter) { this.requestCommand = requestCommand; this.clientChannel = clientChannel; this.timeoutMillis = timeoutMillis; this.suspendTimestamp = suspendTimestamp; this.pullFromThisOffset = pullFromThisOffset; this.subscriptionData = subscriptionData; + this.messageFilter = messageFilter; } public RemotingCommand getRequestCommand() { @@ -61,4 +65,8 @@ public class PullRequest { public SubscriptionData getSubscriptionData() { return subscriptionData; } + + public MessageFilter getMessageFilter() { + return messageFilter; + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java index fdba50d..1a53db1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java @@ -18,13 +18,13 @@ package org.apache.rocketmq.broker.longpolling; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.SystemClock; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.store.DefaultMessageFilter; -import org.apache.rocketmq.store.MessageFilter; +import org.apache.rocketmq.store.ConsumeQueueExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +33,6 @@ public class PullRequestHoldService extends ServiceThread { private static final String TOPIC_QUEUEID_SEPARATOR = "@"; private final BrokerController brokerController; private final SystemClock systemClock = new SystemClock(); - private final MessageFilter messageFilter = new DefaultMessageFilter(); private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable = new ConcurrentHashMap<String, ManyPullRequest>(1024); @@ -110,10 +109,11 @@ public class PullRequestHoldService extends ServiceThread { } public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) { - notifyMessageArriving(topic, queueId, maxOffset, null); + notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null); } - public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) { + public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode, + long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (mpr != null) { @@ -128,7 +128,14 @@ public class PullRequestHoldService extends ServiceThread { } if (newestOffset > request.getPullFromThisOffset()) { - if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) { + boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, + new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)); + // match by bit map, need eval again when properties is not null. + if (match && properties != null) { + match = request.getMessageFilter().isMatchedByCommitLog(null, properties); + } + + if (match) { try { this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 039c942..6c2a987 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory; public class BrokerOuterAPI { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final RemotingClient remotingClient; - private final TopAddressing topAddressing = new TopAddressing(MixAll.WS_ADDR); + private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr()); private String nameSrvAddr = null; public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index 00257fd..8ded973 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -18,11 +18,14 @@ package org.apache.rocketmq.broker.plugin; import java.util.HashMap; +import java.util.LinkedList; import java.util.Set; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.store.CommitLogDispatcher; +import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.QueryMessageResult; @@ -84,8 +87,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore { @Override public GetMessageResult getMessage(String group, String topic, int queueId, long offset, - int maxMsgNums, SubscriptionData subscriptionData) { - return next.getMessage(group, topic, queueId, offset, maxMsgNums, subscriptionData); + int maxMsgNums, final MessageFilter messageFilter) { + return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter); } @Override @@ -234,4 +237,13 @@ public abstract class AbstractPluginMessageStore implements MessageStore { next.setConfirmOffset(phyOffset); } + @Override + public LinkedList<CommitLogDispatcher> getDispatcherList() { + return next.getDispatcherList(); + } + + @Override + public ConsumeQueue getConsumeQueue(String topic, int queueId) { + return next.getConsumeQueue(topic, queueId); + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index e35316d..daea53c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.broker.processor; +import com.alibaba.fastjson.JSON; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import java.io.UnsupportedEncodingException; @@ -32,6 +33,8 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; +import org.apache.rocketmq.broker.filter.ConsumerFilterData; +import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; @@ -49,6 +52,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.common.protocol.body.BrokerStatsItem; import org.apache.rocketmq.common.protocol.body.Connection; +import org.apache.rocketmq.common.protocol.body.ConsumeQueueData; import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.common.protocol.body.GroupList; @@ -56,6 +60,7 @@ import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody; import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody; import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; @@ -81,6 +86,7 @@ import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader; import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumeQueueRequestHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader; import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader; import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader; @@ -94,6 +100,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.stats.StatsItem; import org.apache.rocketmq.common.stats.StatsSnapshot; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.filter.util.BitsArray; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; @@ -101,7 +108,10 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.apache.rocketmq.store.ConsumeQueue; +import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -187,6 +197,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return ViewBrokerStatsData(ctx, request); case RequestCode.GET_BROKER_CONSUME_STATS: return fetchAllConsumeStatsInBroker(ctx, request); + case RequestCode.QUERY_CONSUME_QUEUE: + return queryConsumeQueue(ctx, request); default: break; } @@ -1244,4 +1256,83 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } } + private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + QueryConsumeQueueRequestHeader requestHeader = + (QueryConsumeQueueRequestHeader) request.decodeCommandCustomHeader(QueryConsumeQueueRequestHeader.class); + + RemotingCommand response = RemotingCommand.createResponseCommand(null); + + ConsumeQueue consumeQueue = this.brokerController.getMessageStore().getConsumeQueue(requestHeader.getTopic(), + requestHeader.getQueueId()); + if (consumeQueue == null) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.format("%d@%s is not exist!", requestHeader.getQueueId(), requestHeader.getTopic())); + return response; + } + + QueryConsumeQueueResponseBody body = new QueryConsumeQueueResponseBody(); + response.setCode(ResponseCode.SUCCESS); + response.setBody(body.encode()); + + body.setMaxQueueIndex(consumeQueue.getMaxOffsetInQueue()); + body.setMinQueueIndex(consumeQueue.getMinOffsetInQueue()); + + MessageFilter messageFilter = null; + if (requestHeader.getConsumerGroup() != null) { + SubscriptionData subscriptionData = this.brokerController.getConsumerManager().findSubscriptionData( + requestHeader.getConsumerGroup(), requestHeader.getTopic() + ); + body.setSubscriptionData(subscriptionData); + if (subscriptionData == null) { + body.setFilterData(String.format("%s@%s is not online!", requestHeader.getConsumerGroup(), requestHeader.getTopic())); + } else { + ConsumerFilterData filterData = this.brokerController.getConsumerFilterManager() + .get(requestHeader.getTopic(), requestHeader.getConsumerGroup()); + body.setFilterData(JSON.toJSONString(filterData, true)); + + messageFilter = new ExpressionMessageFilter(subscriptionData, filterData, + this.brokerController.getConsumerFilterManager()); + } + } + + SelectMappedBufferResult result = consumeQueue.getIndexBuffer(requestHeader.getIndex()); + if (result == null) { + response.setRemark(String.format("Index %d of %d@%s is not exist!", requestHeader.getIndex(), requestHeader.getQueueId(), requestHeader.getTopic())); + return response; + } + try { + List<ConsumeQueueData> queues = new ArrayList<>(); + for (int i = 0; i < result.getSize() && i < requestHeader.getCount() * ConsumeQueue.CQ_STORE_UNIT_SIZE; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { + ConsumeQueueData one = new ConsumeQueueData(); + one.setPhysicOffset(result.getByteBuffer().getLong()); + one.setPhysicSize(result.getByteBuffer().getInt()); + one.setTagsCode(result.getByteBuffer().getLong()); + + if (!consumeQueue.isExtAddr(one.getTagsCode())) { + queues.add(one); + continue; + } + + ConsumeQueueExt.CqExtUnit cqExtUnit = consumeQueue.getExt(one.getTagsCode()); + if (cqExtUnit != null) { + one.setExtendDataJson(JSON.toJSONString(cqExtUnit)); + if (cqExtUnit.getFilterBitMap() != null) { + one.setBitMap(BitsArray.create(cqExtUnit.getFilterBitMap()).toString()); + } + if (messageFilter != null) { + one.setEval(messageFilter.isMatchedByConsumeQueue(cqExtUnit.getTagsCode(), cqExtUnit)); + } + } else { + one.setMsg("Cq extend not exist!addr: " + one.getTagsCode()); + } + + queues.add(one); + } + body.setQueueData(queues); + } finally { + result.release(); + } + + return response; + } }