http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java index 6349ffc..67807a8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java @@ -22,15 +22,19 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody; import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader; import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader; import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; import org.apache.rocketmq.common.protocol.heartbeat.ProducerData; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.sysflag.TopicSysFlag; +import org.apache.rocketmq.filter.FilterFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; @@ -54,6 +58,8 @@ public class ClientManageProcessor implements NettyRequestProcessor { return this.heartBeat(ctx, request); case RequestCode.UNREGISTER_CLIENT: return this.unregisterClient(ctx, request); + case RequestCode.CHECK_CLIENT_CONFIG: + return this.checkClientConfig(ctx, request); default: break; } @@ -157,4 +163,42 @@ public class ClientManageProcessor implements NettyRequestProcessor { response.setRemark(null); return response; } + + public RemotingCommand checkClientConfig(ChannelHandlerContext ctx, RemotingCommand request) + throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + + CheckClientRequestBody requestBody = CheckClientRequestBody.decode(request.getBody(), + CheckClientRequestBody.class); + + if (requestBody != null && requestBody.getSubscriptionData() != null) { + SubscriptionData subscriptionData = requestBody.getSubscriptionData(); + + if (ExpressionType.isTagType(subscriptionData.getExpressionType())) { + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + if (!this.brokerController.getBrokerConfig().isEnablePropertyFilter()) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType()); + return response; + } + + try { + FilterFactory.INSTANCE.get(subscriptionData.getExpressionType()).compile(subscriptionData.getSubString()); + } catch (Exception e) { + log.warn("Client {}@{} filter message, but failed to compile expression! sub={}, error={}", + requestBody.getClientId(), requestBody.getGroup(), requestBody.getSubscriptionData(), e.getMessage()); + response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); + response.setRemark(e.getMessage()); + return response; + } + } + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 89967d8..10945da 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -25,6 +25,10 @@ import java.nio.ByteBuffer; import java.util.List; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; +import org.apache.rocketmq.broker.filter.ConsumerFilterData; +import org.apache.rocketmq.broker.filter.ConsumerFilterManager; +import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter; +import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; import org.apache.rocketmq.broker.longpolling.PullRequest; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; @@ -34,6 +38,7 @@ import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.message.MessageDecoder; @@ -54,6 +59,7 @@ import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.stats.BrokerStatsManager; @@ -142,13 +148,22 @@ public class PullMessageProcessor implements NettyRequestProcessor { } SubscriptionData subscriptionData = null; + ConsumerFilterData consumerFilterData = null; if (hasSubscriptionFlag) { try { - subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(), - requestHeader.getSubscription()); + subscriptionData = FilterAPI.build( + requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType() + ); + if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) { + consumerFilterData = ConsumerFilterManager.build( + requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(), + requestHeader.getExpressionType(), requestHeader.getSubVersion() + ); + assert consumerFilterData != null; + } } catch (Exception e) { LOG.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), // - requestHeader.getConsumerGroup()); + requestHeader.getConsumerGroup()); response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); response.setRemark("parse the consumer's subscription failed"); return response; @@ -180,16 +195,48 @@ public class PullMessageProcessor implements NettyRequestProcessor { if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) { LOG.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(), - subscriptionData.getSubString()); + subscriptionData.getSubString()); response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST); response.setRemark("the consumer's subscription not latest"); return response; } + if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) { + consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(), + requestHeader.getConsumerGroup()); + if (consumerFilterData == null) { + response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST); + response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!"); + return response; + } + if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) { + LOG.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}", + requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion()); + response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST); + response.setRemark("the consumer's consumer filter data not latest"); + return response; + } + } + } + + if (!ExpressionType.isTagType(subscriptionData.getExpressionType()) + && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType()); + return response; + } + + MessageFilter messageFilter; + if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) { + messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData, + this.brokerController.getConsumerFilterManager()); + } else { + messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData, + this.brokerController.getConsumerFilterManager()); } final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), - requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData); + requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter); if (getMessageResult != null) { response.setRemark(getMessageResult.getStatus().name()); responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset()); @@ -368,7 +415,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { long offset = requestHeader.getQueueOffset(); int queueId = requestHeader.getQueueId(); PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, - this.brokerController.getMessageStore().now(), offset, subscriptionData); + this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); response = null; break; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java new file mode 100644 index 0000000..87f6256 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.filter; + +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.filter.util.BitsArray; +import org.apache.rocketmq.store.DispatchRequest; +import org.junit.Test; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CommitLogDispatcherCalcBitMapTest { + + @Test + public void testDispatch_filterDataIllegal() { + BrokerConfig brokerConfig = new BrokerConfig(); + brokerConfig.setEnableCalcFilterBitMap(true); + + ConsumerFilterManager filterManager = new ConsumerFilterManager(); + + filterManager.register("topic0", "CID_0", "a is not null and a >= 5", + ExpressionType.SQL92, System.currentTimeMillis()); + + filterManager.register("topic0", "CID_1", "a is not null and a >= 15", + ExpressionType.SQL92, System.currentTimeMillis()); + + ConsumerFilterData nullExpression = filterManager.get("topic0", "CID_0"); + nullExpression.setExpression(null); + nullExpression.setCompiledExpression(null); + ConsumerFilterData nullBloomData = filterManager.get("topic0", "CID_1"); + nullBloomData.setBloomFilterData(null); + + + CommitLogDispatcherCalcBitMap calcBitMap = new CommitLogDispatcherCalcBitMap(brokerConfig, + filterManager); + + for (int i = 0; i < 1; i++) { + Map<String, String> properties = new HashMap<String, String>(4); + properties.put("a", String.valueOf(i * 10 + 5)); + + String topic = "topic" + i; + + DispatchRequest dispatchRequest = new DispatchRequest( + topic, + 0, + i * 100 + 123, + 100, + (long) ("tags" + i).hashCode(), + System.currentTimeMillis(), + i, + null, + UUID.randomUUID().toString(), + 0, + 0, + properties + ); + + calcBitMap.dispatch(dispatchRequest); + + assertThat(dispatchRequest.getBitMap()).isNotNull(); + + BitsArray bitsArray = BitsArray.create(dispatchRequest.getBitMap(), + filterManager.getBloomFilter().getM()); + + for (int j = 0; j < bitsArray.bitLength(); j++) { + assertThat(bitsArray.getBit(j)).isFalse(); + } + } + } + + @Test + public void testDispatch_blankFilterData() { + BrokerConfig brokerConfig = new BrokerConfig(); + brokerConfig.setEnableCalcFilterBitMap(true); + + ConsumerFilterManager filterManager = new ConsumerFilterManager(); + + CommitLogDispatcherCalcBitMap calcBitMap = new CommitLogDispatcherCalcBitMap(brokerConfig, + filterManager); + + for (int i = 0; i < 10; i++) { + Map<String, String> properties = new HashMap<String, String>(4); + properties.put("a", String.valueOf(i * 10 + 5)); + + String topic = "topic" + i; + + DispatchRequest dispatchRequest = new DispatchRequest( + topic, + 0, + i * 100 + 123, + 100, + (long) ("tags" + i).hashCode(), + System.currentTimeMillis(), + i, + null, + UUID.randomUUID().toString(), + 0, + 0, + properties + ); + + calcBitMap.dispatch(dispatchRequest); + + assertThat(dispatchRequest.getBitMap()).isNull(); + } + } + + @Test + public void testDispatch() { + BrokerConfig brokerConfig = new BrokerConfig(); + brokerConfig.setEnableCalcFilterBitMap(true); + + ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(10, 10); + + CommitLogDispatcherCalcBitMap calcBitMap = new CommitLogDispatcherCalcBitMap(brokerConfig, + filterManager); + + for (int i = 0; i < 10; i++) { + Map<String, String> properties = new HashMap<String, String>(4); + properties.put("a", String.valueOf(i * 10 + 5)); + + String topic = "topic" + i; + + DispatchRequest dispatchRequest = new DispatchRequest( + topic, + 0, + i * 100 + 123, + 100, + (long) ("tags" + i).hashCode(), + System.currentTimeMillis(), + i, + null, + UUID.randomUUID().toString(), + 0, + 0, + properties + ); + + calcBitMap.dispatch(dispatchRequest); + + assertThat(dispatchRequest.getBitMap()).isNotNull(); + + BitsArray bits = BitsArray.create(dispatchRequest.getBitMap()); + + Collection<ConsumerFilterData> filterDatas = filterManager.get(topic); + + for (ConsumerFilterData filterData : filterDatas) { + + if (filterManager.getBloomFilter().isHit(filterData.getBloomFilterData(), bits)) { + try { + assertThat((Boolean) filterData.getCompiledExpression().evaluate( + new MessageEvaluationContext(properties) + )).isTrue(); + } catch (Exception e) { + e.printStackTrace(); + assertThat(true).isFalse(); + } + } else { + try { + assertThat((Boolean) filterData.getCompiledExpression().evaluate( + new MessageEvaluationContext(properties) + )).isFalse(); + } catch (Exception e) { + e.printStackTrace(); + assertThat(true).isFalse(); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java new file mode 100644 index 0000000..c8412a8 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.filter; + +import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.common.filter.FilterAPI; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ConsumerFilterManagerTest { + + public static ConsumerFilterManager gen(int topicCount, int consumerCount) { + ConsumerFilterManager filterManager = new ConsumerFilterManager(); + + for (int i = 0; i < topicCount; i++) { + String topic = "topic" + i; + + for (int j = 0; j < consumerCount; j++) { + + String consumer = "CID_" + j; + + filterManager.register(topic, consumer, expr(j), ExpressionType.SQL92, System.currentTimeMillis()); + } + } + + return filterManager; + } + + public static String expr(int i) { + return "a is not null and a > " + ((i - 1) * 10) + " and a < " + ((i + 1) * 10); + } + + @Test + public void testRegister_newExpressionCompileErrorAndRemoveOld() { + ConsumerFilterManager filterManager = gen(10, 10); + + assertThat(filterManager.get("topic9", "CID_9")).isNotNull(); + + String newExpr = "a between 10,20"; + + assertThat(filterManager.register("topic9", "CID_9", newExpr, ExpressionType.SQL92, System.currentTimeMillis() + 1)) + .isFalse(); + assertThat(filterManager.get("topic9", "CID_9")).isNull(); + + newExpr = "a between 10 AND 20"; + + assertThat(filterManager.register("topic9", "CID_9", newExpr, ExpressionType.SQL92, System.currentTimeMillis() + 1)) + .isTrue(); + + ConsumerFilterData filterData = filterManager.get("topic9", "CID_9"); + + assertThat(filterData).isNotNull(); + assertThat(newExpr).isEqualTo(filterData.getExpression()); + } + + @Test + public void testRegister_change() { + ConsumerFilterManager filterManager = gen(10, 10); + + ConsumerFilterData filterData = filterManager.get("topic9", "CID_9"); + + System.out.println(filterData.getCompiledExpression()); + + String newExpr = "a > 0 and a < 10"; + + filterManager.register("topic9", "CID_9", newExpr, ExpressionType.SQL92, System.currentTimeMillis() + 1); + + filterData = filterManager.get("topic9", "CID_9"); + + assertThat(newExpr).isEqualTo(filterData.getExpression()); + + System.out.println(filterData.toString()); + } + + @Test + public void testRegister() { + ConsumerFilterManager filterManager = gen(10, 10); + + ConsumerFilterData filterData = filterManager.get("topic9", "CID_9"); + + assertThat(filterData).isNotNull(); + assertThat(filterData.isDead()).isFalse(); + + // new version + assertThat(filterManager.register( + "topic9", "CID_9", "a is not null", ExpressionType.SQL92, System.currentTimeMillis() + 1000 + )).isTrue(); + + ConsumerFilterData newFilter = filterManager.get("topic9", "CID_9"); + + assertThat(newFilter).isNotEqualTo(filterData); + + // same version + assertThat(filterManager.register( + "topic9", "CID_9", "a is null", ExpressionType.SQL92, newFilter.getClientVersion() + )).isFalse(); + + ConsumerFilterData filterData1 = filterManager.get("topic9", "CID_9"); + + assertThat(newFilter).isEqualTo(filterData1); + } + + @Test + public void testRegister_reAlive() { + ConsumerFilterManager filterManager = gen(10, 10); + + ConsumerFilterData filterData = filterManager.get("topic9", "CID_9"); + + assertThat(filterData).isNotNull(); + assertThat(filterData.isDead()).isFalse(); + + //make dead + filterManager.unRegister("CID_9"); + + //reAlive + filterManager.register( + filterData.getTopic(), + filterData.getConsumerGroup(), + filterData.getExpression(), + filterData.getExpressionType(), + System.currentTimeMillis() + ); + + ConsumerFilterData newFilterData = filterManager.get("topic9", "CID_9"); + + assertThat(newFilterData).isNotNull(); + assertThat(newFilterData.isDead()).isFalse(); + } + + @Test + public void testRegister_bySubscriptionData() { + ConsumerFilterManager filterManager = new ConsumerFilterManager(); + List<SubscriptionData> subscriptionDatas = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + try { + subscriptionDatas.add( + FilterAPI.build( + "topic" + i, + "a is not null and a > " + i, + ExpressionType.SQL92 + ) + ); + } catch (Exception e) { + e.printStackTrace(); + assertThat(true).isFalse(); + } + } + + filterManager.register("CID_0", subscriptionDatas); + + Collection<ConsumerFilterData> filterDatas = filterManager.getByGroup("CID_0"); + + assertThat(filterDatas).isNotNull(); + assertThat(filterDatas.size()).isEqualTo(10); + + Iterator<ConsumerFilterData> iterator = filterDatas.iterator(); + while (iterator.hasNext()) { + ConsumerFilterData filterData = iterator.next(); + + assertThat(filterData).isNotNull(); + assertThat(filterManager.getBloomFilter().isValid(filterData.getBloomFilterData())).isTrue(); + } + } + + @Test + public void testRegister_tag() { + ConsumerFilterManager filterManager = new ConsumerFilterManager(); + + assertThat(filterManager.register("topic0", "CID_0", "*", null, System.currentTimeMillis())).isFalse(); + + Collection<ConsumerFilterData> filterDatas = filterManager.getByGroup("CID_0"); + + assertThat(filterDatas).isNullOrEmpty(); + } + + @Test + public void testUnregister() { + ConsumerFilterManager filterManager = gen(10, 10); + + ConsumerFilterData filterData = filterManager.get("topic9", "CID_9"); + + assertThat(filterData).isNotNull(); + assertThat(filterData.isDead()).isFalse(); + + filterManager.unRegister("CID_9"); + + assertThat(filterData.isDead()).isTrue(); + } + + @Test + public void testPersist() { + ConsumerFilterManager filterManager = gen(10, 10); + + try { + filterManager.persist(); + + ConsumerFilterData filterData = filterManager.get("topic9", "CID_9"); + + assertThat(filterData).isNotNull(); + assertThat(filterData.isDead()).isFalse(); + + ConsumerFilterManager loadFilter = new ConsumerFilterManager(); + + assertThat(loadFilter.load()).isTrue(); + + filterData = loadFilter.get("topic9", "CID_9"); + + assertThat(filterData).isNotNull(); + assertThat(filterData.isDead()).isTrue(); + assertThat(filterData.getCompiledExpression()).isNotNull(); + } finally { + deleteDirectory("./unit_test"); + } + } + + @Test + public void testPersist_clean() { + ConsumerFilterManager filterManager = gen(10, 10); + + String topic = "topic9"; + for (int i = 0; i < 10; i++) { + String cid = "CID_" + i; + + ConsumerFilterData filterData = filterManager.get(topic, cid); + + assertThat(filterData).isNotNull(); + assertThat(filterData.isDead()).isFalse(); + + //make dead more than 24h + filterData.setBornTime(System.currentTimeMillis() - 26 * 60 * 60 * 1000); + filterData.setDeadTime(System.currentTimeMillis() - 25 * 60 * 60 * 1000); + } + + try { + filterManager.persist(); + + ConsumerFilterManager loadFilter = new ConsumerFilterManager(); + + assertThat(loadFilter.load()).isTrue(); + + ConsumerFilterData filterData = loadFilter.get(topic, "CID_9"); + + assertThat(filterData).isNull(); + + Collection<ConsumerFilterData> topicData = loadFilter.get(topic); + + assertThat(topicData).isNullOrEmpty(); + } finally { + deleteDirectory("./unit_test"); + } + } + + protected void deleteDirectory(String rootPath) { + File file = new File(rootPath); + deleteFile(file); + } + + protected void deleteFile(File file) { + File[] subFiles = file.listFiles(); + if (subFiles != null) { + for (File sub : subFiles) { + deleteFile(sub); + } + } + + file.delete(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java new file mode 100644 index 0000000..53e563e --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.filter; + +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.store.CommitLogDispatcher; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.DispatchRequest; +import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.GetMessageStatus; +import org.apache.rocketmq.store.MessageArrivingListener; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.junit.Test; + +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +public class MessageStoreWithFilterTest { + + private static final String msg = "Once, there was a chance for me!"; + private static final byte[] msgBody = msg.getBytes(); + + private static final String topic = "topic"; + private static final int queueId = 0; + private static final String storePath = "." + File.separator + "unit_test_store"; + private static final int commitLogFileSize = 1024 * 1024 * 256; + private static final int cqFileSize = 300000 * 20; + private static final int cqExtFileSize = 300000 * 128; + + private static SocketAddress BornHost; + + private static SocketAddress StoreHost; + + static { + try { + StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + try { + BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + } + + public MessageExtBrokerInner buildMessage() { + MessageExtBrokerInner msg = new MessageExtBrokerInner(); + msg.setTopic(topic); + msg.setTags("TAG1"); + msg.setKeys("Hello"); + msg.setBody(msgBody); + msg.setKeys(String.valueOf(System.currentTimeMillis())); + msg.setQueueId(queueId); + msg.setSysFlag(0); + msg.setBornTimestamp(System.currentTimeMillis()); + msg.setStoreHost(StoreHost); + msg.setBornHost(BornHost); + for (int i = 1; i < 3; i++) { + msg.putUserProperty(String.valueOf(i), "imagoodperson" + i); + } + msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); + + return msg; + } + + public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize, + boolean enableCqExt, int cqExtFileSize) { + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize); + messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize); + messageStoreConfig.setMappedFileSizeConsumeQueueExt(cqExtFileSize); + messageStoreConfig.setMessageIndexEnable(false); + messageStoreConfig.setEnableConsumeQueueExt(enableCqExt); + + messageStoreConfig.setStorePathRootDir(storePath); + messageStoreConfig.setStorePathCommitLog(storePath + File.separator + "commitlog"); + + return messageStoreConfig; + } + + protected DefaultMessageStore gen(ConsumerFilterManager filterManager) throws Exception { + MessageStoreConfig messageStoreConfig = buildStoreConfig( + commitLogFileSize, cqFileSize, true, cqExtFileSize + ); + + BrokerConfig brokerConfig = new BrokerConfig(); + brokerConfig.setEnableCalcFilterBitMap(true); + brokerConfig.setMaxErrorRateOfBloomFilter(20); + brokerConfig.setExpectConsumerNumUseFilter(64); + + DefaultMessageStore master = new DefaultMessageStore( + messageStoreConfig, + new BrokerStatsManager(brokerConfig.getBrokerClusterName()), + new MessageArrivingListener() { + @Override + public void arriving(String topic, int queueId, long logicOffset, long tagsCode, + long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { +// System.out.println(String.format("Msg coming: %s, %d, %d, %d", +// topic, queueId, logicOffset, tagsCode)); + } + } + , brokerConfig); + + master.getDispatcherList().addFirst(new CommitLogDispatcher() { + @Override + public void dispatch(DispatchRequest request) { + try { +// System.out.println(String.format("offset:%d, bitMap:%s", request.getCommitLogOffset(), +// BitsArray.create(request.getBitMap()).toString())); + } catch (Throwable e) { + e.printStackTrace(); + } + } + }); + master.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(brokerConfig, filterManager)); + + assertThat(master.load()).isTrue(); + + master.start(); + + return master; + } + + protected List<MessageExtBrokerInner> putMsg(DefaultMessageStore master, int topicCount, int msgCountPerTopic) throws Exception { + List<MessageExtBrokerInner> msgs = new ArrayList<MessageExtBrokerInner>(); + for (int i = 0; i < topicCount; i++) { + String realTopic = topic + i; + for (int j = 0; j < msgCountPerTopic; j++) { + MessageExtBrokerInner msg = buildMessage(); + msg.setTopic(realTopic); + msg.putUserProperty("a", String.valueOf(j * 10 + 5)); + msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); + + PutMessageResult result = master.putMessage(msg); + + msg.setMsgId(result.getAppendMessageResult().getMsgId()); + + msgs.add(msg); + } + } + + return msgs; + } + + protected void deleteDirectory(String rootPath) { + File file = new File(rootPath); + deleteFile(file); + } + + protected void deleteFile(File file) { + File[] subFiles = file.listFiles(); + if (subFiles != null) { + for (File sub : subFiles) { + deleteFile(sub); + } + } + + file.delete(); + } + + protected List<MessageExtBrokerInner> filtered(List<MessageExtBrokerInner> msgs, ConsumerFilterData filterData) { + List<MessageExtBrokerInner> filteredMsgs = new ArrayList<MessageExtBrokerInner>(); + + for (MessageExtBrokerInner messageExtBrokerInner : msgs) { + + if (!messageExtBrokerInner.getTopic().equals(filterData.getTopic())) { + continue; + } + + try { + Object evlRet = filterData.getCompiledExpression().evaluate(new MessageEvaluationContext(messageExtBrokerInner.getProperties())); + + if (evlRet == null || !(evlRet instanceof Boolean) || (Boolean) evlRet) { + filteredMsgs.add(messageExtBrokerInner); + } + } catch (Exception e) { + e.printStackTrace(); + assertThat(true).isFalse(); + } + } + + return filteredMsgs; + } + + @Test + public void testGetMessage_withFilterBitMapAndConsumerChanged() { + int topicCount = 10, msgPerTopic = 10; + ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic); + + DefaultMessageStore master = null; + try { + master = gen(filterManager); + } catch (Exception e) { + e.printStackTrace(); + assertThat(true).isFalse(); + } + + try { + List<MessageExtBrokerInner> msgs = null; + try { + msgs = putMsg(master, topicCount, msgPerTopic); + } catch (Exception e) { + e.printStackTrace(); + assertThat(true).isFalse(); + } + + // sleep to wait for consume queue has been constructed. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + assertThat(true).isFalse(); + } + + // reset consumer; + String topic = "topic" + 0; + String resetGroup = "CID_" + 2; + String normalGroup = "CID_" + 3; + + { + // reset CID_2@topic0 to get all messages. + SubscriptionData resetSubData = new SubscriptionData(); + resetSubData.setExpressionType(ExpressionType.SQL92); + resetSubData.setTopic(topic); + resetSubData.setClassFilterMode(false); + resetSubData.setSubString("a is not null OR a is null"); + + ConsumerFilterData resetFilterData = ConsumerFilterManager.build(topic, + resetGroup, resetSubData.getSubString(), resetSubData.getExpressionType(), + System.currentTimeMillis()); + + GetMessageResult resetGetResult = master.getMessage(resetGroup, topic, queueId, 0, 1000, + new ExpressionMessageFilter(resetSubData, resetFilterData, filterManager)); + + try { + assertThat(resetGetResult).isNotNull(); + + List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, resetFilterData); + + assertThat(resetGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size()); + } finally { + resetGetResult.release(); + } + } + + { + ConsumerFilterData normalFilterData = filterManager.get(topic, normalGroup); + assertThat(normalFilterData).isNotNull(); + assertThat(normalFilterData.getBornTime()).isLessThan(System.currentTimeMillis()); + + SubscriptionData normalSubData = new SubscriptionData(); + normalSubData.setExpressionType(normalFilterData.getExpressionType()); + normalSubData.setTopic(topic); + normalSubData.setClassFilterMode(false); + normalSubData.setSubString(normalFilterData.getExpression()); + + List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, normalFilterData); + + GetMessageResult normalGetResult = master.getMessage(normalGroup, topic, queueId, 0, 1000, + new ExpressionMessageFilter(normalSubData, normalFilterData, filterManager)); + + try { + assertThat(normalGetResult).isNotNull(); + assertThat(normalGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size()); + } finally { + normalGetResult.release(); + } + } + } finally { + master.shutdown(); + master.destroy(); + deleteDirectory(storePath); + } + } + + @Test + public void testGetMessage_withFilterBitMap() { + int topicCount = 10, msgPerTopic = 500; + ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic); + + DefaultMessageStore master = null; + try { + master = gen(filterManager); + } catch (Exception e) { + e.printStackTrace(); + assertThat(true).isFalse(); + } + + try { + List<MessageExtBrokerInner> msgs = null; + try { + msgs = putMsg(master, topicCount, msgPerTopic); + // sleep to wait for consume queue has been constructed. + Thread.sleep(1000); + } catch (Exception e) { + e.printStackTrace(); + assertThat(true).isFalse(); + } + + for (int i = 0; i < topicCount; i++) { + String realTopic = topic + i; + + for (int j = 0; j < msgPerTopic; j++) { + String group = "CID_" + j; + + ConsumerFilterData filterData = filterManager.get(realTopic, group); + assertThat(filterData).isNotNull(); + + List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, filterData); + + SubscriptionData subscriptionData = new SubscriptionData(); + subscriptionData.setExpressionType(filterData.getExpressionType()); + subscriptionData.setTopic(filterData.getTopic()); + subscriptionData.setClassFilterMode(false); + subscriptionData.setSubString(filterData.getExpression()); + + GetMessageResult getMessageResult = master.getMessage(group, realTopic, queueId, 0, 10000, + new ExpressionMessageFilter(subscriptionData, filterData, filterManager)); + String assertMsg = group + "-" + realTopic; + try { + assertThat(getMessageResult).isNotNull(); + assertThat(GetMessageStatus.FOUND).isEqualTo(getMessageResult.getStatus()); + assertThat(getMessageResult.getMessageBufferList()).isNotNull().isNotEmpty(); + assertThat(getMessageResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size()); + + for (ByteBuffer buffer : getMessageResult.getMessageBufferList()) { + MessageExt messageExt = MessageDecoder.decode(buffer.slice(), false); + assertThat(messageExt).isNotNull(); + + Object evlRet = null; + try { + evlRet = filterData.getCompiledExpression().evaluate(new MessageEvaluationContext(messageExt.getProperties())); + } catch (Exception e) { + e.printStackTrace(); + assertThat(true).isFalse(); + } + + assertThat(evlRet).isNotNull().isEqualTo(Boolean.TRUE); + + // check + boolean find = false; + for (MessageExtBrokerInner messageExtBrokerInner : filteredMsgs) { + if (messageExtBrokerInner.getMsgId().equals(messageExt.getMsgId())) { + find = true; + } + } + assertThat(find).isTrue(); + } + } finally { + getMessageResult.release(); + } + } + } + } finally { + master.shutdown(); + master.destroy(); + deleteDirectory(storePath); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java index d3d9812..941d4a7 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Set; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; import org.apache.rocketmq.common.BrokerConfig; @@ -126,7 +127,7 @@ public class PullMessageProcessorTest { @Test public void testProcessRequest_Found() throws RemotingCommandException { GetMessageResult getMessageResult = createGetMessageResult(); - when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult); + when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult); final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request); @@ -137,7 +138,7 @@ public class PullMessageProcessorTest { @Test public void testProcessRequest_FoundWithHook() throws RemotingCommandException { GetMessageResult getMessageResult = createGetMessageResult(); - when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult); + when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult); List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>(); final ConsumeMessageContext[] messageContext = new ConsumeMessageContext[1]; ConsumeMessageHook consumeMessageHook = new ConsumeMessageHook() { @@ -168,7 +169,7 @@ public class PullMessageProcessorTest { public void testProcessRequest_MsgWasRemoving() throws RemotingCommandException { GetMessageResult getMessageResult = createGetMessageResult(); getMessageResult.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING); - when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult); + when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult); final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request); @@ -180,7 +181,7 @@ public class PullMessageProcessorTest { public void testProcessRequest_NoMsgInQueue() throws RemotingCommandException { GetMessageResult getMessageResult = createGetMessageResult(); getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE); - when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult); + when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult); final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 3903fe2..9c9b59e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -519,6 +519,21 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume } /** + * Subscribe a topic by message selector. + * + * @see org.apache.rocketmq.client.consumer.MessageSelector#bySql + * @see org.apache.rocketmq.client.consumer.MessageSelector#byTag + * + * @param topic topic to consume. + * @param messageSelector {@link org.apache.rocketmq.client.consumer.MessageSelector} + * @throws MQClientException + */ + @Override + public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException { + this.defaultMQPushConsumerImpl.subscribe(topic, messageSelector); + } + + /** * Un-subscribe the specified topic from subscription. * @param topic message topic */ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java index 9255281..9c6c1f1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java @@ -70,6 +70,27 @@ public interface MQPushConsumer extends MQConsumer { void subscribe(final String topic, final String fullClassName, final String filterClassSource) throws MQClientException; /** + * Subscribe some topic with selector. + * <p> + * This interface also has the ability of {@link #subscribe(String, String)}, + * and, support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}. + * </p> + * <p/> + * <p> + * Choose Tag: {@link MessageSelector#byTag(java.lang.String)} + * </p> + * <p/> + * <p> + * Choose SQL92: {@link MessageSelector#bySql(java.lang.String)} + * </p> + * + * @param topic + * @param selector message selector({@link MessageSelector}), can be null. + * @throws MQClientException + */ + void subscribe(final String topic, final MessageSelector selector) throws MQClientException; + + /** * Unsubscribe consumption some topic * * @param topic message topic http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java new file mode 100644 index 0000000..35a5181 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.consumer; + +import org.apache.rocketmq.common.filter.ExpressionType; + +/** + * + * Message selector: select message at server. + * <p> + * Now, support: + * <li>Tag: {@link org.apache.rocketmq.common.filter.ExpressionType#TAG} + * </li> + * <li>SQL92: {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92} + * </li> + * </p> + */ +public class MessageSelector { + + /** + * @see org.apache.rocketmq.common.filter.ExpressionType + */ + private String type; + + /** + * expression content. + */ + private String expression; + + private MessageSelector(String type, String expression) { + this.type = type; + this.expression = expression; + } + + /** + * Use SLQ92 to select message. + * + * @param sql if null or empty, will be treated as select all message. + * @return + */ + public static MessageSelector bySql(String sql) { + return new MessageSelector(ExpressionType.SQL92, sql); + } + + /** + * Use tag to select message. + * + * @param tag if null or empty or "*", will be treated as select all message. + * @return + */ + public static MessageSelector byTag(String tag) { + return new MessageSelector(ExpressionType.TAG, tag); + } + + public String getExpressionType() { + return type; + } + + public String getExpression() { + return expression; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java index 295060e..4367a4c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java @@ -19,10 +19,18 @@ package org.apache.rocketmq.client.impl; public class FindBrokerResult { private final String brokerAddr; private final boolean slave; + private final int brokerVersion; public FindBrokerResult(String brokerAddr, boolean slave) { this.brokerAddr = brokerAddr; this.slave = slave; + this.brokerVersion = 0; + } + + public FindBrokerResult(String brokerAddr, boolean slave, int brokerVersion) { + this.brokerAddr = brokerAddr; + this.slave = slave; + this.brokerVersion = brokerVersion; } public String getBrokerAddr() { @@ -32,4 +40,8 @@ public class FindBrokerResult { public boolean isSlave() { return slave; } + + public int getBrokerVersion() { + return brokerVersion; + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index ff25334..4244bdd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -59,6 +59,7 @@ import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.BrokerStatsData; +import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; @@ -70,6 +71,7 @@ import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody; import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody; import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; @@ -103,6 +105,7 @@ import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader import org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumeQueueRequestHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; @@ -129,6 +132,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHead import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader; import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.InvokeCallback; @@ -168,7 +172,7 @@ public class MQClientAPIImpl { public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor, RPCHook rpcHook, final ClientConfig clientConfig) { this.clientConfig = clientConfig; - topAddressing = new TopAddressing(MixAll.WS_ADDR, clientConfig.getUnitName()); + topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName()); this.remotingClient = new NettyRemotingClient(nettyClientConfig, null); this.clientRemotingProcessor = clientRemotingProcessor; @@ -843,7 +847,7 @@ public class MQClientAPIImpl { this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); } - public void sendHearbeat(// + public int sendHearbeat(// final String addr, // final HeartbeatData heartbeatData, // final long timeoutMillis// @@ -855,7 +859,7 @@ public class MQClientAPIImpl { assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { - return; + return response.getVersion(); } default: break; @@ -2024,4 +2028,51 @@ public class MQClientAPIImpl { return configMap; } + public QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final String topic, final int queueId, + final long index, final int count, final String consumerGroup, + final long timeoutMillis) throws InterruptedException, + RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { + + QueryConsumeQueueRequestHeader requestHeader = new QueryConsumeQueueRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setQueueId(queueId); + requestHeader.setIndex(index); + requestHeader.setCount(count); + requestHeader.setConsumerGroup(consumerGroup); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_QUEUE, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis); + + assert response != null; + + if (ResponseCode.SUCCESS == response.getCode()) { + return QueryConsumeQueueResponseBody.decode(response.getBody(), QueryConsumeQueueResponseBody.class); + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + + public void checkClientInBroker(final String brokerAddr, final String consumerGroup, + final String clientId, final SubscriptionData subscriptionData, + final long timeoutMillis) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQClientException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_CLIENT_CONFIG, null); + + CheckClientRequestBody requestBody = new CheckClientRequestBody(); + requestBody.setClientId(clientId); + requestBody.setGroup(consumerGroup); + requestBody.setSubscriptionData(subscriptionData); + + request.setBody(requestBody.encode()); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis); + + assert response != null; + + if (ResponseCode.SUCCESS != response.getCode()) { + throw new MQClientException(response.getCode(), response.getRemark()); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 67f3ebe..2cafe29 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.listener.MessageListener; @@ -405,15 +406,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.pullAPIWrapper.pullKernelImpl(// pullRequest.getMessageQueue(), // 1 subExpression, // 2 - subscriptionData.getSubVersion(), // 3 - pullRequest.getNextOffset(), // 4 - this.defaultMQPushConsumer.getPullBatchSize(), // 5 - sysFlag, // 6 - commitOffsetValue, // 7 - BROKER_SUSPEND_MAX_TIME_MILLIS, // 8 - CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9 - CommunicationMode.ASYNC, // 10 - pullCallback// 11 + subscriptionData.getExpressionType(), // 3 + subscriptionData.getSubVersion(), // 4 + pullRequest.getNextOffset(), // 5 + this.defaultMQPushConsumer.getPullBatchSize(), // 6 + sysFlag, // 7 + commitOffsetValue, // 8 + BROKER_SUSPEND_MAX_TIME_MILLIS, // 9 + CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 10 + CommunicationMode.ASYNC, // 11 + pullCallback // 12 ); } catch (Exception e) { log.error("pullKernelImpl exception", e); @@ -615,6 +617,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } this.updateTopicSubscribeInfoWhenSubscriptionChanged(); + this.mQClientFactory.checkClientInBroker(); this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.mQClientFactory.rebalanceImmediately(); } @@ -836,6 +839,25 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } + public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException { + try { + if (messageSelector == null) { + subscribe(topic, SubscriptionData.SUB_ALL); + return; + } + + SubscriptionData subscriptionData = FilterAPI.build(topic, + messageSelector.getExpression(), messageSelector.getExpressionType()); + + this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); + if (this.mQClientFactory != null) { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + } + } catch (Exception e) { + throw new MQClientException("subscription exception", e); + } + } + public void suspend() { this.pause = true; log.info("suspend this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup()); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index 96e21e1..304a44a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -33,7 +33,9 @@ import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; @@ -135,6 +137,7 @@ public class PullAPIWrapper { public PullResult pullKernelImpl( final MessageQueue mq, final String subExpression, + final String expressionType, final long subVersion, final long offset, final int maxNums, @@ -156,6 +159,14 @@ public class PullAPIWrapper { } if (findBrokerResult != null) { + { + // check version + if (!ExpressionType.isTagType(expressionType) + && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) { + throw new MQClientException("The broker[" + mq.getBrokerName() + ", " + + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null); + } + } int sysFlagInner = sysFlag; if (findBrokerResult.isSlave()) { @@ -173,6 +184,7 @@ public class PullAPIWrapper { requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); + requestHeader.setExpressionType(expressionType); String brokerAddr = findBrokerResult.getBrokerAddr(); if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { @@ -192,6 +204,34 @@ public class PullAPIWrapper { throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); } + public PullResult pullKernelImpl( + final MessageQueue mq, + final String subExpression, + final long subVersion, + final long offset, + final int maxNums, + final int sysFlag, + final long commitOffset, + final long brokerSuspendMaxTimeMillis, + final long timeoutMillis, + final CommunicationMode communicationMode, + final PullCallback pullCallback + ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return pullKernelImpl( + mq, + subExpression, + ExpressionType.TAG, + subVersion, offset, + maxNums, + sysFlag, + commitOffset, + brokerSuspendMaxTimeMillis, + timeoutMillis, + communicationMode, + pullCallback + ); + } + public long recalculatePullFromWhichNode(final MessageQueue mq) { if (this.isConnectBrokerByUser()) { return this.defaultBrokerId; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index d7e02fe..a8c65b2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -61,6 +61,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; @@ -98,6 +99,8 @@ public class MQClientInstance { private final Lock lockHeartbeat = new ReentrantLock(); private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable = new ConcurrentHashMap<String, HashMap<Long, String>>(); + private final ConcurrentHashMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable = + new ConcurrentHashMap<String, HashMap<String, Integer>>(); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -404,6 +407,44 @@ public class MQClientInstance { } } + public void checkClientInBroker() throws MQClientException { + Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); + + while (it.hasNext()) { + Entry<String, MQConsumerInner> entry = it.next(); + Set<SubscriptionData> subscriptionInner = entry.getValue().subscriptions(); + if (subscriptionInner == null || subscriptionInner.isEmpty()) { + return; + } + + for (SubscriptionData subscriptionData : subscriptionInner) { + if (ExpressionType.isTagType(subscriptionData.getExpressionType())) { + continue; + } + // may need to check one broker every cluster... + // assume that the configs of every broker in cluster are the the same. + String addr = findBrokerAddrByTopic(subscriptionData.getTopic()); + + if (addr != null) { + try { + this.getMQClientAPIImpl().checkClientInBroker( + addr, entry.getKey(), this.clientId, subscriptionData, 3 * 1000 + ); + } catch (Exception e) { + if (e instanceof MQClientException) { + throw (MQClientException) e; + } else { + throw new MQClientException("Check client in broker error, maybe because you use " + + subscriptionData.getExpressionType() + " to filter message, but server has not been upgraded to support!" + + "This error would not affect the launch of consumer, but may has impact on message receiving if you " + + "have use the new features which are not supported by server, please check the log!", e); + } + } + } + } + } + } + public void sendHeartbeatToAllBrokerWithLock() { if (this.lockHeartbeat.tryLock()) { try { @@ -493,7 +534,11 @@ public class MQClientInstance { } try { - this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000); + int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000); + if (!this.brokerVersionTable.containsKey(brokerName)) { + this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4)); + } + this.brokerVersionTable.get(brokerName).put(addr, version); if (times % 20 == 0) { log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr); log.info(heartbeatData.toString()); @@ -943,7 +988,7 @@ public class MQClientInstance { } if (found) { - return new FindBrokerResult(brokerAddr, slave); + return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr)); } return null; @@ -982,12 +1027,21 @@ public class MQClientInstance { } if (found) { - return new FindBrokerResult(brokerAddr, slave); + return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr)); } return null; } + public int findBrokerVersion(String brokerName, String brokerAddr) { + if (this.brokerVersionTable.containsKey(brokerName)) { + if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) { + return this.brokerVersionTable.get(brokerName).get(brokerAddr); + } + } + return 0; + } + public List<String> findConsumerIdList(final String topic, final String group) { String brokerAddr = this.findBrokerAddrByTopic(topic); if (null == brokerAddr) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index f79f726..f0a73bd 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -99,6 +99,25 @@ public class BrokerConfig { private boolean traceOn = true; + // Switch of filter bit map calculation. + // If switch on: + // 1. Calculate filter bit map when construct queue. + // 2. Filter bit map will be saved to consume queue extend file if allowed. + private boolean enableCalcFilterBitMap = false; + + // Expect num of consumers will use filter. + private int expectConsumerNumUseFilter = 32; + + // Error rate of bloom filter, 1~100. + private int maxErrorRateOfBloomFilter = 20; + + //how long to clean filter data after dead.Default: 24h + private long filterDataCleanTimeSpan = 24 * 3600 * 1000; + + // whether do filter when retry. + private boolean filterSupportRetry = false; + private boolean enablePropertyFilter = false; + public static String localHostName() { try { return InetAddress.getLocalHost().getHostName(); @@ -484,4 +503,52 @@ public class BrokerConfig { public void setCommercialBaseCount(int commercialBaseCount) { this.commercialBaseCount = commercialBaseCount; } + + public boolean isEnableCalcFilterBitMap() { + return enableCalcFilterBitMap; + } + + public void setEnableCalcFilterBitMap(boolean enableCalcFilterBitMap) { + this.enableCalcFilterBitMap = enableCalcFilterBitMap; + } + + public int getExpectConsumerNumUseFilter() { + return expectConsumerNumUseFilter; + } + + public void setExpectConsumerNumUseFilter(int expectConsumerNumUseFilter) { + this.expectConsumerNumUseFilter = expectConsumerNumUseFilter; + } + + public int getMaxErrorRateOfBloomFilter() { + return maxErrorRateOfBloomFilter; + } + + public void setMaxErrorRateOfBloomFilter(int maxErrorRateOfBloomFilter) { + this.maxErrorRateOfBloomFilter = maxErrorRateOfBloomFilter; + } + + public long getFilterDataCleanTimeSpan() { + return filterDataCleanTimeSpan; + } + + public void setFilterDataCleanTimeSpan(long filterDataCleanTimeSpan) { + this.filterDataCleanTimeSpan = filterDataCleanTimeSpan; + } + + public boolean isFilterSupportRetry() { + return filterSupportRetry; + } + + public void setFilterSupportRetry(boolean filterSupportRetry) { + this.filterSupportRetry = filterSupportRetry; + } + + public boolean isEnablePropertyFilter() { + return enablePropertyFilter; + } + + public void setEnablePropertyFilter(boolean enablePropertyFilter) { + this.enablePropertyFilter = enablePropertyFilter; + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/main/java/org/apache/rocketmq/common/MixAll.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index 4a54a60..e75efd9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -55,8 +55,8 @@ public class MixAll { public static final String DEFAULT_NAMESRV_ADDR_LOOKUP = "jmenv.tbsite.net"; public static final String WS_DOMAIN_NAME = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP); public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr"); - // http://jmenv.tbsite.net:8080/rocketmq/nsaddr - public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP; +// // http://jmenv.tbsite.net:8080/rocketmq/nsaddr +// public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP; public static final String DEFAULT_TOPIC = "TBW102"; public static final String BENCHMARK_TOPIC = "BenchmarkTest"; public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER"; @@ -89,6 +89,16 @@ public class MixAll { public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion"; public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType"; + public static String getWSAddr() { + String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP); + String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr"); + String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup; + if (wsDomainName.indexOf(":") > 0) { + wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup; + } + return wsAddr; + } + public static String getRetryTopic(final String consumerGroup) { return RETRY_GROUP_TOPIC_PREFIX + consumerGroup; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java index e706e28..385c121 100644 --- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java +++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java @@ -34,4 +34,5 @@ public class LoggerName { public static final String DUPLICATION_LOGGER_NAME = "RocketmqDuplication"; public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection"; public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark"; + public static final String FILTER_LOGGER_NAME = "RocketmqFilter"; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/main/java/org/apache/rocketmq/common/filter/ExpressionType.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/ExpressionType.java b/common/src/main/java/org/apache/rocketmq/common/filter/ExpressionType.java new file mode 100644 index 0000000..3b7940a --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/filter/ExpressionType.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.filter; + +public class ExpressionType { + + /** + * <ul> + * Keywords: + * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li> + * </ul> + * <p/> + * <ul> + * Data type: + * <li>Boolean, like: TRUE, FALSE</li> + * <li>String, like: 'abc'</li> + * <li>Decimal, like: 123</li> + * <li>Float number, like: 3.1415</li> + * </ul> + * <p/> + * <ul> + * Grammar: + * <li>{@code AND, OR}</li> + * <li>{@code >, >=, <, <=, =}</li> + * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li> + * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li> + * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this operation only support String type.</li> + * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is null, or not.</li> + * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, or false.</li> + * </ul> + * <p/> + * <p> + * Example: + * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE) + * </p> + */ + public static final String SQL92 = "SQL92"; + + /** + * Only support or operation such as + * "tag1 || tag2 || tag3", <br> + * If null or * expression,meaning subscribe all. + */ + public static final String TAG = "TAG"; + + public static boolean isTagType(String type) { + if (type == null || TAG.equals(type)) { + return true; + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java b/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java index e9bf3fa..fc8525c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java +++ b/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java @@ -63,4 +63,22 @@ public class FilterAPI { return subscriptionData; } + + public static SubscriptionData build(final String topic, final String subString, + final String type) throws Exception { + if (ExpressionType.TAG.equals(type) || type == null) { + return buildSubscriptionData(null, topic, subString); + } + + if (subString == null || subString.length() < 1) { + throw new IllegalArgumentException("Expression can't be null! " + type); + } + + SubscriptionData subscriptionData = new SubscriptionData(); + subscriptionData.setTopic(topic); + subscriptionData.setSubString(subString); + subscriptionData.setExpressionType(type); + + return subscriptionData; + } }