KYLIN-942 support parallel scan for grid table
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d8372747 Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d8372747 Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d8372747 Branch: refs/heads/KYLIN-942 Commit: d8372747606d13feadf88cbe57a402a91516d841 Parents: 8ac3f08 Author: honma <ho...@ebay.com> Authored: Thu Oct 22 17:47:42 2015 +0800 Committer: honma <ho...@ebay.com> Committed: Thu Oct 22 17:47:42 2015 +0800 ---------------------------------------------------------------------- core-common/pom.xml | 9 + .../org/apache/kylin/common/util/BitSets.java | 39 + .../org/apache/kylin/common/util/BytesUtil.java | 20 +- .../kylin/common/util/CompressionUtils.java | 6 +- .../apache/kylin/common/util/ShardingHash.java | 67 ++ .../apache/kylin/common/util/BitSetsTest.java | 36 + .../apache/kylin/common/util/BytesUtilTest.java | 11 +- .../org/apache/kylin/cube/CubeInstance.java | 14 +- .../java/org/apache/kylin/cube/CubeSegment.java | 50 + .../cube/common/FuzzyValueCombination.java | 130 +++ .../kylin/cube/common/RowKeySplitter.java | 31 +- .../org/apache/kylin/cube/cuboid/Cuboid.java | 4 +- .../kylin/cube/gridtable/CubeCodeSystem.java | 12 +- .../cube/gridtable/TrimmedCubeCodeSystem.java | 15 +- .../cube/inmemcubing/ConcurrentDiskStore.java | 4 +- .../kylin/cube/inmemcubing/MemDiskStore.java | 4 +- .../kylin/cube/kv/AbstractRowKeyEncoder.java | 5 + .../apache/kylin/cube/kv/FuzzyKeyEncoder.java | 2 +- .../apache/kylin/cube/kv/FuzzyMaskEncoder.java | 11 +- .../org/apache/kylin/cube/kv/RowConstants.java | 11 +- .../org/apache/kylin/cube/kv/RowKeyDecoder.java | 4 +- .../org/apache/kylin/cube/kv/RowKeyEncoder.java | 56 +- .../org/apache/kylin/gridtable/GTBuilder.java | 4 +- .../java/org/apache/kylin/gridtable/GTInfo.java | 30 +- .../org/apache/kylin/gridtable/GTRecord.java | 75 +- .../org/apache/kylin/gridtable/GTScanRange.java | 30 +- 
.../kylin/gridtable/GTScanRangePlanner.java | 244 +++-- .../apache/kylin/gridtable/GTScanRequest.java | 9 +- .../java/org/apache/kylin/gridtable/GTUtil.java | 4 - .../org/apache/kylin/gridtable/GridTable.java | 3 - .../org/apache/kylin/gridtable/IGTStore.java | 4 +- .../gridtable/memstore/GTSimpleMemStore.java | 7 +- .../kylin/cube/common/RowKeySplitterTest.java | 20 +- .../apache/kylin/cube/kv/RowKeyDecoderTest.java | 10 +- .../apache/kylin/cube/kv/RowKeyEncoderTest.java | 32 +- .../kylin/gridtable/DictGridTableTest.java | 152 ++- .../kylin/metadata/filter/TupleFilter.java | 11 + .../metadata/filter/TupleFilterSerializer.java | 10 +- .../kylin/storage/hybrid/HybridInstance.java | 2 +- .../translate/FuzzyValueCombination.java | 30 +- .../kylin/storage/translate/HBaseKeyRange.java | 6 +- dev-support/test_all.sh | 11 + .../kylin/engine/mr/common/CuboidShardUtil.java | 56 ++ .../kylin/engine/mr/common/CuboidStatsUtil.java | 61 ++ .../mr/steps/FactDistinctColumnsReducer.java | 26 +- .../mr/steps/MapContextGTRecordWriter.java | 22 +- .../mr/steps/MergeCuboidFromStorageMapper.java | 7 +- .../engine/mr/steps/MergeCuboidMapper.java | 9 +- .../engine/mr/steps/MergeStatisticsStep.java | 3 +- .../kylin/engine/mr/steps/NDCuboidJob.java | 5 - .../kylin/engine/mr/steps/NDCuboidMapper.java | 23 +- .../steps/FactDistinctColumnsReducerTest.java | 3 +- .../engine/mr/steps/MergeCuboidJobTest.java | 2 + .../kylin/engine/mr/steps/NDCuboidJobTest.java | 2 + .../engine/mr/steps/NDCuboidMapperTest.java | 7 +- ...test_kylin_cube_with_slr_left_join_desc.json | 1 + .../test_kylin_cube_without_slr_desc.json | 1 + ...t_kylin_cube_without_slr_left_join_desc.json | 1 + .../test_streaming_table_cube_desc.json | 1 + .../kylin/invertedindex/index/ShardingHash.java | 32 - .../kylin/invertedindex/index/TableRecord.java | 3 +- .../kylin/job/streaming/CubeStreamConsumer.java | 8 +- .../kylin/job/hadoop/invertedindex/IITest.java | 3 +- pom.xml | 7 + .../apache/kylin/query/routing/RoutingRule.java | 9 +- 
.../kylin/query/test/ITKylinQueryTest.java | 4 +- .../src/test/resources/query/debug/query78.sql | 22 + query/src/test/resources/query/sql/query01.sql | 4 +- query/src/test/resources/query/sql/query85.sql | 26 + query/src/test/resources/query/sql/query86.sql | 24 + server/src/main/resources/log4j.properties | 2 +- .../kylin/storage/hbase/HBaseStorage.java | 5 +- .../coprocessor/CoprocessorProjector.java | 2 +- .../common/coprocessor/CoprocessorRowType.java | 2 +- .../hbase/cube/v1/CubeSegmentTupleIterator.java | 2 +- .../storage/hbase/cube/v1/CubeStorageQuery.java | 60 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 133 ++- .../storage/hbase/cube/v2/CubeHBaseRPC.java | 155 ++- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 98 +- .../storage/hbase/cube/v2/CubeScanner.java | 265 ----- .../hbase/cube/v2/CubeSegmentScanner.java | 290 ++++++ .../storage/hbase/cube/v2/CubeStorageQuery.java | 7 +- .../hbase/cube/v2/HBaseReadonlyStore.java | 47 +- .../kylin/storage/hbase/cube/v2/HBaseScan.java | 88 ++ .../kylin/storage/hbase/cube/v2/RawScan.java | 22 +- .../cube/v2/SequentialCubeTupleIterator.java | 8 +- .../coprocessor/endpoint/CubeVisitService.java | 29 +- .../endpoint/generated/CubeVisitProtos.java | 981 ++++++++++++++++++- .../endpoint/protobuf/CubeVisit.proto | 4 + .../endpoint/EndpointTupleIterator.java | 21 +- .../ii/coprocessor/endpoint/IIEndpoint.java | 22 +- .../storage/hbase/steps/CreateHTableJob.java | 153 ++- .../storage/hbase/steps/HBaseCuboidWriter.java | 26 +- .../hbase/steps/HBaseStreamingOutput.java | 6 +- .../kylin/storage/hbase/steps/MergeGCStep.java | 10 +- .../hbase/steps/SandboxMetastoreCLI.java | 2 +- 96 files changed, 3061 insertions(+), 986 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-common/pom.xml ---------------------------------------------------------------------- diff --git a/core-common/pom.xml b/core-common/pom.xml index 
577db42..ea02b4b 100644 --- a/core-common/pom.xml +++ b/core-common/pom.xml @@ -49,6 +49,10 @@ <artifactId>commons-lang3</artifactId> </dependency> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + </dependency> + <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> @@ -65,6 +69,11 @@ <artifactId>commons-email</artifactId> </dependency> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-collections4</artifactId> + <version>${commons-collections4.version}</version> + </dependency> + <dependency> <groupId>commons-httpclient</groupId> <artifactId>commons-httpclient</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-common/src/main/java/org/apache/kylin/common/util/BitSets.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BitSets.java b/core-common/src/main/java/org/apache/kylin/common/util/BitSets.java new file mode 100644 index 0000000..b8a6de7 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/util/BitSets.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.util; + +import java.util.BitSet; + +public class BitSets { + public static BitSet valueOf(int[] indexes) { + if (indexes == null || indexes.length == 0) { + return new BitSet(); + } + + int maxIndex = Integer.MIN_VALUE; + for (int index : indexes) { + maxIndex = Math.max(maxIndex, index); + } + BitSet set = new BitSet(maxIndex); + for (int index : indexes) { + set.set(index); + } + return set; + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java index 0880da1..0d4dba9 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java @@ -25,14 +25,23 @@ public class BytesUtil { public static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; - public static void writeLong(long num, byte[] bytes, int offset, int size) { + public static void writeShort(short num, byte[] bytes, int offset, int size) { for (int i = offset + size - 1; i >= offset; i--) { bytes[i] = (byte) num; num >>>= 8; } } - public static void writeUnsigned(int num, byte[] bytes, int offset, int size) { + public static long readShort(byte[] bytes, int offset, int size) { + short num = 0; + for (int i = offset, n = offset + size; i < n; i++) { + num <<= 8; + num |= (short) bytes[i] & 0xFF; + } + return num; + } + + public static void writeLong(long num, byte[] bytes, int offset, int size) { for (int i = offset + size - 1; i >= offset; i--) { bytes[i] = (byte) num; num >>>= 8; @@ -48,6 +57,13 @@ public class BytesUtil { return integer; } + public static void 
writeUnsigned(int num, byte[] bytes, int offset, int size) { + for (int i = offset + size - 1; i >= offset; i--) { + bytes[i] = (byte) num; + num >>>= 8; + } + } + public static int readUnsigned(byte[] bytes, int offset, int size) { int integer = 0; for (int i = offset, n = offset + size; i < n; i++) { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java index 13abab5..c9838e4 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java @@ -45,8 +45,7 @@ public class CompressionUtils { outputStream.close(); byte[] output = outputStream.toByteArray(); - logger.info("Original: " + data.length + " bytes"); - logger.info("Compressed: " + output.length + " bytes"); + logger.info("Original: " + data.length + " bytes. " + "Compressed: " + output.length + " bytes "); return output; } @@ -63,8 +62,7 @@ public class CompressionUtils { outputStream.close(); byte[] output = outputStream.toByteArray(); - logger.info("Original: " + data.length + " bytes"); - logger.info("Decompressed: " + output.length + " bytes"); + logger.info("Original: " + data.length + " bytes. 
" + "Decompressed: " + output.length + " bytes"); return output; } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java b/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java new file mode 100644 index 0000000..8d728c8 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.common.util; + +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; + +public class ShardingHash { + + static HashFunction hashFunc = Hashing.murmur3_128(); + + public static short getShard(int integerValue, int totalShards) { + if (totalShards <= 1) { + return 0; + } + long hash = hashFunc.hashInt(integerValue).asLong(); + return _getShard(hash, totalShards); + } + + public static short getShard(long longValue, int totalShards) { + if (totalShards <= 1) { + return 0; + } + long hash = hashFunc.hashLong(longValue).asLong(); + return _getShard(hash, totalShards); + } + + public static short getShard(byte[] byteValues, int offset, int length, int totalShards) { + if (totalShards <= 1) { + return 0; + } + + long hash = hashFunc.hashBytes(byteValues, offset, length).asLong(); + return _getShard(hash, totalShards); + } + + public static short normalize(short cuboidShardBase, short shardOffset, int totalShards) { + if (totalShards <= 1) { + return 0; + } + return (short) ((cuboidShardBase + shardOffset) % totalShards); + } + + private static short _getShard(long hash, int totalShard) { + long x = hash % totalShard; + if (x < 0) { + x += totalShard; + } + return (short) x; + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-common/src/test/java/org/apache/kylin/common/util/BitSetsTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BitSetsTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BitSetsTest.java new file mode 100644 index 0000000..c923969 --- /dev/null +++ b/core-common/src/test/java/org/apache/kylin/common/util/BitSetsTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.util; + +import java.util.BitSet; + +import org.junit.Assert; +import org.junit.Test; + +public class BitSetsTest { + + @Test + public void basicTest() { + BitSet a = BitSets.valueOf(new int[] { 1, 3, 10 }); + Assert.assertEquals(3, a.cardinality()); + Assert.assertTrue(10 < a.size()); + Assert.assertTrue(a.get(3)); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java index 7436de9..79bc9f1 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java @@ -18,17 +18,15 @@ package org.apache.kylin.common.util; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.nio.ByteBuffer; import java.util.Arrays; -import junit.framework.TestCase; - import org.junit.Test; -/** - * by honma - */ -public class BytesUtilTest extends TestCase { 
+public class BytesUtilTest { @Test public void test() { ByteBuffer buffer = ByteBuffer.allocate(10000); @@ -77,6 +75,7 @@ public class BytesUtilTest extends TestCase { assertTrue(Arrays.equals(anOtherNewBytes, ba.array())); } + @Test public void testReadable() { String x = "\\x00\\x00\\x00\\x00\\x00\\x01\\xFC\\xA8"; byte[] bytes = BytesUtil.fromReadableText(x); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index 4bfdb18..7452539 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -27,8 +27,6 @@ import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.IBuildable; -import org.apache.kylin.metadata.model.IEngineAware; -import org.apache.kylin.metadata.model.IStorageAware; import org.apache.kylin.metadata.model.LookupDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; @@ -50,10 +48,9 @@ import com.google.common.collect.Lists; @SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) public class CubeInstance extends RootPersistentEntity implements IRealization, IBuildable { - private static final int COST_WEIGHT_DIMENSION = 1; private static final int COST_WEIGHT_MEASURE = 1; - private static final int COST_WEIGHT_LOOKUP_TABLE = 1; - private static final int COST_WEIGHT_INNER_JOIN = 2; + private static final int COST_WEIGHT_DIMENSION = 10; + 
private static final int COST_WEIGHT_INNER_JOIN = 100; public static CubeInstance create(String cubeName, String projectName, CubeDesc cubeDesc) { CubeInstance cubeInstance = new CubeInstance(); @@ -69,7 +66,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, return cubeInstance; } - + @JsonIgnore private KylinConfig config; @JsonProperty("name") @@ -124,7 +121,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, } return mergingSegments; } - + public CubeDesc getDescriptor() { return CubeDescManager.getInstance(config).getCubeDesc(descName); } @@ -357,7 +354,6 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, for (LookupDesc lookupDesc : this.getDescriptor().getModel().getLookups()) { // more tables, more cost - calculatedCost += COST_WEIGHT_LOOKUP_TABLE; if ("inner".equals(lookupDesc.getJoin().getType())) { // inner join cost is bigger than left join, as it will filter some records calculatedCost += COST_WEIGHT_INNER_JOIN; @@ -440,12 +436,10 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, public int getStorageType() { return getDescriptor().getStorageType(); } - @Override public int getEngineType() { return getDescriptor().getEngineType(); } - } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index 7d89470..1a44fcf 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -25,6 +25,7 @@ import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; import org.apache.kylin.common.persistence.ResourceStore; +import 
org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.IDictionaryAware; @@ -37,6 +38,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonBackReference; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Objects; +import com.google.common.collect.Maps; @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IBuildable { @@ -67,6 +69,10 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I private String lastBuildJobID; @JsonProperty("create_time_utc") private long createTimeUTC; + @JsonProperty("cuboid_shard_nums") + private Map<Long, Short> cuboidShardNums = Maps.newHashMap(); + @JsonProperty("total_shards") + private int totalShards = 0; @JsonProperty("binary_signature") private String binarySignature; // a hash of cube schema and dictionary ID, used for sanity check @@ -76,6 +82,8 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I @JsonProperty("snapshots") private ConcurrentHashMap<String, String> snapshots; // table name ==> snapshot resource path + private final Map<Long, Short> cuboidBaseShards = new ConcurrentHashMap<Long, Short>(); // cuboid id ==> base(starting) shard for this cuboid; filled lazily by getCuboidBaseShard(), so it must tolerate concurrent callers + public CubeDesc getCubeDesc() { return getCubeInstance().getDescriptor(); } @@ -360,4 +368,46 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I return cubeInstance.getStorageType(); } + /** + * get the number of shards where each cuboid will distribute + * @return the shard number recorded for the cuboid, or 1 when none is recorded + */ + public Short 
getCuboidShardNum(Long cuboidId) { + Short ret = this.cuboidShardNums.get(cuboidId); + if (ret == null) { + return 1; + } else { + return
ret; + } + } + + // /** + // * get the number of shards where each cuboid will distribute + // * @return + // */ + // public Map<Long, Short> getCuboidShards() { + // return this.cuboidShards; + // } + + public void setCuboidShardNums(Map<Long, Short> newCuboidShards) { + this.cuboidShardNums = newCuboidShards; + } + + public int getTotalShards() { + return totalShards; + } + + public void setTotalShards(int totalShards) { + this.totalShards = totalShards; + } + + public short getCuboidBaseShard(Long cuboidId) { + Short ret = cuboidBaseShards.get(cuboidId); + if (ret == null) { + ret = ShardingHash.getShard(cuboidId, totalShards); + cuboidBaseShards.put(cuboidId, ret); + } + return ret; + } + } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java b/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java new file mode 100644 index 0000000..4ddb06a --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cube.common; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class FuzzyValueCombination { + + private static class Dim<K, V> { + K col; + Set<V> values; + } + + private static final Set SINGLE_NULL_SET = Sets.newHashSet(); + + static { + SINGLE_NULL_SET.add(null); + } + + public static <K, V> List<Map<K, V>> calculate(Map<K, Set<V>> fuzzyValues, long cap) { + if (fuzzyValues == null || fuzzyValues.isEmpty()) return Lists.newArrayList(); // nothing to combine (null/empty input) + Dim<K, V>[] dims = toDims(fuzzyValues); + // If a query has many IN clauses and each IN clause has many values, it can easily generate + // thousands of fuzzy keys. When there are that many fuzzy keys, scan performance is bottlenecked + // on them, so simply abandon all fuzzy keys in this case.
+ if (exceedCap(dims, cap)) { + return Lists.newArrayList(); + } else { + return combination(dims); + } + } + + @SuppressWarnings("unchecked") + private static <K, V> List<Map<K, V>> combination(Dim<K, V>[] dims) { + + List<Map<K, V>> result = Lists.newArrayList(); + + int emptyDims = 0; + for (Dim dim : dims) { + if (dim.values.isEmpty()) { + dim.values = SINGLE_NULL_SET; + emptyDims++; + } + } + if (emptyDims == dims.length) { + return result; + } + + Map<K, V> r = Maps.newHashMap(); + Iterator<V>[] iters = new Iterator[dims.length]; + int level = 0; + while (true) { + Dim<K, V> dim = dims[level]; + if (iters[level] == null) { + iters[level] = dim.values.iterator(); + } + + Iterator<V> it = iters[level]; + if (it.hasNext() == false) { + if (level == 0) + break; + r.remove(dim.col); + iters[level] = null; + level--; + continue; + } + + r.put(dim.col, it.next()); + if (level == dims.length - 1) { + result.add(new HashMap<K, V>(r)); + } else { + level++; + } + } + return result; + } + + private static <K, V> Dim<K, V>[] toDims(Map<K, Set<V>> fuzzyValues) { + Dim[] dims = new Dim[fuzzyValues.size()]; + int i = 0; + for (Entry<K, Set<V>> entry : fuzzyValues.entrySet()) { + dims[i] = new Dim<K, V>(); + dims[i].col = entry.getKey(); + dims[i].values = entry.getValue(); + if (dims[i].values == null) + dims[i].values = Collections.emptySet(); + i++; + } + return dims; + } + + private static boolean exceedCap(Dim[] dims, long cap) { + return combCount(dims) > cap; + } + + private static long combCount(Dim[] dims) { + long count = 1; + for (Dim dim : dims) { + count *= Math.max(dim.values.size(), 1); + } + return count; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java 
b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java index 7e379dd..0111cee 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java @@ -27,10 +27,6 @@ import org.apache.kylin.cube.kv.RowKeyColumnIO; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.metadata.model.TblColRef; -/** - * @author George Song (ysong1) - * - */ public class RowKeySplitter { private CubeDesc cubeDesc; @@ -39,6 +35,9 @@ public class RowKeySplitter { private SplittedBytes[] splitBuffers; private int bufferSize; + private long lastSplittedCuboidId; + private short lastSplittedShard; + public SplittedBytes[] getSplitBuffers() { return splitBuffers; } @@ -47,6 +46,14 @@ public class RowKeySplitter { return bufferSize; } + public long getLastSplittedCuboidId() { + return lastSplittedCuboidId; + } + + public short getLastSplittedShard() { + return lastSplittedShard; + } + public RowKeySplitter(CubeSegment cubeSeg, int splitLen, int bytesLen) { this.cubeDesc = cubeSeg.getCubeDesc(); this.colIO = new RowKeyColumnIO(cubeSeg); @@ -60,21 +67,27 @@ public class RowKeySplitter { /** * @param bytes - * @param byteLen * @return cuboid ID */ - public long split(byte[] bytes, int byteLen) { + public long split(byte[] bytes) { this.bufferSize = 0; int offset = 0; + // extract shard + SplittedBytes shardSplit = this.splitBuffers[this.bufferSize++]; + shardSplit.length = RowConstants.ROWKEY_SHARDID_LEN; + System.arraycopy(bytes, offset, shardSplit.value, 0, RowConstants.ROWKEY_SHARDID_LEN); + offset += RowConstants.ROWKEY_SHARDID_LEN; + // extract cuboid id SplittedBytes cuboidIdSplit = this.splitBuffers[this.bufferSize++]; cuboidIdSplit.length = RowConstants.ROWKEY_CUBOIDID_LEN; System.arraycopy(bytes, offset, cuboidIdSplit.value, 0, RowConstants.ROWKEY_CUBOIDID_LEN); offset += RowConstants.ROWKEY_CUBOIDID_LEN; - long cuboidId = Bytes.toLong(cuboidIdSplit.value, 
0, cuboidIdSplit.length); - Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId); + lastSplittedCuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length); + lastSplittedShard = Bytes.toShort(shardSplit.value, 0, shardSplit.length); + Cuboid cuboid = Cuboid.findById(cubeDesc, lastSplittedCuboidId); // rowkey columns for (int i = 0; i < cuboid.getColumns().size(); i++) { @@ -86,6 +99,6 @@ public class RowKeySplitter { offset += colLength; } - return cuboidId; + return lastSplittedCuboidId; } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java index a7b2de4..9ee2315 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java @@ -28,6 +28,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.RowKeyColDesc; @@ -36,9 +37,6 @@ import org.apache.kylin.cube.model.RowKeyDesc.AggrGroupMask; import org.apache.kylin.cube.model.RowKeyDesc.HierarchyMask; import org.apache.kylin.metadata.model.TblColRef; -/** - * @author George Song (ysong1) - */ public class Cuboid implements Comparable<Cuboid> { private final static Map<String, Map<Long, Cuboid>> CUBOID_CACHE = new ConcurrentHashMap<String, Map<Long, Cuboid>>(); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java ---------------------------------------------------------------------- diff 
--git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java index e52a6e1..99258e9 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java @@ -18,17 +18,15 @@ import org.apache.kylin.gridtable.IGTComparator; import org.apache.kylin.metadata.measure.MeasureAggregator; import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer; import org.apache.kylin.metadata.measure.serializer.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * Created by shaoshi on 3/23/15. - * This implementation uses Dictionary to encode and decode the table; If a column doesn't have dictionary, will check - * its data type to serialize/deserialize it; + * defines how column values will be encoded to/ decoded from GTRecord + * + * Cube meta can provide which columns are dictionary encoded (dict encoded dimensions) or fixed length encoded (fixed length dimensions) + * Metrics columns are more flexible, they will use DataTypeSerializer according to their data type. 
*/ @SuppressWarnings({ "rawtypes", "unchecked" }) public class CubeCodeSystem implements IGTCodeSystem { - private static final Logger logger = LoggerFactory.getLogger(CubeCodeSystem.class); // ============================================================================ @@ -113,7 +111,7 @@ public class CubeCodeSystem implements IGTCodeSystem { if (serializer instanceof DictionarySerializer) { ((DictionarySerializer) serializer).serializeWithRounding(value, roundingFlag, buf); } else { - if ((!(serializer instanceof StringSerializer || serializer instanceof FixLenSerializer)) && (value instanceof String)) { + if ((value instanceof String) && (!(serializer instanceof StringSerializer || serializer instanceof FixLenSerializer))) { value = serializer.valueOf((String) value); } serializer.serialize(value, buf); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java index e662a82..e4f32fb 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java @@ -74,15 +74,12 @@ public class TrimmedCubeCodeSystem implements IGTCodeSystem { @Override public void encodeColumnValue(int col, Object value, int roundingFlag, ByteBuffer buf) { DataTypeSerializer serializer = serializers[col]; - if (serializer instanceof CubeCodeSystem.TrimmedDictionarySerializer || serializer instanceof CubeCodeSystem.DictionarySerializer) { - //TODO: remove this check - throw new IllegalStateException("Encode dictionary value in coprocessor"); - } else { - if ((!(serializer instanceof StringSerializer || serializer instanceof 
CubeCodeSystem.FixLenSerializer)) && (value instanceof String)) { - value = serializer.valueOf((String) value); - } - serializer.serialize(value, buf); - } + +// if (((value instanceof String) && !(serializer instanceof StringSerializer || serializer instanceof CubeCodeSystem.FixLenSerializer))) { +// value = serializer.valueOf((String) value); +// } + + serializer.serialize(value, buf); } @Override http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java index ebff9c8..8b95b4f 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStore.java @@ -92,12 +92,12 @@ public class ConcurrentDiskStore implements IGTStore, Closeable { } @Override - public IGTWriter rebuild(int shard) throws IOException { + public IGTWriter rebuild() throws IOException { return newWriter(0); } @Override - public IGTWriter append(int shard) throws IOException { + public IGTWriter append() throws IOException { return newWriter(diskFile.length()); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java index 2a12d1b..166ae76 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java @@ -88,12 +88,12 @@ 
public class MemDiskStore implements IGTStore, Closeable { } @Override - public IGTWriter rebuild(int shard) throws IOException { + public IGTWriter rebuild() throws IOException { return newWriter(0); } @Override - public IGTWriter append(int shard) throws IOException { + public IGTWriter append() throws IOException { return newWriter(length()); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java index f566f5c..1e24432 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java @@ -62,6 +62,7 @@ public abstract class AbstractRowKeyEncoder { protected final Cuboid cuboid; protected byte blankByte = DEFAULT_BLANK_BYTE; + protected boolean encodeShard = true; protected AbstractRowKeyEncoder(Cuboid cuboid) { this.cuboid = cuboid; @@ -71,6 +72,10 @@ public abstract class AbstractRowKeyEncoder { this.blankByte = blankByte; } + public void setEncodeShard(boolean encodeShard) { + this.encodeShard = encodeShard; + } + abstract public byte[] encode(Map<TblColRef, String> valueMap); abstract public byte[] encode(byte[][] values); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java index a17bb28..2185bc5 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java +++ 
b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java @@ -37,7 +37,7 @@ public class FuzzyKeyEncoder extends RowKeyEncoder { @Override protected byte[] defaultValue(int length) { byte[] keyBytes = new byte[length]; - Arrays.fill(keyBytes, RowConstants.FUZZY_MASK_ZERO); + Arrays.fill(keyBytes, RowConstants.BYTE_ZERO); return keyBytes; } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java index 5077287..bf67538 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java @@ -36,20 +36,19 @@ public class FuzzyMaskEncoder extends RowKeyEncoder { } @Override - protected int fillHeader(byte[] bytes, byte[][] values) { + protected int fillHeader(byte[] bytes) { + Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE); // always fuzzy match cuboid ID to lock on the selected cuboid - int cuboidStart = this.headerLength - RowConstants.ROWKEY_CUBOIDID_LEN; - Arrays.fill(bytes, 0, cuboidStart, RowConstants.FUZZY_MASK_ONE); - Arrays.fill(bytes, cuboidStart, this.headerLength, RowConstants.FUZZY_MASK_ZERO); + Arrays.fill(bytes, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN, RowConstants.BYTE_ZERO); return this.headerLength; } @Override protected void fillColumnValue(TblColRef column, int columnLen, byte[] value, int valueLen, byte[] outputValue, int outputValueOffset) { if (value == null) { - Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.FUZZY_MASK_ONE); + Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.BYTE_ONE); } else { - 
Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.FUZZY_MASK_ZERO); + Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.BYTE_ZERO); } } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java index 7607edf..6a8eeb5 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java @@ -26,18 +26,23 @@ public class RowConstants { public static final byte ROWKEY_LOWER_BYTE = 0; // row key upper bound public static final byte ROWKEY_UPPER_BYTE = (byte) 0xff; + // row key cuboid id length public static final int ROWKEY_CUBOIDID_LEN = 8; + // row key shard length + public static final int ROWKEY_SHARDID_LEN = 2; - // fuzzy mask - public static final byte FUZZY_MASK_ZERO = 0; - public static final byte FUZZY_MASK_ONE = 1; + public static final int ROWKEY_HEADER_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN; + + public static final byte BYTE_ZERO = 0; + public static final byte BYTE_ONE = 1; // row value delimiter public static final byte ROWVALUE_DELIMITER_BYTE = 7; public static final String ROWVALUE_DELIMITER_STRING = String.valueOf((char) 7); public static final byte[] ROWVALUE_DELIMITER_BYTES = { 7 }; + public static final int ROWKEY_BUFFER_SIZE = 1024 * 1024; // 1 MB public static final int ROWVALUE_BUFFER_SIZE = 1024 * 1024; // 1 MB // marker class http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java ---------------------------------------------------------------------- diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java index 1b896a0..3506845 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java @@ -53,12 +53,12 @@ public class RowKeyDecoder { public long decode(byte[] bytes) throws IOException { this.values.clear(); - long cuboidId = rowKeySplitter.split(bytes, bytes.length); + long cuboidId = rowKeySplitter.split(bytes); initCuboid(cuboidId); SplittedBytes[] splits = rowKeySplitter.getSplitBuffers(); - int offset = 1; // skip cuboid id part + int offset = 2; // skip shard and cuboid id part for (int i = 0; i < this.cuboid.getColumns().size(); i++) { TblColRef col = this.cuboid.getColumns().get(i); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java index 7f8bbd3..0676df6 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java @@ -24,56 +24,33 @@ import java.util.List; import java.util.Map; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.metadata.model.TblColRef; -/** - * @author George Song (ysong1) - */ public class RowKeyEncoder extends AbstractRowKeyEncoder { private int bytesLength; protected int headerLength; private RowKeyColumnIO colIO; + CubeSegment cubeSeg; protected RowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) { super(cuboid); + 
this.cubeSeg = cubeSeg; colIO = new RowKeyColumnIO(cubeSeg); - bytesLength = headerLength = RowConstants.ROWKEY_CUBOIDID_LEN; // header + bytesLength = headerLength = RowConstants.ROWKEY_HEADER_LEN; // include shard and cuboidid for (TblColRef column : cuboid.getColumns()) { bytesLength += colIO.getColumnLength(column); } } - public RowKeyColumnIO getColumnIO() { - return colIO; - } - - public int getColumnOffset(TblColRef col) { - int offset = RowConstants.ROWKEY_CUBOIDID_LEN; - - for (TblColRef dimCol : cuboid.getColumns()) { - if (col.equals(dimCol)) - return offset; - offset += colIO.getColumnLength(dimCol); - } - - throw new IllegalArgumentException("Column " + col + " not found on cuboid " + cuboid); - } - public int getColumnLength(TblColRef col) { return colIO.getColumnLength(col); } - public int getRowKeyLength() { - return bytesLength; - } - - public int getHeaderLength() { - return headerLength; - } - @Override public byte[] encode(Map<TblColRef, String> valueMap) { List<byte[]> valueList = new ArrayList<byte[]>(); @@ -95,7 +72,8 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder { @Override public byte[] encode(byte[][] values) { byte[] bytes = new byte[this.bytesLength]; - int offset = fillHeader(bytes, values); + int bodyOffset = RowConstants.ROWKEY_HEADER_LEN; + int offset = bodyOffset; for (int i = 0; i < cuboid.getColumns().size(); i++) { TblColRef column = cuboid.getColumns().get(i); @@ -107,18 +85,34 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder { fillColumnValue(column, colLength, value, value.length, bytes, offset); } offset += colLength; - } + + //fill shard and cuboid + fillHeader(bytes); + return bytes; } - protected int fillHeader(byte[] bytes, byte[][] values) { + protected int fillHeader(byte[] bytes) { int offset = 0; + + if (encodeShard) { + short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId()); + short shardOffset = ShardingHash.getShard(bytes, RowConstants.ROWKEY_HEADER_LEN, bytes.length - 
RowConstants.ROWKEY_HEADER_LEN, cuboidShardNum); + short finalShard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards()); + BytesUtil.writeShort(finalShard, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN); + } else { + BytesUtil.writeShort((short) 0, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN); + } + offset += RowConstants.ROWKEY_SHARDID_LEN; + System.arraycopy(cuboid.getBytes(), 0, bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN); offset += RowConstants.ROWKEY_CUBOIDID_LEN; + if (this.headerLength != offset) { throw new IllegalStateException("Expected header length is " + headerLength + ". But the offset is " + offset); } + return offset; } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java index 31ea9e2..5eefa54 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java @@ -19,9 +19,9 @@ public class GTBuilder implements Closeable { this.info = info; if (append) { - storeWriter = store.append(shard); + storeWriter = store.append(); } else { - storeWriter = store.rebuild(shard); + storeWriter = store.rebuild(); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java index d4fe3fb..e3d3640 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java @@ 
-34,13 +34,11 @@ public class GTInfo { ImmutableBitSet colBlocksAll; int rowBlockSize; // 0: disable row block - // sharding - int nShards; // 0: no sharding - // must create from builder private GTInfo() { } + public String getTableName() { return tableName; } @@ -56,15 +54,11 @@ public class GTInfo { public ImmutableBitSet getPrimaryKey() { return primaryKey; } - + public ImmutableBitSet getAllColumns() { return colAll; } - public boolean isShardingEnabled() { - return nShards > 0; - } - public boolean isRowBlockEnabled() { return rowBlockSize > 0; } @@ -119,7 +113,7 @@ public class GTInfo { public void validateColRef(TblColRef ref) { TblColRef expected = colRef(ref.getColumnDesc().getZeroBasedIndex()); - if (expected.equals(ref) == false) + if (!expected.equals(ref)) throw new IllegalArgumentException(); } @@ -162,11 +156,11 @@ public class GTInfo { for (int i = 0; i < colBlocks.length; i++) { merge = merge.or(colBlocks[i]); } - if (merge.equals(colAll) == false) + if (!merge.equals(colAll)) throw new IllegalStateException(); // primary key must be the first column block - if (primaryKey.equals(colBlocks[0]) == false) + if (!primaryKey.equals(colBlocks[0])) throw new IllegalStateException(); // drop empty column block @@ -177,7 +171,7 @@ public class GTInfo { if (cb.isEmpty()) it.remove(); } - colBlocks = (ImmutableBitSet[]) list.toArray(new ImmutableBitSet[list.size()]); + colBlocks = list.toArray(new ImmutableBitSet[list.size()]); } public static class Builder { @@ -228,12 +222,6 @@ public class GTInfo { } /** optional */ - public Builder enableSharding(int nShards) { - info.nShards = nShards; - return this; - } - - /** optional */ public Builder setColumnPreferIndex(ImmutableBitSet colPreferIndex) { info.colPreferIndex = colPreferIndex; return this; @@ -256,8 +244,12 @@ public class GTInfo { return KryoUtils.serialize(info); } } - + public static GTInfo deserialize(byte[] bytes) { return KryoUtils.deserialize(bytes, GTInfo.class); } + + public IGTCodeSystem 
getCodeSystem() { + return codeSystem; + } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java index dbfdf57..0f4eb3d 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java @@ -2,7 +2,7 @@ package org.apache.kylin.gridtable; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.BitSet; +import java.util.List; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; @@ -159,7 +159,7 @@ public class GTRecord implements Comparable<GTRecord> { return false; for (int i = 0; i < maskForEqualHashComp.trueBitCount(); i++) { int c = maskForEqualHashComp.trueBitAt(i); - if (this.cols[c].equals(o.cols[c]) == false) { + if (!this.cols[c].equals(o.cols[c])) { return false; } } @@ -228,19 +228,6 @@ public class GTRecord implements Comparable<GTRecord> { buf.setLength(pos); } - /** write data to given buffer, like serialize, UNLIKE other export this will put a prefix indicating null or not*/ - public void exportAllColumns(ByteBuffer buf) { - for (int i = 0; i < info.colAll.trueBitCount(); i++) { - int c = info.colAll.trueBitAt(i); - if (cols[c] == null || cols[c].array() == null) { - buf.put((byte) 0); - } else { - buf.put((byte) 1); - buf.put(cols[c].array(), cols[c].offset(), cols[c].length()); - } - } - } - /** write data to given buffer, like serialize */ public void exportColumns(ImmutableBitSet selectedCols, ByteBuffer buf) { for (int i = 0; i < selectedCols.trueBitCount(); i++) { @@ -261,34 +248,11 @@ public class GTRecord implements Comparable<GTRecord> { } /** change pointers to point to data in given buffer, UNLIKE 
deserialize */ - public void loadPrimaryKey(ByteBuffer buf) { - loadColumns(info.primaryKey, buf); - } - - /** change pointers to point to data in given buffer, UNLIKE deserialize */ public void loadCellBlock(int c, ByteBuffer buf) { loadColumns(info.colBlocks[c], buf); } /** change pointers to point to data in given buffer, UNLIKE deserialize */ - public void loadAllColumns(ByteBuffer buf) { - int pos = buf.position(); - for (int i = 0; i < info.colAll.trueBitCount(); i++) { - int c = info.colAll.trueBitAt(i); - - byte exist = buf.get(); - pos++; - - if (exist == 1) { - int len = info.codeSystem.codeLength(c, buf); - cols[c].set(buf.array(), buf.arrayOffset() + pos, len); - pos += len; - buf.position(pos); - } - } - } - - /** change pointers to point to data in given buffer, UNLIKE deserialize */ public void loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf) { int pos = buf.position(); for (int i = 0; i < selectedCols.trueBitCount(); i++) { @@ -300,30 +264,19 @@ public class GTRecord implements Comparable<GTRecord> { } } - /** similar to export(primaryKey) but will stop at the first null value */ - public static ByteArray exportScanKey(GTRecord rec) { - if (rec == null) - return null; - - GTInfo info = rec.getInfo(); - - BitSet selectedColumns = new BitSet(); - int len = 0; - for (int i = 0; i < info.primaryKey.trueBitCount(); i++) { - int c = info.primaryKey.trueBitAt(i); - if (rec.cols[c].array() == null) { - break; - } - selectedColumns.set(c); - len += rec.cols[c].length(); + /** change pointers to point to data in given buffer, UNLIKE deserialize + * unlike loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf), this + * method allows to defined specific columns(in order) to load + */ + public void loadColumns(List<Integer> selectedCols, ByteBuffer buf) { + int pos = buf.position(); + for (int i = 0; i < selectedCols.size(); i++) { + int c = selectedCols.get(i); + int len = info.codeSystem.codeLength(c, buf); + cols[c].set(buf.array(), 
buf.arrayOffset() + pos, len); + pos += len; + buf.position(pos); } - - if (selectedColumns.cardinality() == 0) - return null; - - ByteArray buf = ByteArray.allocate(len); - rec.exportColumns(new ImmutableBitSet(selectedColumns), buf); - return buf; } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java index 197fde4..eefe88e 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java @@ -7,42 +7,26 @@ public class GTScanRange { final public GTRecord pkStart; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded final public GTRecord pkEnd; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded - final public List<GTRecord> hbaseFuzzyKeys; // partial matching primary keys + final public List<GTRecord> fuzzyKeys; // partial matching primary keys public GTScanRange(GTRecord pkStart, GTRecord pkEnd) { this(pkStart, pkEnd, null); } - public GTScanRange(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> hbaseFuzzyKeys) { + public GTScanRange(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys) { GTInfo info = pkStart.info; assert info == pkEnd.info; - validateRangeKey(pkStart); - validateRangeKey(pkEnd); - this.pkStart = pkStart; this.pkEnd = pkEnd; - this.hbaseFuzzyKeys = hbaseFuzzyKeys == null ? 
Collections.<GTRecord> emptyList() : hbaseFuzzyKeys; - } - - private void validateRangeKey(GTRecord pk) { - pk.maskForEqualHashComp(pk.info.primaryKey); - boolean afterNull = false; - for (int i = 0; i < pk.info.primaryKey.trueBitCount(); i++) { - int c = pk.info.primaryKey.trueBitAt(i); - if (afterNull) { - pk.cols[c].set(null, 0, 0); - } else { - afterNull = pk.cols[c].array() == null; - } - } + this.fuzzyKeys = fuzzyKeys == null ? Collections.<GTRecord> emptyList() : fuzzyKeys; } @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((hbaseFuzzyKeys == null) ? 0 : hbaseFuzzyKeys.hashCode()); + result = prime * result + ((fuzzyKeys == null) ? 0 : fuzzyKeys.hashCode()); result = prime * result + ((pkEnd == null) ? 0 : pkEnd.hashCode()); result = prime * result + ((pkStart == null) ? 0 : pkStart.hashCode()); return result; @@ -57,10 +41,10 @@ public class GTScanRange { if (getClass() != obj.getClass()) return false; GTScanRange other = (GTScanRange) obj; - if (hbaseFuzzyKeys == null) { - if (other.hbaseFuzzyKeys != null) + if (fuzzyKeys == null) { + if (other.fuzzyKeys != null) return false; - } else if (!hbaseFuzzyKeys.equals(other.hbaseFuzzyKeys)) + } else if (!fuzzyKeys.equals(other.fuzzyKeys)) return false; if (pkEnd == null) { if (other.pkEnd != null) http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java index c09ecf0..d860090 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java @@ -1,6 +1,7 @@ package org.apache.kylin.gridtable; import java.util.ArrayList; +import 
java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -11,35 +12,50 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.common.FuzzyValueCombination; import org.apache.kylin.metadata.filter.CompareTupleFilter; import org.apache.kylin.metadata.filter.LogicalTupleFilter; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; public class GTScanRangePlanner { + private static final Logger logger = LoggerFactory.getLogger(GTScanRangePlanner.class); + private static final int MAX_HBASE_FUZZY_KEYS = 100; final private GTInfo info; - final private ComparatorEx<ByteArray> byteUnknownIsSmaller; - final private ComparatorEx<ByteArray> byteUnknownIsBigger; - final private ComparatorEx<GTRecord> recordUnknownIsSmaller; - final private ComparatorEx<GTRecord> recordUnknownIsBigger; + final private Pair<ByteArray, ByteArray> segmentStartAndEnd; + final private TblColRef partitionColRef; + + final private RecordComparator rangeStartComparator; + final private RecordComparator rangeEndComparator; + final private RecordComparator rangeStartEndComparator; - public GTScanRangePlanner(GTInfo info) { + public GTScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> segmentStartAndEnd, TblColRef partitionColRef) { this.info = info; + this.segmentStartAndEnd = segmentStartAndEnd; + this.partitionColRef = partitionColRef; IGTComparator comp = info.codeSystem.getComparator(); - 
this.byteUnknownIsSmaller = byteComparatorTreatsUnknownSmaller(comp); - this.byteUnknownIsBigger = byteComparatorTreatsUnknownBigger(comp); - this.recordUnknownIsSmaller = recordComparatorTreatsUnknownSmaller(comp); - this.recordUnknownIsBigger = recordComparatorTreatsUnknownBigger(comp); + + //start key GTRecord compare to start key GTRecord + this.rangeStartComparator = getRangeStartComparator(comp); + //stop key GTRecord compare to stop key GTRecord + this.rangeEndComparator = getRangeEndComparator(comp); + //start key GTRecord compare to stop key GTRecord + this.rangeStartEndComparator = getRangeStartEndComparator(comp); } // return empty list meaning filter is always false @@ -57,7 +73,8 @@ public class GTScanRangePlanner { List<GTScanRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size()); for (Collection<ColumnRange> andDimRanges : orAndDimRanges) { GTScanRange scanRange = newScanRange(andDimRanges); - scanRanges.add(scanRange); + if (scanRange != null) + scanRanges.add(scanRange); } List<GTScanRange> mergedRanges = mergeOverlapRanges(scanRanges); @@ -69,28 +86,64 @@ public class GTScanRangePlanner { private GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) { GTRecord pkStart = new GTRecord(info); GTRecord pkEnd = new GTRecord(info); - List<GTRecord> hbaseFuzzyKeys = Lists.newArrayList(); + Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap(); + + List<GTRecord> fuzzyKeys; for (ColumnRange range : andDimRanges) { + + if (partitionColRef != null && range.column.equals(partitionColRef)) { + + if (rangeStartEndComparator.comparator.compare(segmentStartAndEnd.getFirst(), range.end) <= 0 // + && rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) < 0) { + //segment range is [Closed,Open) + } else { + return null; + } + } + int col = range.column.getColumnDesc().getZeroBasedIndex(); - if (info.primaryKey.get(col) == false) + if (!info.primaryKey.get(col)) continue; pkStart.set(col, 
range.begin); pkEnd.set(col, range.end); - if (range.equals != null) { - ImmutableBitSet fuzzyMask = new ImmutableBitSet(col); - for (ByteArray v : range.equals) { - GTRecord fuzzy = new GTRecord(info); - fuzzy.set(col, v); - fuzzy.maskForEqualHashComp(fuzzyMask); - hbaseFuzzyKeys.add(fuzzy); - } + if (range.valueSet != null && !range.valueSet.isEmpty()) { + fuzzyValues.put(col, range.valueSet); } } - return new GTScanRange(pkStart, pkEnd, hbaseFuzzyKeys); + fuzzyKeys = buildFuzzyKeys(fuzzyValues); + + return new GTScanRange(pkStart, pkEnd, fuzzyKeys); + } + + private List<GTRecord> buildFuzzyKeys(Map<Integer, Set<ByteArray>> fuzzyValueSet) { + ArrayList<GTRecord> result = Lists.newArrayList(); + + if (fuzzyValueSet.isEmpty()) + return result; + + // debug/profiling purpose + if (BackdoorToggles.getDisableFuzzyKey()) { + logger.info("The execution of this query will not use fuzzy key"); + return result; + } + + List<Map<Integer, ByteArray>> fuzzyValueCombinations = FuzzyValueCombination.calculate(fuzzyValueSet, MAX_HBASE_FUZZY_KEYS); + + for (Map<Integer, ByteArray> fuzzyValue : fuzzyValueCombinations) { + GTRecord fuzzy = new GTRecord(info); + BitSet bitSet = new BitSet(info.getColumnCount()); + for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) { + bitSet.set(entry.getKey()); + fuzzy.set(entry.getKey(), entry.getValue()); + } + fuzzy.maskForEqualHashComp(new ImmutableBitSet(bitSet)); + result.add(fuzzy); + } + return result; } private TupleFilter flattenToOrAndFilter(TupleFilter filter) { @@ -194,7 +247,7 @@ public class GTScanRangePlanner { Collections.sort(ranges, new Comparator<GTScanRange>() { @Override public int compare(GTScanRange a, GTScanRange b) { - return recordUnknownIsSmaller.compare(a.pkStart, b.pkStart); + return rangeStartComparator.compare(a.pkStart, b.pkStart); } }); @@ -202,13 +255,12 @@ public class GTScanRangePlanner { List<GTScanRange> mergedRanges = new ArrayList<GTScanRange>(); int mergeBeginIndex = 0; GTRecord mergeEnd = 
ranges.get(0).pkEnd; - for (int index = 0; index < ranges.size(); index++) { + for (int index = 1; index < ranges.size(); index++) { GTScanRange range = ranges.get(index); // if overlap, swallow it - if (recordUnknownIsSmaller.min(range.pkStart, mergeEnd) == range.pkStart // - || recordUnknownIsBigger.max(mergeEnd, range.pkStart) == mergeEnd) { - mergeEnd = recordUnknownIsBigger.max(mergeEnd, range.pkEnd); + if (rangeStartEndComparator.compare(range.pkStart, mergeEnd) <= 0) { + mergeEnd = rangeEndComparator.max(mergeEnd, range.pkEnd); continue; } @@ -218,7 +270,7 @@ public class GTScanRangePlanner { // start new split mergeBeginIndex = index; - mergeEnd = recordUnknownIsBigger.max(mergeEnd, range.pkEnd); + mergeEnd = range.pkEnd; } // don't miss the last range @@ -239,9 +291,9 @@ public class GTScanRangePlanner { boolean hasNonFuzzyRange = false; for (GTScanRange range : ranges) { - hasNonFuzzyRange = hasNonFuzzyRange || range.hbaseFuzzyKeys.isEmpty(); - newFuzzyKeys.addAll(range.hbaseFuzzyKeys); - end = recordUnknownIsBigger.max(end, range.pkEnd); + hasNonFuzzyRange = hasNonFuzzyRange || range.fuzzyKeys.isEmpty(); + newFuzzyKeys.addAll(range.fuzzyKeys); + end = rangeEndComparator.max(end, range.pkEnd); } // if any range is non-fuzzy, then all fuzzy keys must be cleared @@ -269,7 +321,7 @@ public class GTScanRangePlanner { private TblColRef column; private ByteArray begin = ByteArray.EMPTY; private ByteArray end = ByteArray.EMPTY; - private Set<ByteArray> equals; + private Set<ByteArray> valueSet; public ColumnRange(TblColRef column, Set<ByteArray> values, FilterOperatorEnum op) { this.column = column; @@ -277,16 +329,16 @@ public class GTScanRangePlanner { switch (op) { case EQ: case IN: - equals = new HashSet<ByteArray>(values); + valueSet = new HashSet<ByteArray>(values); refreshBeginEndFromEquals(); break; case LT: case LTE: - end = byteUnknownIsBigger.max(values); + end = rangeEndComparator.comparator.max(values); break; case GT: case GTE: - begin = 
byteUnknownIsSmaller.min(values); + begin = rangeStartComparator.comparator.min(values); break; case NEQ: case NOTIN: @@ -303,16 +355,16 @@ public class GTScanRangePlanner { this.column = column; this.begin = beginValue; this.end = endValue; - this.equals = equalValues; + this.valueSet = equalValues; } private void refreshBeginEndFromEquals() { - if (equals.isEmpty()) { + if (valueSet.isEmpty()) { begin = ByteArray.EMPTY; end = ByteArray.EMPTY; } else { - begin = byteUnknownIsSmaller.min(equals); - end = byteUnknownIsBigger.max(equals); + begin = rangeStartComparator.comparator.min(valueSet); + end = rangeEndComparator.comparator.max(valueSet); } } @@ -321,8 +373,8 @@ public class GTScanRangePlanner { } public boolean satisfyNone() { - if (equals != null) { - return equals.isEmpty(); + if (valueSet != null) { + return valueSet.isEmpty(); } else if (begin.array() != null && end.array() != null) { return info.codeSystem.getComparator().compare(begin, end) > 0; } else { @@ -338,36 +390,36 @@ public class GTScanRangePlanner { } if (this.satisfyAll()) { - copy(another.column, another.begin, another.end, another.equals); + copy(another.column, another.begin, another.end, another.valueSet); return; } - if (this.equals != null && another.equals != null) { - this.equals.retainAll(another.equals); + if (this.valueSet != null && another.valueSet != null) { + this.valueSet.retainAll(another.valueSet); refreshBeginEndFromEquals(); return; } - if (this.equals != null) { - this.equals = filter(this.equals, another.begin, another.end); + if (this.valueSet != null) { + this.valueSet = filter(this.valueSet, another.begin, another.end); refreshBeginEndFromEquals(); return; } - if (another.equals != null) { - this.equals = filter(another.equals, this.begin, this.end); + if (another.valueSet != null) { + this.valueSet = filter(another.valueSet, this.begin, this.end); refreshBeginEndFromEquals(); return; } - this.begin = byteUnknownIsSmaller.max(this.begin, another.begin); - this.end = 
byteUnknownIsBigger.min(this.end, another.end); + this.begin = rangeStartComparator.comparator.max(this.begin, another.begin); + this.end = rangeEndComparator.comparator.min(this.end, another.end); } private Set<ByteArray> filter(Set<ByteArray> equalValues, ByteArray beginValue, ByteArray endValue) { Set<ByteArray> result = Sets.newHashSetWithExpectedSize(equalValues.size()); for (ByteArray v : equalValues) { - if (byteUnknownIsSmaller.compare(beginValue, v) <= 0 && byteUnknownIsBigger.compare(v, endValue) <= 0) { + if (rangeStartEndComparator.comparator.compare(beginValue, v) <= 0 && rangeStartEndComparator.comparator.compare(v, endValue) <= 0) { result.add(v); } } @@ -375,10 +427,10 @@ public class GTScanRangePlanner { } public String toString() { - if (equals == null) { + if (valueSet == null) { return column.getName() + " between " + begin + " and " + end; } else { - return column.getName() + " in " + equals; + return column.getName() + " in " + valueSet; } } } @@ -424,40 +476,55 @@ public class GTScanRangePlanner { } } - public static ComparatorEx<ByteArray> byteComparatorTreatsUnknownSmaller(final IGTComparator comp) { - return new ComparatorEx<ByteArray>() { + public static RecordComparator getRangeStartComparator(final IGTComparator comp) { + return new RecordComparator(new ComparatorEx<ByteArray>() { @Override public int compare(ByteArray a, ByteArray b) { - if (a.array() == null) - return -1; - else if (b.array() == null) + if (a.array() == null) { + if (b.array() == null) { + return 0; + } else { + return -1; + } + } else if (b.array() == null) { return 1; - else + } else { return comp.compare(a, b); + } } - }; + }); } - public static ComparatorEx<ByteArray> byteComparatorTreatsUnknownBigger(final IGTComparator comp) { - return new ComparatorEx<ByteArray>() { + public static RecordComparator getRangeEndComparator(final IGTComparator comp) { + return new RecordComparator(new ComparatorEx<ByteArray>() { @Override public int compare(ByteArray a, ByteArray 
b) { - if (a.array() == null) - return 1; - else if (b.array() == null) + if (a.array() == null) { + if (b.array() == null) { + return 0; + } else { + return 1; + } + } else if (b.array() == null) { return -1; - else + } else { return comp.compare(a, b); + } } - }; - } - - public static ComparatorEx<GTRecord> recordComparatorTreatsUnknownSmaller(IGTComparator comp) { - return new RecordComparator(byteComparatorTreatsUnknownSmaller(comp)); + }); } - public static ComparatorEx<GTRecord> recordComparatorTreatsUnknownBigger(IGTComparator comp) { - return new RecordComparator(byteComparatorTreatsUnknownBigger(comp)); + public static RecordComparator getRangeStartEndComparator(final IGTComparator comp) { + return new AsymmetricRecordComparator(new ComparatorEx<ByteArray>() { + @Override + public int compare(ByteArray a, ByteArray b) { + if (a.array() == null || b.array() == null) { + return -1; + } else { + return comp.compare(a, b); + } + } + }); } private static class RecordComparator extends ComparatorEx<GTRecord> { @@ -473,7 +540,7 @@ public class GTScanRangePlanner { assert a.maskForEqualHashComp() == b.maskForEqualHashComp(); ImmutableBitSet mask = a.maskForEqualHashComp(); - int comp = 0; + int comp; for (int i = 0; i < mask.trueBitCount(); i++) { int c = mask.trueBitAt(i); comp = comparator.compare(a.cols[c], b.cols[c]); @@ -483,4 +550,35 @@ public class GTScanRangePlanner { return 0; // equals } } + + /** + * asymmetric means compare(a,b) > 0 does not cause compare(b,a) < 0 + * so min max functions will not bu supported + */ + private static class AsymmetricRecordComparator extends RecordComparator { + + AsymmetricRecordComparator(ComparatorEx<ByteArray> byteComparator) { + super(byteComparator); + } + + public GTRecord min(Collection<GTRecord> v) { + throw new UnsupportedOperationException(); + } + + public GTRecord max(Collection<GTRecord> v) { + throw new UnsupportedOperationException(); + } + + public GTRecord min(GTRecord a, GTRecord b) { + throw new 
UnsupportedOperationException(); + } + + public GTRecord max(GTRecord a, GTRecord b) { + throw new UnsupportedOperationException(); + } + + public boolean between(GTRecord v, GTRecord start, GTRecord end) { + throw new UnsupportedOperationException(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index 2b31e70..c81dd63 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -2,6 +2,7 @@ package org.apache.kylin.gridtable; import java.io.IOException; import java.util.Arrays; +import java.util.List; import java.util.Set; import org.apache.kylin.common.util.ImmutableBitSet; @@ -88,7 +89,7 @@ public class GTScanRequest { } private void validateFilterPushDown(GTInfo info) { - if (hasFilterPushDown() == false) + if (!hasFilterPushDown()) return; Set<TblColRef> filterColumns = Sets.newHashSet(); @@ -102,7 +103,7 @@ public class GTScanRequest { } // un-evaluatable filter must be removed - if (TupleFilter.isEvaluableRecursively(filterPushDown) == false) { + if (!TupleFilter.isEvaluableRecursively(filterPushDown)) { Set<TblColRef> unevaluableColumns = Sets.newHashSet(); filterPushDown = GTUtil.convertFilterUnevaluatable(filterPushDown, info, unevaluableColumns); @@ -147,6 +148,10 @@ public class GTScanRequest { return range.pkEnd; } + public List<GTRecord> getFuzzyKeys() { + return range.fuzzyKeys; + } + public ImmutableBitSet getSelectedColBlocks() { return selectedColBlocks; } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java 
---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java index bbd82c8..de9a5ce 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java @@ -33,10 +33,6 @@ public class GTUtil { return convertFilter(rootFilter, info, null, false, unevaluatableColumnCollector); } - public static TupleFilter convertFilterConstants(TupleFilter rootFilter, GTInfo info) { - return convertFilter(rootFilter, info, null, true, null); - } - public static TupleFilter convertFilterColumnsAndConstants(TupleFilter rootFilter, GTInfo info, // List<TblColRef> colMapping, Set<TblColRef> unevaluatableColumnCollector) { return convertFilter(rootFilter, info, colMapping, true, unevaluatableColumnCollector); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java index f812b8f..8f81654 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java @@ -14,17 +14,14 @@ public class GridTable implements Closeable { } public GTBuilder rebuild() throws IOException { - assert info.isShardingEnabled() == false; return rebuild(-1); } public GTBuilder rebuild(int shard) throws IOException { - assert shard < info.nShards; return new GTBuilder(info, shard, store); } public GTBuilder append() throws IOException { - assert info.isShardingEnabled() == false; return append(-1); } 
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java index 5282544..f4c44f8 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java @@ -6,9 +6,9 @@ public interface IGTStore { GTInfo getInfo(); - IGTWriter rebuild(int shard) throws IOException; + IGTWriter rebuild() throws IOException; - IGTWriter append(int shard) throws IOException; + IGTWriter append() throws IOException; IGTScanner scan(GTScanRequest scanRequest) throws IOException; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d8372747/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java b/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java index d7074e4..9675aa1 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java @@ -22,9 +22,6 @@ public class GTSimpleMemStore implements IGTStore { public GTSimpleMemStore(GTInfo info) { this.info = info; this.rowList = new ArrayList<byte[]>(); - - if (info.isShardingEnabled()) - throw new UnsupportedOperationException(); } @Override @@ -41,13 +38,13 @@ public class GTSimpleMemStore implements IGTStore { } @Override - public IGTWriter rebuild(int shard) { + public IGTWriter rebuild() { rowList.clear(); return new Writer(); } @Override - public IGTWriter append(int shard) { + public IGTWriter append() { return new Writer(); }