// Extracted from incubator-rocketmq commit 58f1574b (new file, blob ef81b29):
// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.rocketmq.filter;

import org.apache.rocketmq.filter.util.BitsArray;
import org.junit.Test;

import java.util.Random;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Unit tests for {@link BitsArray}: construction sizes, single-bit mutation
 * ({@code setBit}, {@code and}, {@code or}, {@code xor}, {@code not}) and the
 * whole-array {@code or} / {@code xor} operations.
 */
public class BitsArrayTest {

    /**
     * Builds a {@link BitsArray} of {@code bitCount} bits and fills every whole
     * byte with random content.
     *
     * <p>Only {@code bitCount / Byte.SIZE} bytes are written, so when
     * {@code bitCount} is not a multiple of eight the trailing partial byte keeps
     * whatever value {@code BitsArray.create(int)} gave it.
     *
     * @param bitCount number of bits the array should hold
     * @return the randomly filled array
     */
    BitsArray gen(int bitCount) {
        BitsArray bitsArray = BitsArray.create(bitCount);

        // One generator for the whole array: no per-byte re-seeding from the clock
        // and therefore no need to sleep between bytes to obtain distinct seeds.
        Random random = new Random();
        byte[] content = new byte[bitCount / Byte.SIZE];
        // nextBytes covers the full byte range, including 0xff.
        random.nextBytes(content);

        for (int i = 0; i < content.length; i++) {
            bitsArray.setByte(i, content[i]);
        }

        return bitsArray;
    }

    /** Bit length used by the single-byte tests below. */
    int bitLength = Byte.SIZE;

    /** The byte length is the bit length rounded up to whole bytes. */
    @Test
    public void testConstructor() {
        BitsArray bitsArray = BitsArray.create(8);

        assertThat(bitsArray.byteLength()).isEqualTo(1);
        assertThat(bitsArray.bitLength()).isEqualTo(8);

        bitsArray = BitsArray.create(9);

        assertThat(bitsArray.byteLength()).isEqualTo(2);
        assertThat(bitsArray.bitLength()).isEqualTo(9);

        bitsArray = BitsArray.create(7);

        assertThat(bitsArray.byteLength()).isEqualTo(1);
        assertThat(bitsArray.bitLength()).isEqualTo(7);
    }

    /**
     * Flips bit 2 through {@code setBit}; XOR-ing with the untouched clone must
     * then report that bit as set, because it is the one position that differs.
     */
    @Test
    public void testSet() {
        BitsArray bitsArray = gen(bitLength);
        BitsArray backUp = bitsArray.clone();

        boolean val = bitsArray.getBit(2);

        bitsArray.setBit(2, !val);

        bitsArray.xor(backUp);

        assertThat(bitsArray.getBit(2)).isTrue();
    }

    /** AND with {@code false} clears a set bit; OR with {@code true} sets a clear bit. */
    @Test
    public void testAndOr() {
        BitsArray bitsArray = gen(bitLength);

        boolean val = bitsArray.getBit(2);

        if (val) {
            bitsArray.and(2, false);
            assertThat(bitsArray.getBit(2)).isFalse();
        } else {
            bitsArray.or(2, true);
            assertThat(bitsArray.getBit(2)).isTrue();
        }
    }

    /** XOR of a bit with its own negation is always {@code true}. */
    @Test
    public void testXor() {
        BitsArray bitsArray = gen(bitLength);

        boolean val = bitsArray.getBit(2);

        bitsArray.xor(2, !val);

        assertThat(bitsArray.getBit(2)).isTrue();
    }

    /**
     * Negates bit 2 through {@code not}; XOR-ing with the untouched clone must
     * then report that bit as set.
     */
    @Test
    public void testNot() {
        BitsArray bitsArray = gen(bitLength);
        BitsArray backUp = bitsArray.clone();

        bitsArray.not(2);

        bitsArray.xor(backUp);

        assertThat(bitsArray.getBit(2)).isTrue();
    }

    /** OR-ing {@code ff 00} with {@code 00 ff} must set every bit. */
    @Test
    public void testOr() {
        BitsArray b1 = BitsArray.create(new byte[]{(byte) 0xff, 0x00});
        BitsArray b2 = BitsArray.create(new byte[]{0x00, (byte) 0xff});

        b1.or(b2);

        for (int i = 0; i < b1.bitLength(); i++) {
            assertThat(b1.getBit(i)).isTrue();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/test/java/org/apache/rocketmq/filter/BloomFilterTest.java ---------------------------------------------------------------------- diff --git a/filter/src/test/java/org/apache/rocketmq/filter/BloomFilterTest.java b/filter/src/test/java/org/apache/rocketmq/filter/BloomFilterTest.java new file mode 100644 index 0000000..c6097ee --- /dev/null +++ b/filter/src/test/java/org/apache/rocketmq/filter/BloomFilterTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.filter; + +import org.apache.rocketmq.filter.util.BitsArray; +import org.apache.rocketmq.filter.util.BloomFilter; +import org.apache.rocketmq.filter.util.BloomFilterData; +import org.junit.Test; + +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +public class BloomFilterTest { + + @Test + public void testEquals() { + BloomFilter a = BloomFilter.createByFn(10, 20); + + BloomFilter b = BloomFilter.createByFn(10, 20); + + BloomFilter c = BloomFilter.createByFn(12, 20); + + BloomFilter d = BloomFilter.createByFn(10, 30); + + assertThat(a).isEqualTo(b); + assertThat(a).isNotEqualTo(c); + assertThat(a).isNotEqualTo(d); + assertThat(d).isNotEqualTo(c); + + assertThat(a.hashCode()).isEqualTo(b.hashCode()); + assertThat(a.hashCode()).isNotEqualTo(c.hashCode()); + assertThat(a.hashCode()).isNotEqualTo(d.hashCode()); + assertThat(c.hashCode()).isNotEqualTo(d.hashCode()); + } + + @Test + public void testHashTo() { + String cid = "CID_abc_efg"; + + BloomFilter bloomFilter = BloomFilter.createByFn(10, 20); + + BitsArray bits = BitsArray.create(bloomFilter.getM()); + + int[] bitPos = bloomFilter.calcBitPositions(cid); + + bloomFilter.hashTo(cid, bits); + + for (int bit : bitPos) { + assertThat(bits.getBit(bit)).isTrue(); + } + } + + @Test + public void testCalcBitPositions() { + String cid = "CID_abc_efg"; + + BloomFilter bloomFilter = BloomFilter.createByFn(10, 20); + + int[] bitPos = bloomFilter.calcBitPositions(cid); + + assertThat(bitPos).isNotNull(); + assertThat(bitPos.length).isEqualTo(bloomFilter.getK()); + + int[] bitPos2 = bloomFilter.calcBitPositions(cid); + + assertThat(bitPos2).isNotNull(); + assertThat(bitPos2.length).isEqualTo(bloomFilter.getK()); + + assertThat(bitPos).isEqualTo(bitPos2); + } + + @Test + public void testIsHit() { + String cid = "CID_abc_efg"; + String cid2 = "CID_abc_123"; + + BloomFilter bloomFilter = BloomFilter.createByFn(10, 20); + + BitsArray bits = 
BitsArray.create(bloomFilter.getM()); + + bloomFilter.hashTo(cid, bits); + + assertThat(bloomFilter.isHit(cid, bits)).isTrue(); + assertThat(!bloomFilter.isHit(cid2, bits)).isTrue(); + + bloomFilter.hashTo(cid2, bits); + + assertThat(bloomFilter.isHit(cid, bits)).isTrue(); + assertThat(bloomFilter.isHit(cid2, bits)).isTrue(); + } + + @Test + public void testBloomFilterData() { + BloomFilterData bloomFilterData = new BloomFilterData(new int[]{1, 2, 3}, 128); + BloomFilterData bloomFilterData1 = new BloomFilterData(new int[]{1, 2, 3}, 128); + BloomFilterData bloomFilterData2 = new BloomFilterData(new int[]{1, 2, 3}, 129); + + assertThat(bloomFilterData).isEqualTo(bloomFilterData1); + assertThat(bloomFilterData2).isNotEqualTo(bloomFilterData); + assertThat(bloomFilterData2).isNotEqualTo(bloomFilterData1); + + assertThat(bloomFilterData.hashCode()).isEqualTo(bloomFilterData1.hashCode()); + assertThat(bloomFilterData2.hashCode()).isNotEqualTo(bloomFilterData.hashCode()); + assertThat(bloomFilterData2.hashCode()).isNotEqualTo(bloomFilterData1.hashCode()); + + assertThat(bloomFilterData.getBitPos()).isEqualTo(bloomFilterData2.getBitPos()); + assertThat(bloomFilterData.getBitNum()).isEqualTo(bloomFilterData1.getBitNum()); + assertThat(bloomFilterData.getBitNum()).isNotEqualTo(bloomFilterData2.getBitNum()); + + bloomFilterData2.setBitNum(128); + + assertThat(bloomFilterData).isEqualTo(bloomFilterData2); + + bloomFilterData2.setBitPos(new int[]{1, 2, 3, 4}); + + assertThat(bloomFilterData).isNotEqualTo(bloomFilterData2); + + BloomFilterData nullData = new BloomFilterData(); + + assertThat(nullData.getBitNum()).isEqualTo(0); + assertThat(nullData.getBitPos()).isNull(); + + BloomFilter bloomFilter = BloomFilter.createByFn(1, 300); + + assertThat(bloomFilter).isNotNull(); + assertThat(bloomFilter.isValid(bloomFilterData)).isFalse(); + } + + @Test + public void testCheckFalseHit() { + BloomFilter bloomFilter = BloomFilter.createByFn(1, 300); + BitsArray bits = 
BitsArray.create(bloomFilter.getM()); + int falseHit = 0; + for (int i = 0; i < bloomFilter.getN(); i++) { + String str = randomString((new Random(System.nanoTime())).nextInt(127) + 10); + int[] bitPos = bloomFilter.calcBitPositions(str); + + if (bloomFilter.checkFalseHit(bitPos, bits)) { + falseHit++; + } + + bloomFilter.hashTo(bitPos, bits); + } + + assertThat(falseHit).isLessThanOrEqualTo(bloomFilter.getF() * bloomFilter.getN() / 100); + } + + private String randomString(int length) { + StringBuilder stringBuilder = new StringBuilder(length); + for (int i = 0; i < length; i++) { + stringBuilder.append((char) ((new Random(System.nanoTime())).nextInt(123 - 97) + 97)); + } + + return stringBuilder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/test/java/org/apache/rocketmq/filter/ExpressionTest.java ---------------------------------------------------------------------- diff --git a/filter/src/test/java/org/apache/rocketmq/filter/ExpressionTest.java b/filter/src/test/java/org/apache/rocketmq/filter/ExpressionTest.java new file mode 100644 index 0000000..0ee81c9 --- /dev/null +++ b/filter/src/test/java/org/apache/rocketmq/filter/ExpressionTest.java @@ -0,0 +1,594 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.filter; + +import org.apache.rocketmq.filter.expression.ComparisonExpression; +import org.apache.rocketmq.filter.expression.ConstantExpression; +import org.apache.rocketmq.filter.expression.EvaluationContext; +import org.apache.rocketmq.filter.expression.Expression; +import org.apache.rocketmq.filter.expression.MQFilterException; +import org.apache.rocketmq.filter.expression.PropertyExpression; +import org.apache.rocketmq.filter.parser.SelectorParser; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ExpressionTest { + + private static String andExpression = "a=3 and b<>4 And c>5 AND d<=4"; + private static String orExpression = "a=3 or b<>4 Or c>5 OR d<=4"; + private static String inExpression = "a in ('3', '4', '5')"; + private static String notInExpression = "a not in ('3', '4', '5')"; + private static String betweenExpression = "a between 2 and 10"; + private static String notBetweenExpression = "a not between 2 and 10"; + private static String isNullExpression = "a is null"; + private static String isNotNullExpression = "a is not null"; + private static String equalExpression = "a is not null and a='hello'"; + private static String booleanExpression = "a=TRUE OR b=FALSE"; + private static String nullOrExpression = "a is null OR a='hello'"; + private static String stringHasString = "TAGS is not null and TAGS='''''tag'''''"; + + @Test + public void testEvaluate_stringHasString() { + Expression expr = genExp(stringHasString); + + EvaluationContext context = genContext( + KeyValue.c("TAGS", "''tag''") + ); + + eval(expr, context, Boolean.TRUE); + } + + @Test + public void testEvaluate_now() { + EvaluationContext context = genContext( + KeyValue.c("a", System.currentTimeMillis()) + ); + + Expression nowExpression = 
ConstantExpression.createNow(); + Expression propertyExpression = new PropertyExpression("a"); + + Expression expression = ComparisonExpression.createLessThanEqual(propertyExpression, + nowExpression); + + eval(expression, context, Boolean.TRUE); + } + + @Test + public void testEvaluate_stringCompare() { + Expression expression = genExp("a between up and low"); + + EvaluationContext context = genContext( + KeyValue.c("a", "3.14") + ); + + eval(expression, context, Boolean.FALSE); + + { + context = genContext( + KeyValue.c("a", "3.14"), + KeyValue.c("up", "up"), + KeyValue.c("low", "low") + ); + + eval(expression, context, Boolean.FALSE); + } + + { + expression = genExp("key is not null and key between 0 and 100"); + + context = genContext( + KeyValue.c("key", "con") + ); + + eval(expression, context, Boolean.FALSE); + } + + { + expression = genExp("a between 0 and 100"); + + context = genContext( + KeyValue.c("a", "abc") + ); + + eval(expression, context, Boolean.FALSE); + } + + { + expression = genExp("a=b"); + + context = genContext( + KeyValue.c("a", "3.14"), + KeyValue.c("b", "3.14") + ); + + eval(expression, context, Boolean.TRUE); + } + + { + expression = genExp("a<>b"); + + context = genContext( + KeyValue.c("a", "3.14"), + KeyValue.c("b", "3.14") + ); + + eval(expression, context, Boolean.FALSE); + } + + { + expression = genExp("a<>b"); + + context = genContext( + KeyValue.c("a", "3.14"), + KeyValue.c("b", "3.141") + ); + + eval(expression, context, Boolean.TRUE); + } + } + + @Test + public void testEvaluate_exponent() { + Expression expression = genExp("a > 3.1E10"); + + EvaluationContext context = genContext( + KeyValue.c("a", String.valueOf(3.1415 * Math.pow(10, 10))) + ); + + eval(expression, context, Boolean.TRUE); + } + + @Test + public void testEvaluate_floatNumber() { + Expression expression = genExp("a > 3.14"); + + EvaluationContext context = genContext( + KeyValue.c("a", String.valueOf(3.1415)) + ); + + eval(expression, context, Boolean.TRUE); + 
} + + @Test + public void testEvaluate_twoVariable() { + Expression expression = genExp("a > b"); + + EvaluationContext context = genContext( + KeyValue.c("a", String.valueOf(10)), + KeyValue.c("b", String.valueOf(20)) + ); + + eval(expression, context, Boolean.FALSE); + + context = genContext( + KeyValue.c("b", String.valueOf(10)), + KeyValue.c("a", String.valueOf(20)) + ); + + eval(expression, context, Boolean.TRUE); + } + + @Test + public void testEvaluate_nullOr() { + Expression expression = genExp(nullOrExpression); + + EvaluationContext context = genContext( + ); + + eval(expression, context, Boolean.TRUE); + + context = genContext( + KeyValue.c("a", "hello") + ); + + eval(expression, context, Boolean.TRUE); + + context = genContext( + KeyValue.c("a", "abc") + ); + + eval(expression, context, Boolean.FALSE); + } + + @Test + public void testEvaluate_boolean() { + Expression expression = genExp(booleanExpression); + + EvaluationContext context = genContext( + KeyValue.c("a", "true"), + KeyValue.c("b", "false") + ); + + eval(expression, context, Boolean.TRUE); + + context = genContext( + KeyValue.c("a", "false"), + KeyValue.c("b", "true") + ); + + eval(expression, context, Boolean.FALSE); + } + + @Test + public void testEvaluate_equal() { + Expression expression = genExp(equalExpression); + + EvaluationContext context = genContext( + KeyValue.c("a", "hello") + ); + + eval(expression, context, Boolean.TRUE); + + context = genContext( + ); + + eval(expression, context, Boolean.FALSE); + } + + @Test + public void testEvaluate_andTrue() { + Expression expression = genExp(andExpression); + + EvaluationContext context = genContext( + KeyValue.c("a", 3), + KeyValue.c("b", 5), + KeyValue.c("c", 6), + KeyValue.c("d", 1) + ); + + for (int i = 0; i < 500; i++) { + eval(expression, context, Boolean.TRUE); + } + + long start = System.currentTimeMillis(); + for (int j = 0; j < 100; j++) { + for (int i = 0; i < 1000; i++) { + eval(expression, context, Boolean.TRUE); + } + } + 
+ // use string + context = genContext( + KeyValue.c("a", "3"), + KeyValue.c("b", "5"), + KeyValue.c("c", "6"), + KeyValue.c("d", "1") + ); + + eval(expression, context, Boolean.TRUE); + } + + @Test + public void testEvaluate_andFalse() { + Expression expression = genExp(andExpression); + + EvaluationContext context = genContext( + KeyValue.c("a", 4), + KeyValue.c("b", 5), + KeyValue.c("c", 6), + KeyValue.c("d", 1) + ); + + eval(expression, context, Boolean.FALSE); + + // use string + context = genContext( + KeyValue.c("a", "4"), + KeyValue.c("b", "5"), + KeyValue.c("c", "6"), + KeyValue.c("d", "1") + ); + + eval(expression, context, Boolean.FALSE); + } + + @Test + public void testEvaluate_orTrue() { + Expression expression = genExp(orExpression); + + // first + EvaluationContext context = genContext( + KeyValue.c("a", 3) + ); + eval(expression, context, Boolean.TRUE); + + // second + context = genContext( + KeyValue.c("a", 4), + KeyValue.c("b", 5) + ); + eval(expression, context, Boolean.TRUE); + + // third + context = genContext( + KeyValue.c("a", 4), + KeyValue.c("b", 4), + KeyValue.c("c", 6) + ); + eval(expression, context, Boolean.TRUE); + + // forth + context = genContext( + KeyValue.c("a", 4), + KeyValue.c("b", 4), + KeyValue.c("c", 3), + KeyValue.c("d", 2) + ); + eval(expression, context, Boolean.TRUE); + } + + @Test + public void testEvaluate_orFalse() { + Expression expression = genExp(orExpression); + // forth + EvaluationContext context = genContext( + KeyValue.c("a", 4), + KeyValue.c("b", 4), + KeyValue.c("c", 3), + KeyValue.c("d", 10) + ); + eval(expression, context, Boolean.FALSE); + } + + @Test + public void testEvaluate_inTrue() { + Expression expression = genExp(inExpression); + + EvaluationContext context = genContext( + KeyValue.c("a", "3") + ); + eval(expression, context, Boolean.TRUE); + + context = genContext( + KeyValue.c("a", "4") + ); + eval(expression, context, Boolean.TRUE); + + context = genContext( + KeyValue.c("a", "5") + ); + 
eval(expression, context, Boolean.TRUE); + } + + @Test + public void testEvaluate_inFalse() { + Expression expression = genExp(inExpression); + + EvaluationContext context = genContext( + KeyValue.c("a", "8") + ); + eval(expression, context, Boolean.FALSE); + } + + @Test + public void testEvaluate_notInTrue() { + Expression expression = genExp(notInExpression); + + EvaluationContext context = genContext( + KeyValue.c("a", "8") + ); + eval(expression, context, Boolean.TRUE); + } + + @Test + public void testEvaluate_notInFalse() { + Expression expression = genExp(notInExpression); + + EvaluationContext context = genContext( + KeyValue.c("a", "3") + ); + eval(expression, context, Boolean.FALSE); + + context = genContext( + KeyValue.c("a", "4") + ); + eval(expression, context, Boolean.FALSE); + + context = genContext( + KeyValue.c("a", "5") + ); + eval(expression, context, Boolean.FALSE); + } + + @Test + public void testEvaluate_betweenTrue() { + Expression expression = genExp(betweenExpression); + + EvaluationContext context = genContext( + KeyValue.c("a", "2") + ); + eval(expression, context, Boolean.TRUE); + + context = genContext( + KeyValue.c("a", "10") + ); + eval(expression, context, Boolean.TRUE); + + context = genContext( + KeyValue.c("a", "3") + ); + eval(expression, context, Boolean.TRUE); + } + + @Test + public void testEvaluate_betweenFalse() { + Expression expression = genExp(betweenExpression); + + EvaluationContext context = genContext( + KeyValue.c("a", "1") + ); + eval(expression, context, Boolean.FALSE); + + context = genContext( + KeyValue.c("a", "11") + ); + eval(expression, context, Boolean.FALSE); + } + + @Test + public void testEvaluate_notBetweenTrue() { + Expression expression = genExp(notBetweenExpression); + + EvaluationContext context = genContext( + KeyValue.c("a", "1") + ); + eval(expression, context, Boolean.TRUE); + + context = genContext( + KeyValue.c("a", "11") + ); + eval(expression, context, Boolean.TRUE); + } + + @Test + public 
void testEvaluate_notBetweenFalse() { + Expression expression = genExp(notBetweenExpression); + + EvaluationContext context = genContext( + KeyValue.c("a", "2") + ); + eval(expression, context, Boolean.FALSE); + + context = genContext( + KeyValue.c("a", "10") + ); + eval(expression, context, Boolean.FALSE); + + context = genContext( + KeyValue.c("a", "3") + ); + eval(expression, context, Boolean.FALSE); + } + + @Test + public void testEvaluate_isNullTrue() { + Expression expression = genExp(isNullExpression); + + EvaluationContext context = genContext( + KeyValue.c("abc", "2") + ); + eval(expression, context, Boolean.TRUE); + } + + @Test + public void testEvaluate_isNullFalse() { + Expression expression = genExp(isNullExpression); + + EvaluationContext context = genContext( + KeyValue.c("a", "2") + ); + eval(expression, context, Boolean.FALSE); + } + + @Test + public void testEvaluate_isNotNullTrue() { + Expression expression = genExp(isNotNullExpression); + + EvaluationContext context = genContext( + KeyValue.c("a", "2") + ); + eval(expression, context, Boolean.TRUE); + } + + @Test + public void testEvaluate_isNotNullFalse() { + Expression expression = genExp(isNotNullExpression); + + EvaluationContext context = genContext( + KeyValue.c("abc", "2") + ); + eval(expression, context, Boolean.FALSE); + } + + protected void eval(Expression expression, EvaluationContext context, Boolean result) { + Object ret = null; + try { + ret = expression.evaluate(context); + } catch (Throwable e) { + e.printStackTrace(); + } + + if (ret == null || !(ret instanceof Boolean)) { + assertThat(result).isFalse(); + } else { + assertThat(result).isEqualTo(ret); + } + } + + protected EvaluationContext genContext(KeyValue... 
keyValues) { + if (keyValues == null || keyValues.length < 1) { + return new PropertyContext(); + } + + PropertyContext context = new PropertyContext(); + for (KeyValue keyValue : keyValues) { + context.properties.put(keyValue.key, keyValue.value); + } + + return context; + } + + protected Expression genExp(String exp) { + Expression expression = null; + + try { + expression = SelectorParser.parse(exp); + + assertThat(expression).isNotNull(); + } catch (MQFilterException e) { + e.printStackTrace(); + assertThat(Boolean.FALSE).isTrue(); + } + + return expression; + } + + static class KeyValue { + public static KeyValue c(String key, Object value) { + return new KeyValue(key, value); + } + + public KeyValue(String key, Object value) { + this.key = key; + this.value = value; + } + + public String key; + public Object value; + } + + class PropertyContext implements EvaluationContext { + + public Map<String, Object> properties = new HashMap<String, Object>(8); + + @Override + public Object get(final String name) { + return properties.get(name); + } + + @Override + public Map<String, Object> keyValues() { + return properties; + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java ---------------------------------------------------------------------- diff --git a/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java b/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java new file mode 100644 index 0000000..22eeb86 --- /dev/null +++ b/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.filter; + +import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.filter.expression.EmptyEvaluationContext; +import org.apache.rocketmq.filter.expression.EvaluationContext; +import org.apache.rocketmq.filter.expression.Expression; +import org.apache.rocketmq.filter.expression.MQFilterException; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class FilterSpiTest { + + static class NothingExpression implements Expression { + + @Override + public Object evaluate(final EvaluationContext context) throws Exception { + return Boolean.TRUE; + } + } + + static class NothingFilter implements FilterSpi { + @Override + public Expression compile(final String expr) throws MQFilterException { + return new NothingExpression(); + } + + @Override + public String ofType() { + return "Nothing"; + } + } + + + @Test + public void testRegister() { + FilterFactory.INSTANCE.register(new NothingFilter()); + + Expression expr = null; + try { + expr = FilterFactory.INSTANCE.get("Nothing").compile("abc"); + } catch (MQFilterException e) { + e.printStackTrace(); + assertThat(Boolean.FALSE).isTrue(); + } + + assertThat(expr).isNotNull(); + + try { + assertThat((Boolean) expr.evaluate(new EmptyEvaluationContext())).isTrue(); + } catch (Exception e) { + e.printStackTrace(); + assertThat(Boolean.FALSE).isTrue(); 
+ } + } + + @Test + public void testGet() { + try { + assertThat((Boolean) FilterFactory.INSTANCE.get(ExpressionType.SQL92).compile("a is not null and a > 0") + .evaluate(new EmptyEvaluationContext())).isFalse(); + } catch (Exception e) { + e.printStackTrace(); + assertThat(Boolean.FALSE).isTrue(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/filter/src/test/java/org/apache/rocketmq/filter/ParserTest.java ---------------------------------------------------------------------- diff --git a/filter/src/test/java/org/apache/rocketmq/filter/ParserTest.java b/filter/src/test/java/org/apache/rocketmq/filter/ParserTest.java new file mode 100644 index 0000000..36ef271 --- /dev/null +++ b/filter/src/test/java/org/apache/rocketmq/filter/ParserTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.filter; + +import org.apache.rocketmq.filter.expression.Expression; +import org.apache.rocketmq.filter.expression.MQFilterException; +import org.apache.rocketmq.filter.parser.SelectorParser; +import org.junit.Test; + +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ParserTest { + + private static String andExpression = "a=3 and b<>4 And c>5 AND d<=4"; + private static String andExpressionHasBlank = "a=3 and b<>4 And c>5 AND d<=4"; + private static String orExpression = "a=3 or b<>4 Or c>5 OR d<=4"; + private static String inExpression = "a in ('3', '4', '5')"; + private static String notInExpression = "(a not in ('6', '4', '5')) or (b in ('3', '4', '5'))"; + private static String betweenExpression = "(a between 2 and 10) AND (b not between 6 and 9)"; + private static String equalNullExpression = "a is null"; + private static String notEqualNullExpression = "a is not null"; + private static String nowExpression = "a <= now"; + + private static String invalidExpression = "a and between 2 and 10"; + private static String illegalBetween = " a between 10 and 0"; + + @Test + public void testParse_valid() { + for (String expr : Arrays.asList( + andExpression, orExpression, inExpression, notInExpression, betweenExpression, + equalNullExpression, notEqualNullExpression, nowExpression + )) { + + try { + Expression expression = SelectorParser.parse(expr); + assertThat(expression).isNotNull(); + } catch (MQFilterException e) { + e.printStackTrace(); + assertThat(Boolean.FALSE).isTrue(); + } + + } + } + + @Test + public void testParse_invalid() { + try { + SelectorParser.parse(invalidExpression); + + assertThat(Boolean.TRUE).isFalse(); + } catch (MQFilterException e) { + } + } + + @Test + public void testParse_decimalOverFlow() { + try { + String str = "100000000000000000000000"; + + SelectorParser.parse("a > " + str); + + assertThat(Boolean.TRUE).isFalse(); + } catch (Exception e) { + } + 
} + + @Test + public void testParse_floatOverFlow() { + try { + String str = "1"; + for (int i = 0; i < 2048; i++) { + str += "111111111111111111111111111111111111111111111111111"; + } + str += "."; + for (int i = 0; i < 2048; i++) { + str += "111111111111111111111111111111111111111111111111111"; + } + + SelectorParser.parse("a > " + str); + + assertThat(Boolean.TRUE).isFalse(); + } catch (Exception e) { + } + } + + @Test + public void testParse_illegalBetween() { + try { + SelectorParser.parse(illegalBetween); + + assertThat(Boolean.TRUE).isFalse(); + } catch (Exception e) { + } + } + + @Test + public void testEquals() { + try { + Expression expr1 = SelectorParser.parse(andExpression); + + Expression expr2 = SelectorParser.parse(andExpressionHasBlank); + + Expression expr3 = SelectorParser.parse(orExpression); + + assertThat(expr1).isEqualTo(expr2); + assertThat(expr1).isNotEqualTo(expr3); + } catch (MQFilterException e) { + e.printStackTrace(); + assertThat(Boolean.TRUE).isFalse(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 47df84d..feb8b14 100644 --- a/pom.xml +++ b/pom.xml @@ -178,6 +178,7 @@ <module>example</module> <module>filtersrv</module> <module>srvutil</module> + <module>filter</module> <module>test</module> <module>distribution</module> </modules> @@ -554,6 +555,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-filter</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>${project.groupId}</groupId> <artifactId>rocketmq-example</artifactId> <version>${project.version}</version> @@ -603,6 +609,11 @@ <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + 
<version>19.0</version> + </dependency> </dependencies> </dependencyManagement> </project> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/srvutil/pom.xml ---------------------------------------------------------------------- diff --git a/srvutil/pom.xml b/srvutil/pom.xml index 3269903..6dc0377 100644 --- a/srvutil/pom.xml +++ b/srvutil/pom.xml @@ -41,5 +41,9 @@ <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 5be8258..7841feb 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -314,10 +314,11 @@ public class CommitLog { // 17 properties short propertiesLength = byteBuffer.getShort(); + Map<String, String> propertiesMap = null; if (propertiesLength > 0) { byteBuffer.get(bytesContent, 0, propertiesLength); String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8); - Map<String, String> propertiesMap = MessageDecoder.string2messageProperties(properties); + propertiesMap = MessageDecoder.string2messageProperties(properties); keys = propertiesMap.get(MessageConst.PROPERTY_KEYS); @@ -369,8 +370,9 @@ public class CommitLog { queueOffset, // 7 keys, // 8 uniqKey, //9 - sysFlag, // 9 - preparedTransactionOffset// 10 + sysFlag, // 10 + preparedTransactionOffset, // 11 + propertiesMap // 12 ); } catch (Exception e) { } 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/CommitLogDispatcher.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLogDispatcher.java b/store/src/main/java/org/apache/rocketmq/store/CommitLogDispatcher.java new file mode 100644 index 0000000..e1564a9 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLogDispatcher.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +/** + * Dispatcher of commit log. 
+ */ +public interface CommitLogDispatcher { + + void dispatch(final DispatchRequest request); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 919c637..d03ff0f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -20,6 +20,7 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.List; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +41,7 @@ public class ConsumeQueue { private final int mappedFileSize; private long maxPhysicOffset = -1; private volatile long minLogicOffset = 0; + private ConsumeQueueExt consumeQueueExt = null; public ConsumeQueue( final String topic, @@ -61,11 +63,24 @@ public class ConsumeQueue { this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null); this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE); + + if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) { + this.consumeQueueExt = new ConsumeQueueExt( + topic, + queueId, + StorePathConfigHelper.getStorePathConsumeQueueExt(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()), + defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(), + defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt() + ); + } } public boolean load() { boolean result = this.mappedFileQueue.load(); log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? 
"OK" : "Failed")); + if (isExtReadEnable()) { + result &= this.consumeQueueExt.load(); + } return result; } @@ -82,6 +97,7 @@ public class ConsumeQueue { ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); long processOffset = mappedFile.getFileFromOffset(); long mappedFileOffset = 0; + long maxExtAddr = 1; while (true) { for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) { long offset = byteBuffer.getLong(); @@ -91,6 +107,9 @@ public class ConsumeQueue { if (offset >= 0 && size > 0) { mappedFileOffset = i + CQ_STORE_UNIT_SIZE; this.maxPhysicOffset = offset; + if (isExtAddr(tagsCode)) { + maxExtAddr = tagsCode; + } } else { log.info("recover current consume queue file over, " + mappedFile.getFileName() + " " + offset + " " + size + " " + tagsCode); @@ -123,6 +142,12 @@ public class ConsumeQueue { this.mappedFileQueue.setFlushedWhere(processOffset); this.mappedFileQueue.setCommittedWhere(processOffset); this.mappedFileQueue.truncateDirtyFiles(processOffset); + + if (isExtReadEnable()) { + this.consumeQueueExt.recover(); + log.info("Truncate consume queue extend file by max {}", maxExtAddr); + this.consumeQueueExt.truncateByMaxAddress(maxExtAddr); + } } } @@ -200,7 +225,7 @@ public class ConsumeQueue { int logicFileSize = this.mappedFileSize; this.maxPhysicOffset = phyOffet - 1; - + long maxExtAddr = 1; while (true) { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile != null) { @@ -213,7 +238,7 @@ public class ConsumeQueue { for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) { long offset = byteBuffer.getLong(); int size = byteBuffer.getInt(); - byteBuffer.getLong(); + long tagsCode = byteBuffer.getLong(); if (0 == i) { if (offset >= phyOffet) { @@ -225,6 +250,10 @@ public class ConsumeQueue { mappedFile.setCommittedPosition(pos); mappedFile.setFlushedPosition(pos); this.maxPhysicOffset = offset; + // This maybe not take effect, when not every consume queue has extend file. 
+ if (isExtAddr(tagsCode)) { + maxExtAddr = tagsCode; + } } } else { @@ -239,6 +268,9 @@ public class ConsumeQueue { mappedFile.setCommittedPosition(pos); mappedFile.setFlushedPosition(pos); this.maxPhysicOffset = offset; + if (isExtAddr(tagsCode)) { + maxExtAddr = tagsCode; + } if (pos == logicFileSize) { return; @@ -252,6 +284,10 @@ public class ConsumeQueue { break; } } + + if (isExtReadEnable()) { + this.consumeQueueExt.truncateByMaxAddress(maxExtAddr); + } } public long getLastOffset() { @@ -285,7 +321,12 @@ public class ConsumeQueue { } public boolean flush(final int flushLeastPages) { - return this.mappedFileQueue.flush(flushLeastPages); + boolean result = this.mappedFileQueue.flush(flushLeastPages); + if (isExtReadEnable()) { + result = result & this.consumeQueueExt.flush(flushLeastPages); + } + + return result; } public int deleteExpiredFile(long offset) { @@ -296,6 +337,7 @@ public class ConsumeQueue { public void correctMinOffset(long phyMinOffset) { MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile(); + long minExtAddr = 1; if (mappedFile != null) { SelectMappedBufferResult result = mappedFile.selectMappedBuffer(0); if (result != null) { @@ -303,12 +345,16 @@ public class ConsumeQueue { for (int i = 0; i < result.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = result.getByteBuffer().getLong(); result.getByteBuffer().getInt(); - result.getByteBuffer().getLong(); + long tagsCode = result.getByteBuffer().getLong(); if (offsetPy >= phyMinOffset) { this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i; log.info("Compute logical min offset: {}, topic: {}, queueId: {}", this.getMinOffsetInQueue(), this.topic, this.queueId); + // This maybe not take effect, when not every consume queue has extend file. 
+ if (isExtAddr(tagsCode)) { + minExtAddr = tagsCode; + } break; } } @@ -319,24 +365,43 @@ public class ConsumeQueue { } } } + + if (isExtReadEnable()) { + this.consumeQueueExt.truncateByMinAddress(minExtAddr); + } } public long getMinOffsetInQueue() { return this.minLogicOffset / CQ_STORE_UNIT_SIZE; } - public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp, - long logicOffset) { + public void putMessagePositionInfoWrapper(DispatchRequest request) { final int maxRetries = 30; boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable(); for (int i = 0; i < maxRetries && canWrite; i++) { - boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset); + long tagsCode = request.getTagsCode(); + if (isExtWriteEnable()) { + ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); + cqExtUnit.setFilterBitMap(request.getBitMap()); + cqExtUnit.setMsgStoreTime(request.getStoreTimestamp()); + cqExtUnit.setTagsCode(request.getTagsCode()); + + long extAddr = this.consumeQueueExt.put(cqExtUnit); + if (isExtAddr(extAddr)) { + tagsCode = extAddr; + } else { + log.warn("Save consume queue extend fail, So just save tagsCode! 
{}, topic:{}, queueId:{}, offset:{}", cqExtUnit, + topic, queueId, request.getCommitLogOffset()); + } + } + boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(), + request.getMsgSize(), tagsCode, request.getConsumeQueueOffset()); if (result) { - this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp); + this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp()); return; } else { // XXX: warn and notify me - log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset + log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset() + " failed, retry " + i + " times"); try { @@ -423,6 +488,20 @@ public class ConsumeQueue { return null; } + public ConsumeQueueExt.CqExtUnit getExt(final long offset) { + if (isExtReadEnable()) { + return this.consumeQueueExt.get(offset); + } + return null; + } + + public boolean getExt(final long offset, ConsumeQueueExt.CqExtUnit cqExtUnit) { + if (isExtReadEnable()) { + return this.consumeQueueExt.get(offset, cqExtUnit); + } + return false; + } + public long getMinLogicOffset() { return minLogicOffset; } @@ -457,6 +536,9 @@ public class ConsumeQueue { this.maxPhysicOffset = -1; this.minLogicOffset = 0; this.mappedFileQueue.destroy(); + if (isExtReadEnable()) { + this.consumeQueueExt.destroy(); + } } public long getMessageTotalInQueue() { @@ -469,5 +551,27 @@ public class ConsumeQueue { public void checkSelf() { mappedFileQueue.checkSelf(); + if (isExtReadEnable()) { + this.consumeQueueExt.checkSelf(); + } + } + + protected boolean isExtReadEnable() { + return this.consumeQueueExt != null; + } + + protected boolean isExtWriteEnable() { + return this.consumeQueueExt != null + && this.defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt(); + } + + /** + * Check {@code tagsCode} is address of extend file or tags code. 
+ * + * @param tagsCode + * @return + */ + public boolean isExtAddr(long tagsCode) { + return isExtReadEnable() && this.consumeQueueExt.isExtAddr(tagsCode); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java new file mode 100644 index 0000000..1a177e9 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java @@ -0,0 +1,638 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Extend of consume queue, to store something not important, + * such as message store time, filter bit map and etc. + * <p/> + * <li>1. This class is used only by {@link ConsumeQueue}</li> + * <li>2. 
And is weakly reliable.</li> + * <li>3. Be careful, address returned is always less than 0.</li> + * <li>4. Pls keep this file small.</li> + */ +public class ConsumeQueueExt { + private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + + private final MappedFileQueue mappedFileQueue; + private final String topic; + private final int queueId; + + private final String storePath; + private final int mappedFileSize; + private ByteBuffer tempContainer; + + public static final int END_BLANK_DATA_LENGTH = 4; + + /** + * Addr can not exceed this value. For compatibility. + */ + public static final long MAX_ADDR = Integer.MIN_VALUE - 1L; + public static final long MAX_REAL_OFFSET = MAX_ADDR - Long.MIN_VALUE; + + /** + * Constructor. + * + * @param topic topic + * @param queueId id of queue + * @param storePath root dir of files to store. + * @param mappedFileSize file size + * @param bitMapLength bit map length. + */ + public ConsumeQueueExt(final String topic, + final int queueId, + final String storePath, + final int mappedFileSize, + final int bitMapLength) { + + this.storePath = storePath; + this.mappedFileSize = mappedFileSize; + + this.topic = topic; + this.queueId = queueId; + + String queueDir = this.storePath + + File.separator + topic + + File.separator + queueId; + + this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null); + + if (bitMapLength > 0) { + this.tempContainer = ByteBuffer.allocate( + bitMapLength / Byte.SIZE + ); + } + } + + /** + * Check whether {@code address} points to extend file. + * <p> + * Just test whether {@code address} is less than or equal to {@link #MAX_ADDR}. + * </p> + * + * @param address + * @return + */ + public boolean isExtAddr(final long address) { + return address <= MAX_ADDR; + } + + /** + * Transform {@code address}(decorated by {@link #decorate}) to offset in mapped file. 
+ * <p> + * if {@code address} is less than 0, return {@code address} - {@link java.lang.Long#MIN_VALUE}; + * else, just return {@code address} + * </p> + * + * @param address + * @return + */ + public long unDecorate(final long address) { + if (isExtAddr(address)) { + return address - Long.MIN_VALUE; + } + return address; + } + + /** + * Decorate {@code offset} from mapped file, in order to distinguish with tagsCode(saved in cq originally). + * <p> + * if {@code offset} is greater than or equal to 0, then return {@code offset} + {@link java.lang.Long#MIN_VALUE}; + * else, just return {@code offset} + * </p> + * + * @param offset + * @return ext address(value is less than 0) + */ + public long decorate(final long offset) { + if (!isExtAddr(offset)) { + return offset + Long.MIN_VALUE; + } + return offset; + } + + /** + * Get data from buffer. + * + * @param address less than 0 + * @return + */ + public CqExtUnit get(final long address) { + CqExtUnit cqExtUnit = new CqExtUnit(); + if (get(address, cqExtUnit)) { + return cqExtUnit; + } + + return null; + } + + /** + * Get data from buffer, and set to {@code cqExtUnit} + * + * @param address less than 0 + * @param cqExtUnit + * @return + */ + public boolean get(final long address, final CqExtUnit cqExtUnit) { + if (!isExtAddr(address)) { + return false; + } + + final int mappedFileSize = this.mappedFileSize; + final long realOffset = unDecorate(address); + + MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(realOffset, realOffset == 0); + if (mappedFile == null) { + return false; + } + + int pos = (int) (realOffset % mappedFileSize); + + SelectMappedBufferResult bufferResult = mappedFile.selectMappedBuffer(pos); + if (bufferResult == null) { + log.warn("[BUG] Consume queue extend unit({}) is not found!", realOffset); + return false; + } + boolean ret = false; + try { + ret = cqExtUnit.read(bufferResult.getByteBuffer()); + } finally { + bufferResult.release(); + } + + return ret; + } + + /** + * Save 
to mapped buffer of file and return address. + * <p> + * Be careful, this method is not thread safe. + * </p> + * + * @param cqExtUnit + * @return success: < 0: fail: >=0 + */ + public long put(final CqExtUnit cqExtUnit) { + final int retryTimes = 3; + try { + int size = cqExtUnit.calcUnitSize(); + if (size > CqExtUnit.MAX_EXT_UNIT_SIZE) { + log.error("Size of cq ext unit is greater than {}, {}", CqExtUnit.MAX_EXT_UNIT_SIZE, cqExtUnit); + return 1; + } + if (this.mappedFileQueue.getMaxOffset() + size > MAX_REAL_OFFSET) { + log.warn("Capacity of ext is maximum!{}, {}", this.mappedFileQueue.getMaxOffset(), size); + return 1; + } + // unit size maybe change.but, the same most of the time. + if (this.tempContainer == null || this.tempContainer.capacity() < size) { + this.tempContainer = ByteBuffer.allocate(size); + } + + for (int i = 0; i < retryTimes; i++) { + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); + + if (mappedFile == null || mappedFile.isFull()) { + mappedFile = this.mappedFileQueue.getLastMappedFile(0); + } + + if (mappedFile == null) { + log.error("Create mapped file when save consume queue extend, {}", cqExtUnit); + continue; + } + final int wrotePosition = mappedFile.getWrotePosition(); + final int blankSize = this.mappedFileSize - wrotePosition - END_BLANK_DATA_LENGTH; + + // check whether has enough space. 
+ if (size > blankSize) { + fullFillToEnd(mappedFile, wrotePosition); + log.info("No enough space(need:{}, has:{}) of file {}, so fill to end", + size, blankSize, mappedFile.getFileName()); + continue; + } + + if (mappedFile.appendMessage(cqExtUnit.write(this.tempContainer), 0, size)) { + return decorate(wrotePosition + mappedFile.getFileFromOffset()); + } + } + } catch (Throwable e) { + log.error("Save consume queue extend error, " + cqExtUnit, e); + } + + return 1; + } + + protected void fullFillToEnd(final MappedFile mappedFile, final int wrotePosition) { + ByteBuffer mappedFileBuffer = mappedFile.sliceByteBuffer(); + mappedFileBuffer.position(wrotePosition); + + // ending. + mappedFileBuffer.putShort((short) -1); + + mappedFile.setWrotePosition(this.mappedFileSize); + } + + /** + * Load data from file when startup. + * + * @return + */ + public boolean load() { + boolean result = this.mappedFileQueue.load(); + log.info("load consume queue extend" + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed")); + return result; + } + + /** + * Check whether the step size in mapped file queue is correct. + */ + public void checkSelf() { + this.mappedFileQueue.checkSelf(); + } + + /** + * Recover. + */ + public void recover() { + final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); + if (mappedFiles == null || mappedFiles.isEmpty()) { + return; + } + + // load all files, consume queue will truncate extend files. + int index = 0; + + MappedFile mappedFile = mappedFiles.get(index); + ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); + long processOffset = mappedFile.getFileFromOffset(); + long mappedFileOffset = 0; + CqExtUnit extUnit = new CqExtUnit(); + while (true) { + extUnit.readBySkip(byteBuffer); + + // check whether write sth. 
+ if (extUnit.getSize() > 0) { + mappedFileOffset += extUnit.getSize(); + continue; + } + + index++; + if (index < mappedFiles.size()) { + mappedFile = mappedFiles.get(index); + byteBuffer = mappedFile.sliceByteBuffer(); + processOffset = mappedFile.getFileFromOffset(); + mappedFileOffset = 0; + log.info("Recover next consume queue extend file, " + mappedFile.getFileName()); + continue; + } + + log.info("All files of consume queue extend has been recovered over, last mapped file " + + mappedFile.getFileName()); + break; + } + + processOffset += mappedFileOffset; + this.mappedFileQueue.setFlushedWhere(processOffset); + this.mappedFileQueue.setCommittedWhere(processOffset); + this.mappedFileQueue.truncateDirtyFiles(processOffset); + } + + /** + * Delete files before {@code minAddress}. + * + * @param minAddress less than 0 + */ + public void truncateByMinAddress(final long minAddress) { + if (!isExtAddr(minAddress)) { + return; + } + + log.info("Truncate consume queue ext by min {}.", minAddress); + + List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>(); + + List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); + final long realOffset = unDecorate(minAddress); + + for (MappedFile file : mappedFiles) { + long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize; + + if (fileTailOffset < realOffset) { + log.info("Destroy consume queue ext by min: file={}, fileTailOffset={}, minOffset={}", file.getFileName(), + fileTailOffset, realOffset); + if (file.destroy(1000)) { + willRemoveFiles.add(file); + } + } + } + + this.mappedFileQueue.deleteExpiredFile(willRemoveFiles); + } + + /** + * Delete files after {@code maxAddress}, and reset wrote/commit/flush position to last file. 
+ * + * @param maxAddress less than 0 + */ + public void truncateByMaxAddress(final long maxAddress) { + if (!isExtAddr(maxAddress)) { + return; + } + + log.info("Truncate consume queue ext by max {}.", maxAddress); + + CqExtUnit cqExtUnit = get(maxAddress); + if (cqExtUnit == null) { + log.error("[BUG] address {} of consume queue extend not found!", maxAddress); + return; + } + + final long realOffset = unDecorate(maxAddress); + + this.mappedFileQueue.truncateDirtyFiles(realOffset + cqExtUnit.getSize()); + } + + /** + * Flush buffer to file. + * + * @param flushLeastPages + * @return + */ + public boolean flush(final int flushLeastPages) { + return this.mappedFileQueue.flush(flushLeastPages); + } + + /** + * Delete files and directory. + */ + public void destroy() { + this.mappedFileQueue.destroy(); + } + + /** + * Max address(value is less than 0). + * <p/> + * <p> + * Be careful: it is only the max address at the moment this method is invoked. + * </p> + * + * @return + */ + public long getMaxAddress() { + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); + if (mappedFile == null) { + return decorate(0); + } + return decorate(mappedFile.getFileFromOffset() + mappedFile.getWrotePosition()); + } + + /** + * Minimum address saved in file. + * + * @return + */ + public long getMinAddress() { + MappedFile firstFile = this.mappedFileQueue.getFirstMappedFile(); + if (firstFile == null) { + return decorate(0); + } + return decorate(firstFile.getFileFromOffset()); + } + + /** + * Store unit. + */ + public static class CqExtUnit { + public static final short MIN_EXT_UNIT_SIZE + = 2 * 1 // size, 32k max + + 8 * 2 // msg time + tagCode + + 2; // bitMapSize + + public static final int MAX_EXT_UNIT_SIZE = Short.MAX_VALUE; + + public CqExtUnit() {} + + public CqExtUnit(Long tagsCode, long msgStoreTime, byte[] filterBitMap) { + this.tagsCode = tagsCode == null ? 
0 : tagsCode; + this.msgStoreTime = msgStoreTime; + this.filterBitMap = filterBitMap; + this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length); + this.size = (short) (MIN_EXT_UNIT_SIZE + this.bitMapSize); + } + + /** + * unit size + */ + private short size; + /** + * hash code of tags + */ + private long tagsCode; + /** + * the time the message was stored into the commit log + */ + private long msgStoreTime; + /** + * size of bit map + */ + private short bitMapSize; + /** + * filter bit map + */ + private byte[] filterBitMap; + + /** + * Build unit from buffer, starting at its current position. + * + * @param buffer + * @return + */ + private boolean read(final ByteBuffer buffer) { + if (buffer.position() + 2 > buffer.limit()) { + return false; + } + + this.size = buffer.getShort(); + + if (this.size < 1) { + return false; + } + + this.tagsCode = buffer.getLong(); + this.msgStoreTime = buffer.getLong(); + this.bitMapSize = buffer.getShort(); + + if (this.bitMapSize < 1) { + return true; + } + + if (this.filterBitMap == null || this.filterBitMap.length != this.bitMapSize) { + this.filterBitMap = new byte[bitMapSize]; + } + + buffer.get(this.filterBitMap); + return true; + } + + /** + * Only read the first 2 bytes to get unit size. + * <p> + * if size > 0, then skip buffer position with size. + * </p> + * <p> + * if size <= 0, nothing to do. + * </p> + * + * @param buffer + */ + private void readBySkip(final ByteBuffer buffer) { + ByteBuffer temp = buffer.slice(); + + short tempSize = temp.getShort(); + this.size = tempSize; + + if (tempSize > 0) { + buffer.position(buffer.position() + this.size); + } + } + + /** + * Transform unit data to byte array. + * <p/> + * <li>1. {@code container} can be null, it will be created if null.</li> + * <li>2. if capacity of {@code container} is less than unit size, it will be created also.</li> + * <li>3. 
Pls be sure that size of unit is not greater than {@link #MAX_EXT_UNIT_SIZE}</li> + * + * @param container + * @return + */ + private byte[] write(final ByteBuffer container) { + this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length); + this.size = (short) (MIN_EXT_UNIT_SIZE + this.bitMapSize); + + ByteBuffer temp = container; + + if (temp == null || temp.capacity() < this.size) { + temp = ByteBuffer.allocate(this.size); + } + + temp.flip(); + temp.limit(this.size); + + temp.putShort(this.size); + temp.putLong(this.tagsCode); + temp.putLong(this.msgStoreTime); + temp.putShort(this.bitMapSize); + if (this.bitMapSize > 0) { + temp.put(this.filterBitMap); + } + + return temp.array(); + } + + /** + * Calculate unit size by current data. + * + * @return + */ + private int calcUnitSize() { + int sizeTemp = MIN_EXT_UNIT_SIZE + (filterBitMap == null ? 0 : filterBitMap.length); + return sizeTemp; + } + + public long getTagsCode() { + return tagsCode; + } + + public void setTagsCode(final long tagsCode) { + this.tagsCode = tagsCode; + } + + public long getMsgStoreTime() { + return msgStoreTime; + } + + public void setMsgStoreTime(final long msgStoreTime) { + this.msgStoreTime = msgStoreTime; + } + + public byte[] getFilterBitMap() { + if (this.bitMapSize < 1) { + return null; + } + return filterBitMap; + } + + public void setFilterBitMap(final byte[] filterBitMap) { + this.filterBitMap = filterBitMap; + // not safe transform, but size will be calculate by #calcUnitSize + this.bitMapSize = (short) (filterBitMap == null ? 
0 : filterBitMap.length); + } + + public short getSize() { + return size; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof CqExtUnit)) return false; + + CqExtUnit cqExtUnit = (CqExtUnit) o; + + if (bitMapSize != cqExtUnit.bitMapSize) return false; + if (msgStoreTime != cqExtUnit.msgStoreTime) return false; + if (size != cqExtUnit.size) return false; + if (tagsCode != cqExtUnit.tagsCode) return false; + if (!Arrays.equals(filterBitMap, cqExtUnit.filterBitMap)) return false; + + return true; + } + + @Override + public int hashCode() { + int result = (int) size; + result = 31 * result + (int) (tagsCode ^ (tagsCode >>> 32)); + result = 31 * result + (int) (msgStoreTime ^ (msgStoreTime >>> 32)); + result = 31 * result + (int) bitMapSize; + result = 31 * result + (filterBitMap != null ? Arrays.hashCode(filterBitMap) : 0); + return result; + } + + @Override + public String toString() { + return "CqExtUnit{" + + "size=" + size + + ", tagsCode=" + tagsCode + + ", msgStoreTime=" + msgStoreTime + + ", bitMapSize=" + bitMapSize + + ", filterBitMap=" + Arrays.toString(filterBitMap) + + '}'; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java index 1350026..9db87f3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java @@ -18,26 +18,33 @@ package org.apache.rocketmq.store; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import java.nio.ByteBuffer; +import java.util.Map; + public class DefaultMessageFilter implements MessageFilter { - @Override - public boolean 
isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) { - if (tagsCode == null) { - return true; - } + private SubscriptionData subscriptionData; - if (null == subscriptionData) { - return true; - } + public DefaultMessageFilter(final SubscriptionData subscriptionData) { + this.subscriptionData = subscriptionData; + } - if (subscriptionData.isClassFilterMode()) + @Override + public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) { + if (null == tagsCode || null == subscriptionData) { return true; + } - if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) { + if (subscriptionData.isClassFilterMode()) { return true; } - return subscriptionData.getCodeSet().contains(tagsCode.intValue()); + return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL) + || subscriptionData.getCodeSet().contains(tagsCode.intValue()); } + @Override + public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) { + return true; + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 0edfeec..7bed62c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -41,7 +42,6 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; 
import org.apache.rocketmq.common.message.MessageExtBatch; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.running.RunningStats; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.store.config.BrokerRole; @@ -60,8 +60,6 @@ import static org.apache.rocketmq.store.config.BrokerRole.SLAVE; public class DefaultMessageStore implements MessageStore { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); - private final MessageFilter messageFilter = new DefaultMessageFilter(); - private final MessageStoreConfig messageStoreConfig; // CommitLog private final CommitLog commitLog; @@ -103,6 +101,8 @@ public class DefaultMessageStore implements MessageStore { private AtomicLong printTimes = new AtomicLong(0); + private final LinkedList<CommitLogDispatcher> dispatcherList; + public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { this.messageArrivingListener = messageArrivingListener; @@ -133,6 +133,10 @@ public class DefaultMessageStore implements MessageStore { this.allocateMappedFileService.start(); this.indexService.start(); + + this.dispatcherList = new LinkedList<>(); + this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); + this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex()); } public void truncateDirtyLogicFiles(long phyOffset) { @@ -409,7 +413,7 @@ public class DefaultMessageStore implements MessageStore { } public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, - final SubscriptionData subscriptionData) { + final MessageFilter messageFilter) { if (this.shutdown) { log.warn("message store has shutdown, so getMessage is forbidden"); return null; @@ -464,6 +468,7 @@ 
public class DefaultMessageStore implements MessageStore { int i = 0; final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded(); + ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); @@ -483,29 +488,51 @@ public class DefaultMessageStore implements MessageStore { break; } - if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) { - SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); - if (selectResult != null) { - this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); - getResult.addMessage(selectResult); - status = GetMessageStatus.FOUND; - nextPhyFileStartOffset = Long.MIN_VALUE; + boolean extRet = false; + if (consumeQueue.isExtAddr(tagsCode)) { + extRet = consumeQueue.getExt(tagsCode, cqExtUnit); + if (extRet) { + tagsCode = cqExtUnit.getTagsCode(); } else { - if (getResult.getBufferTotalSize() == 0) { - status = GetMessageStatus.MESSAGE_WAS_REMOVING; - } - - nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); + // can't find ext content.Client will filter messages by tag also. + log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}", + tagsCode, offsetPy, sizePy, topic, group); } - } else { + } + + if (messageFilter != null + && !messageFilter.isMatchedByConsumeQueue(tagsCode, extRet ? 
cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } - if (log.isDebugEnabled()) { - log.debug("message type not matched, client: " + subscriptionData + " server: " + tagsCode); + continue; + } + + SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); + if (null == selectResult) { + if (getResult.getBufferTotalSize() == 0) { + status = GetMessageStatus.MESSAGE_WAS_REMOVING; + } + + nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); + continue; + } + + if (messageFilter != null + && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { + if (getResult.getBufferTotalSize() == 0) { + status = GetMessageStatus.NO_MATCHED_MESSAGE; } + // release... + selectResult.release(); + continue; } + + this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); + getResult.addMessage(selectResult); + status = GetMessageStatus.FOUND; + nextPhyFileStartOffset = Long.MIN_VALUE; } if (diskFallRecorded) { @@ -1318,27 +1345,14 @@ public class DefaultMessageStore implements MessageStore { } public void doDispatch(DispatchRequest req) { - final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag()); - switch (tranType) { - case MessageSysFlag.TRANSACTION_NOT_TYPE: - case MessageSysFlag.TRANSACTION_COMMIT_TYPE: - DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(), - req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset()); - break; - case MessageSysFlag.TRANSACTION_PREPARED_TYPE: - case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: - break; - } - - if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) { - DefaultMessageStore.this.indexService.buildIndex(req); + for (CommitLogDispatcher dispatcher : this.dispatcherList) { + dispatcher.dispatch(req); } } - public void putMessagePositionInfo(String topic, int queueId, long 
offset, int size, long tagsCode, long storeTimestamp, - long logicOffset) { - ConsumeQueue cq = this.findConsumeQueue(topic, queueId); - cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset); + public void putMessagePositionInfo(DispatchRequest dispatchRequest) { + ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId()); + cq.putMessagePositionInfoWrapper(dispatchRequest); } public BrokerStatsManager getBrokerStatsManager() { @@ -1354,6 +1368,20 @@ public class DefaultMessageStore implements MessageStore { return remainTransientStoreBufferNumbs() == 0; } + @Override + public LinkedList<CommitLogDispatcher> getDispatcherList() { + return this.dispatcherList; + } + + @Override + public ConsumeQueue getConsumeQueue(String topic, int queueId) { + ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); + if (map == null) { + return null; + } + return map.get(queueId); + } + public void unlockMappedFile(final MappedFile mappedFile) { this.scheduledExecutorService.schedule(new Runnable() { @Override @@ -1363,6 +1391,33 @@ public class DefaultMessageStore implements MessageStore { }, 6, TimeUnit.SECONDS); } + class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { + + @Override + public void dispatch(DispatchRequest request) { + final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); + switch (tranType) { + case MessageSysFlag.TRANSACTION_NOT_TYPE: + case MessageSysFlag.TRANSACTION_COMMIT_TYPE: + DefaultMessageStore.this.putMessagePositionInfo(request); + break; + case MessageSysFlag.TRANSACTION_PREPARED_TYPE: + case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: + break; + } + } + } + + class CommitLogDispatcherBuildIndex implements CommitLogDispatcher { + + @Override + public void dispatch(DispatchRequest request) { + if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) { + 
DefaultMessageStore.this.indexService.buildIndex(request); + } + } + } + class CleanCommitLogService { private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20; @@ -1695,7 +1750,8 @@ public class DefaultMessageStore implements MessageStore { && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, - dispatchRequest.getTagsCode()); + dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), + dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } // FIXED BUG By shijia this.reputFromOffset += size;