http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/main/java/org/apache/rocketmq/common/message/Message.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/message/Message.java b/common/src/main/java/org/apache/rocketmq/common/message/Message.java index f3bff83..2c81f5c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/Message.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java @@ -81,6 +81,12 @@ public class Message implements Serializable { throw new RuntimeException(String.format( "The Property<%s> is used by system, input another please", name)); } + if (value == null || value.trim().isEmpty() + || name == null || name.trim().isEmpty()) { + throw new IllegalArgumentException( + "The name or value of property can not be null or blank string!" + ); + } this.putProperty(name, value); }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java index 90b837a..e41ec9d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java @@ -41,6 +41,20 @@ public class MessageDecoder { public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8; public static final char NAME_VALUE_SEPARATOR = 1; public static final char PROPERTY_SEPARATOR = 2; + public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE + + 4 // 2 MAGICCODE + + 4 // 3 BODYCRC + + 4 // 4 QUEUEID + + 4 // 5 FLAG + + 8 // 6 QUEUEOFFSET + + 8 // 7 PHYSICALOFFSET + + 4 // 8 SYSFLAG + + 8 // 9 BORNTIMESTAMP + + 8 // 10 BORNHOST + + 8 // 11 STORETIMESTAMP + + 8 // 12 STOREHOSTADDRESS + + 4 // 13 RECONSUMETIMES + + 8; // 14 Prepared Transaction Offset public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) { input.flip(); @@ -80,6 +94,31 @@ public class MessageDecoder { return new MessageId(address, offset); } + /** + * Decode only the properties from a msg buffer, skipping over the header, body and topic. + * + * @param byteBuffer msg commit log buffer, read with absolute indexes counted from 0; its position is moved by this call. 
+ * @return the map built by string2messageProperties from the stored properties, or null when the stored properties length is not greater than 0 + */ + public static Map<String, String> decodeProperties(java.nio.ByteBuffer byteBuffer) { + int topicLengthPosition = BODY_SIZE_POSITION + 4 + byteBuffer.getInt(BODY_SIZE_POSITION); + + byte topicLength = byteBuffer.get(topicLengthPosition); + + short propertiesLength = byteBuffer.getShort(topicLengthPosition + 1 + topicLength); + + byteBuffer.position(topicLengthPosition + 1 + topicLength + 2); + + if (propertiesLength > 0) { + byte[] properties = new byte[propertiesLength]; + byteBuffer.get(properties); + String propertiesString = new String(properties, CHARSET_UTF8); + Map<String, String> map = string2messageProperties(propertiesString); + return map; + } + return null; + } + public static MessageExt decode(java.nio.ByteBuffer byteBuffer) { return decode(byteBuffer, true, true, false); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java index 74fd965..990e748 100644 --- a/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java +++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java @@ -88,7 +88,7 @@ public class TopAddressing { if (verbose) { String errorMsg = - "connect to " + url + " failed, maybe the domain name " + MixAll.WS_DOMAIN_NAME + " not bind in /etc/hosts"; + "connect to " + url + " failed, maybe the domain name " + MixAll.getWSAddr() + " not bind in /etc/hosts"; errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL); log.warn(errorMsg); 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java ---------------------------------------------------------------------- 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index c6b0925..6f132f7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -68,6 +68,8 @@ public class RequestCode { public static final int GET_ALL_DELAY_OFFSET = 45; + public static final int CHECK_CLIENT_CONFIG = 46; + public static final int PUT_KV_CONFIG = 100; public static final int GET_KV_CONFIG = 101; @@ -162,4 +164,6 @@ public class RequestCode { public static final int SEND_BATCH_MESSAGE = 320; + + public static final int QUERY_CONSUME_QUEUE = 321; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java index 90b182b..f62c4ea 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java @@ -53,6 +53,10 @@ public class ResponseCode extends RemotingSysResponseCode { public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26; + public static final int FILTER_DATA_NOT_EXIST = 27; + + public static final int FILTER_DATA_NOT_LATEST = 28; + public static final int TRANSACTION_SHOULD_COMMIT = 200; public static final int TRANSACTION_SHOULD_ROLLBACK = 201; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java ---------------------------------------------------------------------- diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java new file mode 100644 index 0000000..a78ce55 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class CheckClientRequestBody extends RemotingSerializable { + + private String clientId; + private String group; + private SubscriptionData subscriptionData; + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } + + public SubscriptionData getSubscriptionData() { + return subscriptionData; + } + + public void setSubscriptionData(SubscriptionData subscriptionData) { + this.subscriptionData = subscriptionData; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeQueueData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeQueueData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeQueueData.java new file mode 100644 index 0000000..7268dcd --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeQueueData.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.body; + +public class ConsumeQueueData { + + private long physicOffset; + private int physicSize; + private long tagsCode; + private String extendDataJson; + private String bitMap; + private boolean eval; + private String msg; + + public long getPhysicOffset() { + return physicOffset; + } + + public void setPhysicOffset(long physicOffset) { + this.physicOffset = physicOffset; + } + + public int getPhysicSize() { + return physicSize; + } + + public void setPhysicSize(int physicSize) { + this.physicSize = physicSize; + } + + public long getTagsCode() { + return tagsCode; + } + + public void setTagsCode(long tagsCode) { + this.tagsCode = tagsCode; + } + + public String getExtendDataJson() { + return extendDataJson; + } + + public void setExtendDataJson(String extendDataJson) { + this.extendDataJson = extendDataJson; + } + + public String getBitMap() { + return bitMap; + } + + public void setBitMap(String bitMap) { + this.bitMap = bitMap; + } + + public boolean isEval() { + return eval; + } + + public void setEval(boolean eval) { + this.eval = eval; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + @Override + public String toString() { + return "ConsumeQueueData{" + + "physicOffset=" + physicOffset + + ", physicSize=" + physicSize + + ", tagsCode=" + tagsCode + + ", extendDataJson='" + extendDataJson + '\'' + + ", bitMap='" + bitMap + '\'' + + ", eval=" + eval + + ", msg='" + msg + '\'' + + '}'; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java new file mode 100644 index 0000000..be93da9 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.List; + +public class QueryConsumeQueueResponseBody extends RemotingSerializable { + + private SubscriptionData subscriptionData; + private String filterData; + private List<ConsumeQueueData> queueData; + private long maxQueueIndex; + private long minQueueIndex; + + public SubscriptionData getSubscriptionData() { + return subscriptionData; + } + + public void setSubscriptionData(SubscriptionData subscriptionData) { + this.subscriptionData = subscriptionData; + } + + public String getFilterData() { + return filterData; + } + + public void setFilterData(String filterData) { + this.filterData = filterData; + } + + public List<ConsumeQueueData> getQueueData() { + return queueData; + } + + public void setQueueData(List<ConsumeQueueData> queueData) { + this.queueData = queueData; + } + + public long getMaxQueueIndex() { + return maxQueueIndex; + } + + public void setMaxQueueIndex(long maxQueueIndex) { + this.maxQueueIndex = maxQueueIndex; + } + + public long getMinQueueIndex() { + return minQueueIndex; + } + + public void setMinQueueIndex(long minQueueIndex) { + this.minQueueIndex = minQueueIndex; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java index 8a59213..106e89e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java +++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java @@ -46,6 +46,7 @@ public class PullMessageRequestHeader implements CommandCustomHeader { private String subscription; @CFNotNull private Long subVersion; + private String expressionType; @Override public void checkFields() throws RemotingCommandException { @@ -130,4 +131,12 @@ public class PullMessageRequestHeader implements CommandCustomHeader { public void setSubVersion(Long subVersion) { this.subVersion = subVersion; } + + public String getExpressionType() { + return expressionType; + } + + public void setExpressionType(String expressionType) { + this.expressionType = expressionType; + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumeQueueRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumeQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumeQueueRequestHeader.java new file mode 100644 index 0000000..642fe17 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumeQueueRequestHeader.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class QueryConsumeQueueRequestHeader implements CommandCustomHeader { + + private String topic; + private int queueId; + private long index; + private int count; + private String consumerGroup; + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public int getQueueId() { + return queueId; + } + + public void setQueueId(int queueId) { + this.queueId = queueId; + } + + public long getIndex() { + return index; + } + + public void setIndex(long index) { + this.index = index; + } + + public int getCount() { + return count; + } + + public void setCount(int count) { + this.count = count; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + @Override + public void checkFields() throws RemotingCommandException { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java index 81f5954..e456b7e 100644 --- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java @@ -32,6 +32,7 @@ public class SubscriptionData implements Comparable<SubscriptionData> { private Set<String> tagsSet = new HashSet<String>(); private Set<Integer> codeSet = new HashSet<Integer>(); private long subVersion = System.currentTimeMillis(); + private String expressionType; @JSONField(serialize = false) private String filterClassSource; @@ -102,6 +103,14 @@ public class SubscriptionData implements Comparable<SubscriptionData> { this.classFilterMode = classFilterMode; } + public String getExpressionType() { + return expressionType; + } + + public void setExpressionType(String expressionType) { + this.expressionType = expressionType; + } + @Override public int hashCode() { final int prime = 31; @@ -111,6 +120,7 @@ public class SubscriptionData implements Comparable<SubscriptionData> { result = prime * result + ((subString == null) ? 0 : subString.hashCode()); result = prime * result + ((tagsSet == null) ? 0 : tagsSet.hashCode()); result = prime * result + ((topic == null) ? 0 : topic.hashCode()); + result = prime * result + ((expressionType == null) ? 
0 : expressionType.hashCode()); return result; } @@ -147,6 +157,11 @@ public class SubscriptionData implements Comparable<SubscriptionData> { return false; } else if (!topic.equals(other.topic)) return false; + if (expressionType == null) { + if (other.expressionType != null) + return false; + } else if (!expressionType.equals(other.expressionType)) + return false; return true; } @@ -154,7 +169,7 @@ public class SubscriptionData implements Comparable<SubscriptionData> { public String toString() { return "SubscriptionData [classFilterMode=" + classFilterMode + ", topic=" + topic + ", subString=" + subString + ", tagsSet=" + tagsSet + ", codeSet=" + codeSet + ", subVersion=" + subVersion - + "]"; + + ", expressionType=" + expressionType + "]"; } @Override http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java index 5137f32..c5f8460 100644 --- a/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java +++ b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java @@ -42,4 +42,53 @@ public class FilterAPITest { } assertThat(subscriptionData.getTagsSet()).isEqualTo(tagSet); } + + @Test + public void testBuildTagSome() { + try { + SubscriptionData subscriptionData = FilterAPI.build( + "TOPIC", "A || B", ExpressionType.TAG + ); + + assertThat(subscriptionData).isNotNull(); + assertThat(subscriptionData.getTopic()).isEqualTo("TOPIC"); + assertThat(subscriptionData.getSubString()).isEqualTo("A || B"); + assertThat(ExpressionType.isTagType(subscriptionData.getExpressionType())).isTrue(); + + assertThat(subscriptionData.getTagsSet()).isNotNull(); + assertThat(subscriptionData.getTagsSet()).containsExactly("A", "B"); + } 
catch (Exception e) { + e.printStackTrace(); + assertThat(Boolean.FALSE).isTrue(); + } + } + + @Test + public void testBuildSQL() { + try { + SubscriptionData subscriptionData = FilterAPI.build( + "TOPIC", "a is not null", ExpressionType.SQL92 + ); + + assertThat(subscriptionData).isNotNull(); + assertThat(subscriptionData.getTopic()).isEqualTo("TOPIC"); + assertThat(subscriptionData.getExpressionType()).isEqualTo(ExpressionType.SQL92); + } catch (Exception e) { + e.printStackTrace(); + assertThat(Boolean.FALSE).isTrue(); + } + } + + @Test + public void testBuildSQLWithNullSubString() { + try { + FilterAPI.build( + "TOPIC", null, ExpressionType.SQL92 + ); + + assertThat(Boolean.FALSE).isTrue(); + } catch (Exception e) { + e.printStackTrace(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java new file mode 100644 index 0000000..d14d6b0 --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.message; + +import org.junit.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +public class MessageDecoderTest { + + @Test + public void testDecodeProperties() { + MessageExt messageExt = new MessageExt(); + + messageExt.setMsgId("645100FA00002A9F000000489A3AA09E"); + messageExt.setTopic("abc"); + messageExt.setBody("hello!q!".getBytes()); + try { + messageExt.setBornHost(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0)); + } catch (UnknownHostException e) { + e.printStackTrace(); + assertThat(Boolean.FALSE).isTrue(); + } + messageExt.setBornTimestamp(System.currentTimeMillis()); + messageExt.setCommitLogOffset(123456); + messageExt.setPreparedTransactionOffset(0); + messageExt.setQueueId(0); + messageExt.setQueueOffset(123); + messageExt.setReconsumeTimes(0); + try { + messageExt.setStoreHost(new InetSocketAddress(InetAddress.getLocalHost(), 0)); + } catch (UnknownHostException e) { + e.printStackTrace(); + assertThat(Boolean.FALSE).isTrue(); + } + + messageExt.putUserProperty("a", "123"); + messageExt.putUserProperty("b", "hello"); + messageExt.putUserProperty("c", "3.14"); + + byte[] msgBytes = new byte[0]; + try { + msgBytes = MessageDecoder.encode(messageExt, false); + } catch (Exception e) { + e.printStackTrace(); + assertThat(Boolean.FALSE).isTrue(); + } + + ByteBuffer byteBuffer = 
ByteBuffer.allocate(msgBytes.length); + byteBuffer.put(msgBytes); + + Map<String, String> properties = MessageDecoder.decodeProperties(byteBuffer); + + assertThat(properties).isNotNull(); + assertThat("123").isEqualTo(properties.get("a")); + assertThat("hello").isEqualTo(properties.get("b")); + assertThat("3.14").isEqualTo(properties.get("c")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/distribution/conf/logback_broker.xml ---------------------------------------------------------------------- diff --git a/distribution/conf/logback_broker.xml b/distribution/conf/logback_broker.xml index 05c0ee4..dd5c63f 100644 --- a/distribution/conf/logback_broker.xml +++ b/distribution/conf/logback_broker.xml @@ -222,6 +222,29 @@ <appender-ref ref="RocketmqRebalanceLockAppender_inner"/> </appender> + <appender name="RocketmqFilterAppender_inner" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file>${user.home}/logs/rocketmqlogs/filter.log</file> + <append>true</append> + <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> + <fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/filter.%i.log + </fileNamePattern> + <minIndex>1</minIndex> + <maxIndex>10</maxIndex> + </rollingPolicy> + <triggeringPolicy + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + <maxFileSize>100MB</maxFileSize> + </triggeringPolicy> + <encoder> + <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern> + <charset class="java.nio.charset.Charset">UTF-8</charset> + </encoder> + </appender> + <appender name="RocketmqFilterAppender" class="ch.qos.logback.classic.AsyncAppender"> + <appender-ref ref="RocketmqFilterAppender_inner"/> + </appender> + <appender name="RocketmqStatsAppender" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${user.home}/logs/rocketmqlogs/stats.log</file> @@ -321,6 +344,11 @@ <appender-ref ref="RocketmqCommercialAppender"/> </logger> + <logger name="RocketmqFilter" 
additivity="false"> + <level value="INFO"/> + <appender-ref ref="RocketmqFilterAppender"/> + </logger> + <root> <level value="INFO"/> <appender-ref ref="DefaultAppender"/> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/distribution/release.xml ---------------------------------------------------------------------- diff --git a/distribution/release.xml b/distribution/release.xml index 2d3ec1e..9e4ef2a 100644 --- a/distribution/release.xml +++ b/distribution/release.xml @@ -67,6 +67,7 @@ <include>org.apache.rocketmq:rocketmq-namesrv</include> <include>org.apache.rocketmq:rocketmq-filtersrv</include> <include>org.apache.rocketmq:rocketmq-example</include> + <include>org.apache.rocketmq:rocketmq-filter</include> </includes> <binaries> <outputDirectory>lib/</outputDirectory> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java index 473e4c7..3e1b79b 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -27,10 +27,13 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.MixAll; +import 
org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.srvutil.ServerUtil; @@ -46,12 +49,14 @@ public class Consumer { final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest"; final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer"; final String isPrefixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true"; + final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null; + final String expression = commandLine.hasOption('e') ? commandLine.getOptionValue('e').trim() : null; String group = groupPrefix; if (Boolean.parseBoolean(isPrefixEnable)) { group = groupPrefix + "_" + Long.toString(System.currentTimeMillis() % 100); } - System.out.printf("topic %s group %s prefix %s%n", topic, group, isPrefixEnable); + System.out.printf("topic: %s, group: %s, prefix: %s, filterType: %s, expression: %s%n", topic, group, isPrefixEnable, filterType, expression); final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer(); @@ -99,7 +104,21 @@ public class Consumer { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); consumer.setInstanceName(Long.toString(System.currentTimeMillis())); - consumer.subscribe(topic, "*"); + if (filterType == null || expression == null) { + consumer.subscribe(topic, "*"); + } else { + if (ExpressionType.TAG.equals(filterType)) { + String expr = MixAll.file2String(expression); + System.out.printf("Expression: %s%n", expr); + consumer.subscribe(topic, MessageSelector.byTag(expr)); + } else if (ExpressionType.SQL92.equals(filterType)) { + String expr = MixAll.file2String(expression); + System.out.printf("Expression: %s%n", expr); + consumer.subscribe(topic, MessageSelector.bySql(expr)); + } else { + throw new IllegalArgumentException("Not support filter type! 
" + filterType); + } + } consumer.registerMessageListener(new MessageListenerConcurrently() { @Override @@ -142,6 +161,14 @@ public class Consumer { opt.setRequired(false); options.addOption(opt); + opt = new Option("f", "filterType", true, "TAG, SQL92"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("e", "expression", true, "filter expression content file path.ie: ./test/expr"); + opt.setRequired(false); + options.addOption(opt); + return options; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index 50d750d..2d8d0f6 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -18,11 +18,13 @@ package org.apache.rocketmq.example.benchmark; import java.io.UnsupportedEncodingException; import java.util.LinkedList; +import java.util.Random; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; @@ -50,13 +52,12 @@ public class Producer { final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64; final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128; final boolean keyEnable = commandLine.hasOption('k') && Boolean.parseBoolean(commandLine.getOptionValue('k')); + final int propertySize = commandLine.hasOption('p') ? 
Integer.parseInt(commandLine.getOptionValue('p')) : 0; System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable); final Logger log = ClientLogger.getLog(); - final Message msg = buildMessage(messageSize, topic); - final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount); final StatsBenchmarkProducer statsBenchmark = new StatsBenchmarkProducer(); @@ -117,10 +118,37 @@ public class Producer { public void run() { while (true) { try { + final Message msg; + try { + msg = buildMessage(messageSize, topic); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + return; + } final long beginTimestamp = System.currentTimeMillis(); if (keyEnable) { msg.setKeys(String.valueOf(beginTimestamp / 1000)); } + if (propertySize > 0) { + if (msg.getProperties() != null) { + msg.getProperties().clear(); + } + int i = 0; + int startValue = (new Random(System.currentTimeMillis())).nextInt(100); + int size = 0; + while (true) { + String prop1 = "prop" + i, prop1V = "hello" + startValue; + String prop2 = "prop" + (i + 1), prop2V = String.valueOf(startValue); + msg.putUserProperty(prop1, prop1V); + msg.putUserProperty(prop2, prop2V); + size += prop1.length() + prop2.length() + prop1V.length() + prop2V.length(); + if (size > propertySize) { + break; + } + i += 2; + startValue += 2; + } + } producer.send(msg); statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); @@ -214,7 +242,7 @@ class StatsBenchmarkProducer { private final AtomicLong sendMessageMaxRT = new AtomicLong(0L); public Long[] createSnapshot() { - Long[] snap = new Long[] { + Long[] snap = new Long[]{ System.currentTimeMillis(), this.sendRequestSuccessCount.get(), this.sendRequestFailedCount.get(), http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java 
---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java new file mode 100644 index 0000000..9a3b813 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.example.filter; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.List; + +public class SqlConsumer { + + public static void main(String[] args) { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); + + try { + consumer.subscribe("TopicTest", + MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + + "and (a is not null and a between 0 3)")); + } catch (MQClientException e) { + e.printStackTrace(); + return; + } + + consumer.registerMessageListener(new MessageListenerConcurrently() { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, + ConsumeConcurrentlyContext context) { + System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + + try { + consumer.start(); + } catch (MQClientException e) { + e.printStackTrace(); + return; + } + System.out.printf("Consumer Started.%n"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java new file mode 100644 index 0000000..3f3a0e6 --- /dev/null +++ 
b/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.filter; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +public class SqlProducer { + + public static void main(String[] args) { + DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); + try { + producer.start(); + } catch (MQClientException e) { + e.printStackTrace(); + return; + } + + for (int i = 0; i < 10; i++) { + try { + String tag; + int div = i % 3; + if (div == 0) { + tag = "TagA"; + } else if (div == 1) { + tag = "TagB"; + } else { + tag = "TagC"; + } + Message msg = new Message("TopicTest", + tag, + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) + ); + msg.putUserProperty("a", String.valueOf(i)); + + SendResult sendResult = producer.send(msg); + System.out.printf("%s%n", sendResult); + } catch (Exception e) { + e.printStackTrace(); 
+ try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } + producer.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/pom.xml ---------------------------------------------------------------------- diff --git a/filter/pom.xml b/filter/pom.xml new file mode 100644 index 0000000..7978f05 --- /dev/null +++ b/filter/pom.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>rocketmq-all</artifactId> + <groupId>org.apache.rocketmq</groupId> + <version>4.1.0-incubating-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <packaging>jar</packaging> + <artifactId>rocketmq-filter</artifactId> + <name>rocketmq-filter ${project.version}</name> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-common</artifactId> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-srvutil</artifactId> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/main/java/org/apache/rocketmq/filter/FilterFactory.java ---------------------------------------------------------------------- diff --git a/filter/src/main/java/org/apache/rocketmq/filter/FilterFactory.java b/filter/src/main/java/org/apache/rocketmq/filter/FilterFactory.java new file mode 100644 index 0000000..a318548 --- /dev/null +++ b/filter/src/main/java/org/apache/rocketmq/filter/FilterFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.filter; + +import java.util.HashMap; +import java.util.Map; + +/** + * Filter factory: support other filter to register. + */ +public class FilterFactory { + + public static final FilterFactory INSTANCE = new FilterFactory(); + + protected static final Map<String, FilterSpi> FILTER_SPI_HOLDER = new HashMap<String, FilterSpi>(4); + + static { + FilterFactory.INSTANCE.register(new SqlFilter()); + } + + /** + * Register a filter. + * <br> + * Note: + * <li>1. Filter registered will be used in broker server, so take care of it's reliability and performance.</li> + * + * @param filterSpi + */ + public void register(FilterSpi filterSpi) { + if (FILTER_SPI_HOLDER.containsKey(filterSpi.ofType())) { + throw new IllegalArgumentException(String.format("Filter spi type(%s) already exist!", filterSpi.ofType())); + } + + FILTER_SPI_HOLDER.put(filterSpi.ofType(), filterSpi); + } + + /** + * Un register a filter. + * + * @param type + * @return + */ + public FilterSpi unRegister(String type) { + return FILTER_SPI_HOLDER.remove(type); + } + + /** + * Get a filter registered, null if none exist. 
+ * + * @param type + * @return + */ + public FilterSpi get(String type) { + return FILTER_SPI_HOLDER.get(type); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/main/java/org/apache/rocketmq/filter/FilterSpi.java ---------------------------------------------------------------------- diff --git a/filter/src/main/java/org/apache/rocketmq/filter/FilterSpi.java b/filter/src/main/java/org/apache/rocketmq/filter/FilterSpi.java new file mode 100644 index 0000000..fcc39fa --- /dev/null +++ b/filter/src/main/java/org/apache/rocketmq/filter/FilterSpi.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.filter; + +import org.apache.rocketmq.filter.expression.Expression; +import org.apache.rocketmq.filter.expression.MQFilterException; + +/** + * Filter spi interface. + */ +public interface FilterSpi { + + /** + * Compile. + * + * @param expr + * @return + * @throws org.apache.rocketmq.filter.expression.MQFilterException + */ + Expression compile(final String expr) throws MQFilterException; + + /** + * Which type. 
+ * + * @return + */ + String ofType(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/main/java/org/apache/rocketmq/filter/SqlFilter.java ---------------------------------------------------------------------- diff --git a/filter/src/main/java/org/apache/rocketmq/filter/SqlFilter.java b/filter/src/main/java/org/apache/rocketmq/filter/SqlFilter.java new file mode 100644 index 0000000..0c1ffb8 --- /dev/null +++ b/filter/src/main/java/org/apache/rocketmq/filter/SqlFilter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.filter; + +import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.filter.expression.Expression; +import org.apache.rocketmq.filter.expression.MQFilterException; +import org.apache.rocketmq.filter.parser.SelectorParser; + +/** + * SQL92 Filter, just a wrapper of {@link org.apache.rocketmq.filter.parser.SelectorParser}. + * <p/> + * <p> + * Do not use this filter directly.Use {@link FilterFactory#get} to select a filter. 
+ * </p> + */ +public class SqlFilter implements FilterSpi { + + @Override + public Expression compile(final String expr) throws MQFilterException { + return SelectorParser.parse(expr); + } + + @Override + public String ofType() { + return ExpressionType.SQL92; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/main/java/org/apache/rocketmq/filter/constant/UnaryType.java ---------------------------------------------------------------------- diff --git a/filter/src/main/java/org/apache/rocketmq/filter/constant/UnaryType.java b/filter/src/main/java/org/apache/rocketmq/filter/constant/UnaryType.java new file mode 100644 index 0000000..d2d04cd --- /dev/null +++ b/filter/src/main/java/org/apache/rocketmq/filter/constant/UnaryType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.filter.constant; + +public enum UnaryType { + NEGATE, + IN, + NOT, + BOOLEANCAST, + LIKE +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/main/java/org/apache/rocketmq/filter/expression/BinaryExpression.java ---------------------------------------------------------------------- diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/BinaryExpression.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/BinaryExpression.java new file mode 100644 index 0000000..0f172e3 --- /dev/null +++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/BinaryExpression.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.filter.expression; + +/** + * An expression which performs an operation on two expression values. 
+ * <p> + * This class was taken from ActiveMQ org.apache.activemq.filter.BinaryExpression, + * </p> + */ +public abstract class BinaryExpression implements Expression { + protected Expression left; + protected Expression right; + + public BinaryExpression(Expression left, Expression right) { + this.left = left; + this.right = right; + } + + public Expression getLeft() { + return left; + } + + public Expression getRight() { + return right; + } + + /** + * @see Object#toString() + */ + public String toString() { + return "(" + left.toString() + " " + getExpressionSymbol() + " " + right.toString() + ")"; + } + + /** + * @see Object#hashCode() + */ + public int hashCode() { + return toString().hashCode(); + } + + /** + * @see Object#equals(Object) + */ + public boolean equals(Object o) { + + if (o == null || !this.getClass().equals(o.getClass())) { + return false; + } + return toString().equals(o.toString()); + + } + + /** + * Returns the symbol that represents this binary expression. For example, addition is + * represented by "+" + * + * @return + */ + public abstract String getExpressionSymbol(); + + /** + * @param expression + */ + public void setRight(Expression expression) { + right = expression; + } + + /** + * @param expression + */ + public void setLeft(Expression expression) { + left = expression; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/main/java/org/apache/rocketmq/filter/expression/BooleanExpression.java ---------------------------------------------------------------------- diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/BooleanExpression.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/BooleanExpression.java new file mode 100644 index 0000000..bb54632 --- /dev/null +++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/BooleanExpression.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.filter.expression; + +/** + * A BooleanExpression is an expression that always + * produces a Boolean result. + * <p> + * This class was taken from ActiveMQ org.apache.activemq.filter.BooleanExpression, + * but the parameter is changed to an interface. + * </p> + * + * @see org.apache.rocketmq.filter.expression.EvaluationContext + */ +public interface BooleanExpression extends Expression { + + /** + * @param context + * @return true if the expression evaluates to Boolean.TRUE. 
+ * @throws Exception + */ + boolean matches(EvaluationContext context) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/main/java/org/apache/rocketmq/filter/expression/ComparisonExpression.java ---------------------------------------------------------------------- diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/ComparisonExpression.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/ComparisonExpression.java new file mode 100644 index 0000000..8b82e57 --- /dev/null +++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/ComparisonExpression.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.filter.expression; + +import java.util.List; + +/** + * A filter performing a comparison of two objects + * <p> + * This class was taken from ActiveMQ org.apache.activemq.filter.ComparisonExpression, + * but: + * 1. Remove LIKE expression, and related methods; + * 2. Extract a new method __compare which has int return value; + * 3. When create between expression, check whether left value is less or equal than right value; + * 4. 
For string type value(can not convert to number), only equal or unequal comparison are supported. + * </p> + */ +public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression { + + public static final ThreadLocal<Boolean> CONVERT_STRING_EXPRESSIONS = new ThreadLocal<Boolean>(); + + boolean convertStringExpressions = false; + + /** + * @param left + * @param right + */ + public ComparisonExpression(Expression left, Expression right) { + super(left, right); + convertStringExpressions = CONVERT_STRING_EXPRESSIONS.get() != null; + } + + public static BooleanExpression createBetween(Expression value, Expression left, Expression right) { + // check + if (left instanceof ConstantExpression && right instanceof ConstantExpression) { + Object lv = ((ConstantExpression) left).getValue(); + Object rv = ((ConstantExpression) right).getValue(); + if (lv == null || rv == null) { + throw new RuntimeException("Illegal values of between, values can not be null!"); + } + if (lv instanceof Comparable && rv instanceof Comparable) { + int ret = __compare((Comparable) rv, (Comparable) lv, true); + if (ret < 0) + throw new RuntimeException( + String.format("Illegal values of between, left value(%s) must less than or equal to right value(%s)", lv, rv) + ); + } + } + + return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right)); + } + + public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right) { + return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right)); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public static BooleanExpression createInFilter(Expression left, List elements) { + + if (!(left instanceof PropertyExpression)) { + throw new RuntimeException("Expected a property for In expression, got: " + left); + } + return UnaryExpression.createInExpression((PropertyExpression) left, elements, false); + + } + + 
@SuppressWarnings({"rawtypes", "unchecked"}) + public static BooleanExpression createNotInFilter(Expression left, List elements) { + + if (!(left instanceof PropertyExpression)) { + throw new RuntimeException("Expected a property for In expression, got: " + left); + } + return UnaryExpression.createInExpression((PropertyExpression) left, elements, true); + + } + + public static BooleanExpression createIsNull(Expression left) { + return doCreateEqual(left, ConstantExpression.NULL); + } + + public static BooleanExpression createIsNotNull(Expression left) { + return UnaryExpression.createNOT(doCreateEqual(left, ConstantExpression.NULL)); + } + + public static BooleanExpression createNotEqual(Expression left, Expression right) { + return UnaryExpression.createNOT(createEqual(left, right)); + } + + public static BooleanExpression createEqual(Expression left, Expression right) { + checkEqualOperand(left); + checkEqualOperand(right); + checkEqualOperandCompatability(left, right); + return doCreateEqual(left, right); + } + + @SuppressWarnings({"rawtypes"}) + private static BooleanExpression doCreateEqual(Expression left, Expression right) { + return new ComparisonExpression(left, right) { + + public Object evaluate(EvaluationContext context) throws Exception { + Object lv = left.evaluate(context); + Object rv = right.evaluate(context); + + // If one of the values is null + if (lv == null ^ rv == null) { + if (lv == null) { + return null; + } + return Boolean.FALSE; + } + if (lv == rv || lv.equals(rv)) { + return Boolean.TRUE; + } + if (lv instanceof Comparable && rv instanceof Comparable) { + return compare((Comparable) lv, (Comparable) rv); + } + return Boolean.FALSE; + } + + protected boolean asBoolean(int answer) { + return answer == 0; + } + + public String getExpressionSymbol() { + return "=="; + } + }; + } + + public static BooleanExpression createGreaterThan(final Expression left, final Expression right) { + checkLessThanOperand(left); + checkLessThanOperand(right); 
+ return new ComparisonExpression(left, right) { + protected boolean asBoolean(int answer) { + return answer > 0; + } + + public String getExpressionSymbol() { + return ">"; + } + }; + } + + public static BooleanExpression createGreaterThanEqual(final Expression left, final Expression right) { + checkLessThanOperand(left); + checkLessThanOperand(right); + return new ComparisonExpression(left, right) { + protected boolean asBoolean(int answer) { + return answer >= 0; + } + + public String getExpressionSymbol() { + return ">="; + } + }; + } + + public static BooleanExpression createLessThan(final Expression left, final Expression right) { + checkLessThanOperand(left); + checkLessThanOperand(right); + return new ComparisonExpression(left, right) { + + protected boolean asBoolean(int answer) { + return answer < 0; + } + + public String getExpressionSymbol() { + return "<"; + } + + }; + } + + public static BooleanExpression createLessThanEqual(final Expression left, final Expression right) { + checkLessThanOperand(left); + checkLessThanOperand(right); + return new ComparisonExpression(left, right) { + + protected boolean asBoolean(int answer) { + return answer <= 0; + } + + public String getExpressionSymbol() { + return "<="; + } + }; + } + + /** + * Only Numeric expressions can be used in >, >=, < or <= expressions.s + * + * @param expr + */ + public static void checkLessThanOperand(Expression expr) { + if (expr instanceof ConstantExpression) { + Object value = ((ConstantExpression) expr).getValue(); + if (value instanceof Number) { + return; + } + + // Else it's boolean or a String.. + throw new RuntimeException("Value '" + expr + "' cannot be compared."); + } + if (expr instanceof BooleanExpression) { + throw new RuntimeException("Value '" + expr + "' cannot be compared."); + } + } + + /** + * Validates that the expression can be used in == or <> expression. Cannot + * not be NULL TRUE or FALSE litterals. 
+ * + * @param expr + */ + public static void checkEqualOperand(Expression expr) { + if (expr instanceof ConstantExpression) { + Object value = ((ConstantExpression) expr).getValue(); + if (value == null) { + throw new RuntimeException("'" + expr + "' cannot be compared."); + } + } + } + + /** + * @param left + * @param right + */ + private static void checkEqualOperandCompatability(Expression left, Expression right) { + if (left instanceof ConstantExpression && right instanceof ConstantExpression) { + if (left instanceof BooleanExpression && !(right instanceof BooleanExpression)) { + throw new RuntimeException("'" + left + "' cannot be compared with '" + right + "'"); + } + } + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public Object evaluate(EvaluationContext context) throws Exception { + Comparable<Comparable> lv = (Comparable) left.evaluate(context); + if (lv == null) { + return null; + } + Comparable rv = (Comparable) right.evaluate(context); + if (rv == null) { + return null; + } + if (getExpressionSymbol().equals(">=") || getExpressionSymbol().equals(">") + || getExpressionSymbol().equals("<") || getExpressionSymbol().equals("<=")) { + Class<? extends Comparable> lc = lv.getClass(); + Class<? extends Comparable> rc = rv.getClass(); + if (lc == rc && lc == String.class) { + // Compare String is illegal + // first try to convert to double + try { + Comparable lvC = Double.valueOf((String) (Comparable) lv); + Comparable rvC = Double.valueOf((String) rv); + + return compare(lvC, rvC); + } catch (Exception e) { + throw new RuntimeException("It's illegal to compare string by '>=', '>', '<', '<='. lv=" + lv + ", rv=" + rv, e); + } + } + } + return compare(lv, rv); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + protected static int __compare(Comparable lv, Comparable rv, boolean convertStringExpressions) { + Class<? extends Comparable> lc = lv.getClass(); + Class<? 
extends Comparable> rc = rv.getClass(); + // If the the objects are not of the same type, + // try to convert up to allow the comparison. + if (lc != rc) { + try { + if (lc == Boolean.class) { + if (convertStringExpressions && rc == String.class) { + lv = Boolean.valueOf((String) lv).booleanValue(); + } else { + return -1; + } + } else if (lc == Byte.class) { + if (rc == Short.class) { + lv = Short.valueOf(((Number) lv).shortValue()); + } else if (rc == Integer.class) { + lv = Integer.valueOf(((Number) lv).intValue()); + } else if (rc == Long.class) { + lv = Long.valueOf(((Number) lv).longValue()); + } else if (rc == Float.class) { + lv = new Float(((Number) lv).floatValue()); + } else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } else if (convertStringExpressions && rc == String.class) { + rv = Byte.valueOf((String) rv); + } else { + return -1; + } + } else if (lc == Short.class) { + if (rc == Integer.class) { + lv = Integer.valueOf(((Number) lv).intValue()); + } else if (rc == Long.class) { + lv = Long.valueOf(((Number) lv).longValue()); + } else if (rc == Float.class) { + lv = new Float(((Number) lv).floatValue()); + } else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } else if (convertStringExpressions && rc == String.class) { + rv = Short.valueOf((String) rv); + } else { + return -1; + } + } else if (lc == Integer.class) { + if (rc == Long.class) { + lv = Long.valueOf(((Number) lv).longValue()); + } else if (rc == Float.class) { + lv = new Float(((Number) lv).floatValue()); + } else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } else if (convertStringExpressions && rc == String.class) { + rv = Integer.valueOf((String) rv); + } else { + return -1; + } + } else if (lc == Long.class) { + if (rc == Integer.class) { + rv = Long.valueOf(((Number) rv).longValue()); + } else if (rc == Float.class) { + lv = new Float(((Number) lv).floatValue()); + } else if (rc == 
Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } else if (convertStringExpressions && rc == String.class) { + rv = Long.valueOf((String) rv); + } else { + return -1; + } + } else if (lc == Float.class) { + if (rc == Integer.class) { + rv = new Float(((Number) rv).floatValue()); + } else if (rc == Long.class) { + rv = new Float(((Number) rv).floatValue()); + } else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } else if (convertStringExpressions && rc == String.class) { + rv = Float.valueOf((String) rv); + } else { + return -1; + } + } else if (lc == Double.class) { + if (rc == Integer.class) { + rv = new Double(((Number) rv).doubleValue()); + } else if (rc == Long.class) { + rv = new Double(((Number) rv).doubleValue()); + } else if (rc == Float.class) { + rv = new Float(((Number) rv).doubleValue()); + } else if (convertStringExpressions && rc == String.class) { + rv = Double.valueOf((String) rv); + } else { + return -1; + } + } else if (convertStringExpressions && lc == String.class) { + if (rc == Boolean.class) { + lv = Boolean.valueOf((String) lv); + } else if (rc == Byte.class) { + lv = Byte.valueOf((String) lv); + } else if (rc == Short.class) { + lv = Short.valueOf((String) lv); + } else if (rc == Integer.class) { + lv = Integer.valueOf((String) lv); + } else if (rc == Long.class) { + lv = Long.valueOf((String) lv); + } else if (rc == Float.class) { + lv = Float.valueOf((String) lv); + } else if (rc == Double.class) { + lv = Double.valueOf((String) lv); + } else { + return -1; + } + } else { + return -1; + } + } catch (NumberFormatException e) { + throw new RuntimeException(e); + } + } + return lv.compareTo(rv); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + protected Boolean compare(Comparable lv, Comparable rv) { + return asBoolean(__compare(lv, rv, convertStringExpressions)) ? 
Boolean.TRUE : Boolean.FALSE; + } + + protected abstract boolean asBoolean(int answer); + + public boolean matches(EvaluationContext context) throws Exception { + Object object = evaluate(context); + return object != null && object == Boolean.TRUE; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/main/java/org/apache/rocketmq/filter/expression/ConstantExpression.java ---------------------------------------------------------------------- diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/ConstantExpression.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/ConstantExpression.java new file mode 100644 index 0000000..ca70f51 --- /dev/null +++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/ConstantExpression.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.filter.expression; + +/** + * Represents a constant expression + * <p> + * This class was taken from ActiveMQ org.apache.activemq.filter.ConstantExpression, + * but: + * 1. For long type constant, the range bound by java Long type; + * 2. For float type constant, the range bound by java Double type; + * 3. 
Remove Hex and Octal expression; + * 4. Add now expression to support to get current time. + * </p> + */ +public class ConstantExpression implements Expression { + + static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression { + public BooleanConstantExpression(Object value) { + super(value); + } + + public boolean matches(EvaluationContext context) throws Exception { + Object object = evaluate(context); + return object != null && object == Boolean.TRUE; + } + } + + public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null); + public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE); + public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE); + + private Object value; + + public ConstantExpression(Object value) { + this.value = value; + } + + public static ConstantExpression createFromDecimal(String text) { + + // Strip off the 'l' or 'L' if needed. + if (text.endsWith("l") || text.endsWith("L")) { + text = text.substring(0, text.length() - 1); + } + + // only support Long.MIN_VALUE ~ Long.MAX_VALUE + Number value = new Long(text); +// try { +// value = new Long(text); +// } catch (NumberFormatException e) { +// // The number may be too big to fit in a long. 
+// value = new BigDecimal(text); +// } + + long l = value.longValue(); + if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) { + value = Integer.valueOf(value.intValue()); + } + return new ConstantExpression(value); + } + + public static ConstantExpression createFloat(String text) { + Double value = new Double(text); + if (value > Double.MAX_VALUE) { + throw new RuntimeException(text + " is greater than " + Double.MAX_VALUE); + } + if (value < Double.MIN_VALUE) { + throw new RuntimeException(text + " is less than " + Double.MIN_VALUE); + } + return new ConstantExpression(value); + } + + public static ConstantExpression createNow() { + return new NowExpression(); + } + + public Object evaluate(EvaluationContext context) throws Exception { + return value; + } + + public Object getValue() { + return value; + } + + /** + * @see Object#toString() + */ + public String toString() { + Object value = getValue(); + if (value == null) { + return "NULL"; + } + if (value instanceof Boolean) { + return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE"; + } + if (value instanceof String) { + return encodeString((String) value); + } + return value.toString(); + } + + /** + * @see Object#hashCode() + */ + public int hashCode() { + return toString().hashCode(); + } + + /** + * @see Object#equals(Object) + */ + public boolean equals(Object o) { + + if (o == null || !this.getClass().equals(o.getClass())) { + return false; + } + return toString().equals(o.toString()); + + } + + /** + * Encodes the value of string so that it looks like it would look like when + * it was provided in a selector. 
+ * + * @return + */ + public static String encodeString(String s) { + StringBuffer b = new StringBuffer(); + b.append('\''); + for (int i = 0; i < s.length(); i++) { + char c = s.charAt(i); + if (c == '\'') { + b.append(c); + } + b.append(c); + } + b.append('\''); + return b.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/main/java/org/apache/rocketmq/filter/expression/EmptyEvaluationContext.java ---------------------------------------------------------------------- diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/EmptyEvaluationContext.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/EmptyEvaluationContext.java new file mode 100644 index 0000000..52af2d0 --- /dev/null +++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/EmptyEvaluationContext.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.filter.expression; + +import java.util.Map; + +/** + * Empty context. 
+ */
+public class EmptyEvaluationContext implements EvaluationContext {
+    @Override
+    public Object get(String name) {
+        return null; // no value is bound to any name in this context
+    }
+
+    @Override
+    public Map<String, Object> keyValues() {
+        return null; // callers receive null here, not an empty map
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/main/java/org/apache/rocketmq/filter/expression/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/EvaluationContext.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/EvaluationContext.java
new file mode 100644
index 0000000..094ef53
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/EvaluationContext.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter.expression;
+
+import java.util.Map;
+
+/**
+ * Context that expressions are evaluated against.
+ *
+ * Compared to org.apache.activemq.filter.MessageEvaluationContext, this is just an interface.
+ */
+public interface EvaluationContext {
+
+    /**
+     * Gets a value from the context by name.
+     *
+     * @param name name of the value to look up
+     * @return the value bound to {@code name}; may be {@code null}
+     *         ({@code EmptyEvaluationContext} always returns {@code null})
+     */
+    Object get(String name);
+
+    /**
+     * Context variables.
+     *
+     * @return the context variables as a name-to-value map; may be {@code null}
+     *         ({@code EmptyEvaluationContext} returns {@code null} rather than an empty map)
+     */
+    Map<String, Object> keyValues();
+}