http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index 2b31e70..c81dd63 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -2,6 +2,7 @@ package org.apache.kylin.gridtable; import java.io.IOException; import java.util.Arrays; +import java.util.List; import java.util.Set; import org.apache.kylin.common.util.ImmutableBitSet; @@ -88,7 +89,7 @@ public class GTScanRequest { } private void validateFilterPushDown(GTInfo info) { - if (hasFilterPushDown() == false) + if (!hasFilterPushDown()) return; Set<TblColRef> filterColumns = Sets.newHashSet(); @@ -102,7 +103,7 @@ public class GTScanRequest { } // un-evaluatable filter must be removed - if (TupleFilter.isEvaluableRecursively(filterPushDown) == false) { + if (!TupleFilter.isEvaluableRecursively(filterPushDown)) { Set<TblColRef> unevaluableColumns = Sets.newHashSet(); filterPushDown = GTUtil.convertFilterUnevaluatable(filterPushDown, info, unevaluableColumns); @@ -147,6 +148,10 @@ public class GTScanRequest { return range.pkEnd; } + public List<GTRecord> getFuzzyKeys() { + return range.fuzzyKeys; + } + public ImmutableBitSet getSelectedColBlocks() { return selectedColBlocks; }
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java index bbd82c8..de9a5ce 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java @@ -33,10 +33,6 @@ public class GTUtil { return convertFilter(rootFilter, info, null, false, unevaluatableColumnCollector); } - public static TupleFilter convertFilterConstants(TupleFilter rootFilter, GTInfo info) { - return convertFilter(rootFilter, info, null, true, null); - } - public static TupleFilter convertFilterColumnsAndConstants(TupleFilter rootFilter, GTInfo info, // List<TblColRef> colMapping, Set<TblColRef> unevaluatableColumnCollector) { return convertFilter(rootFilter, info, colMapping, true, unevaluatableColumnCollector); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java index f812b8f..8f81654 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java @@ -14,17 +14,14 @@ public class GridTable implements Closeable { } public GTBuilder rebuild() throws IOException { - assert info.isShardingEnabled() == false; return rebuild(-1); } public GTBuilder rebuild(int shard) throws IOException { - assert shard < info.nShards; return new GTBuilder(info, shard, store); } public GTBuilder append() throws IOException { - assert info.isShardingEnabled() == false; return append(-1); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java index 5282544..f4c44f8 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java @@ -6,9 +6,9 @@ public interface IGTStore { GTInfo getInfo(); - IGTWriter rebuild(int shard) throws IOException; + IGTWriter rebuild() throws IOException; - IGTWriter append(int shard) throws IOException; + IGTWriter append() throws IOException; IGTScanner scan(GTScanRequest scanRequest) throws IOException; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java b/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java index d7074e4..9675aa1 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java @@ -22,9 +22,6 @@ public class GTSimpleMemStore implements IGTStore { public GTSimpleMemStore(GTInfo info) { this.info = info; this.rowList = new ArrayList<byte[]>(); - - if (info.isShardingEnabled()) - throw new UnsupportedOperationException(); } @Override @@ -41,13 +38,13 @@ public class GTSimpleMemStore implements IGTStore { } @Override - public IGTWriter rebuild(int shard) { + public IGTWriter rebuild() { rowList.clear(); return new Writer(); } @Override - public IGTWriter append(int shard) { + public IGTWriter append() { return new Writer(); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java b/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java index 9a7970c..98f1eef 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java @@ -28,10 +28,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -/** - * @author George Song (ysong1) - * - */ public class RowKeySplitterTest extends LocalFileMetadataTestCase { @Before @@ -49,23 +45,23 @@ public class RowKeySplitterTest extends LocalFileMetadataTestCase { public void testWithSlr() throws Exception { CubeInstance cube = CubeManager.getInstance(getTestConfig()).getCube("TEST_KYLIN_CUBE_WITH_SLR_READY"); - RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 10, 20); + RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 11, 20); // base cuboid rowkey - byte[] input = { 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; - rowKeySplitter.split(input, input.length); + byte[] input = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; + rowKeySplitter.split(input); - assertEquals(10, rowKeySplitter.getBufferSize()); + assertEquals(11, rowKeySplitter.getBufferSize()); } @Test public void testWithoutSlr() throws Exception { CubeInstance cube = CubeManager.getInstance(getTestConfig()).getCube("TEST_KYLIN_CUBE_WITHOUT_SLR_READY"); - RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 10, 20); + RowKeySplitter rowKeySplitter = new RowKeySplitter(cube.getFirstSegment(), 11, 20); // base cuboid rowkey - byte[] input = { 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 }; - rowKeySplitter.split(input, input.length); + byte[] input = { 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 }; + rowKeySplitter.split(input); - assertEquals(9, rowKeySplitter.getBufferSize()); + assertEquals(10, rowKeySplitter.getBufferSize()); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java index 3704e03..d6b1718 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java @@ -34,10 +34,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -/** - * @author George Song (ysong1) - * - */ public class RowKeyDecoderTest extends LocalFileMetadataTestCase { @Before @@ -57,7 +53,7 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase { RowKeyDecoder rowKeyDecoder = new RowKeyDecoder(cube.getFirstSegment()); - byte[] key = { 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 }; + byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 }; rowKeyDecoder.decode(key); List<String> values = rowKeyDecoder.getValues(); @@ -70,7 +66,7 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase { RowKeyDecoder rowKeyDecoder = new RowKeyDecoder(cube.getFirstSegment()); - byte[] key = { 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; + byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; rowKeyDecoder.decode(key); List<String> values = rowKeyDecoder.getValues(); @@ -97,7 +93,7 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase { AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid); byte[] encodedKey = rowKeyEncoder.encode(data); - assertEquals(30, encodedKey.length); + assertEquals(22 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length); RowKeyDecoder rowKeyDecoder = new RowKeyDecoder(cube.getFirstSegment()); rowKeyDecoder.decode(encodedKey); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java index c50b8c9..45c8108 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java @@ -35,10 +35,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -/** - * @author George Song (ysong1) - * - */ public class RowKeyEncoderTest extends LocalFileMetadataTestCase { @Before @@ -74,9 +70,11 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase { AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid); byte[] encodedKey = rowKeyEncoder.encode(data); - assertEquals(30, encodedKey.length); - byte[] cuboidId = Arrays.copyOfRange(encodedKey, 0, 8); - byte[] rest = Arrays.copyOfRange(encodedKey, 8, encodedKey.length); + assertEquals(22 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length); + byte[] shard = Arrays.copyOfRange(encodedKey, 0, RowConstants.ROWKEY_SHARDID_LEN); + byte[] cuboidId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN); + byte[] rest = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_HEADER_LEN, encodedKey.length); + assertEquals(0, Bytes.toShort(shard)); assertEquals(255, Bytes.toLong(cuboidId)); assertArrayEquals(new byte[] { 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 }, rest); } @@ -104,10 +102,12 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase { AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid); byte[] encodedKey = rowKeyEncoder.encode(data); - assertEquals(48, encodedKey.length); - byte[] sellerId = Arrays.copyOfRange(encodedKey, 8, 26); - byte[] cuboidId = Arrays.copyOfRange(encodedKey, 0, 8); - byte[] rest = Arrays.copyOfRange(encodedKey, 26, encodedKey.length); + assertEquals(40 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length); + byte[] shard = Arrays.copyOfRange(encodedKey, 0, RowConstants.ROWKEY_SHARDID_LEN); + byte[] sellerId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_HEADER_LEN, 18 + RowConstants.ROWKEY_HEADER_LEN); + byte[] cuboidId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN); + byte[] rest = Arrays.copyOfRange(encodedKey, 18 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length); + assertEquals(0, Bytes.toShort(shard)); assertTrue(Bytes.toString(sellerId).startsWith("123456789")); assertEquals(511, Bytes.toLong(cuboidId)); assertArrayEquals(new byte[] { 11, 55, -13, 13, 22, 34, 121, 70, 80, 45, 71, 84, 67, 9, 9, 9, 9, 9, 9, 0, 10, 5 }, rest); @@ -136,10 +136,12 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase { AbstractRowKeyEncoder rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cube.getFirstSegment(), baseCuboid); byte[] encodedKey = rowKeyEncoder.encode(data); - assertEquals(48, encodedKey.length); - byte[] sellerId = Arrays.copyOfRange(encodedKey, 8, 26); - byte[] cuboidId = Arrays.copyOfRange(encodedKey, 0, 8); - byte[] rest = Arrays.copyOfRange(encodedKey, 26, encodedKey.length); + assertEquals(40 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length); + byte[] shard = Arrays.copyOfRange(encodedKey, 0, RowConstants.ROWKEY_SHARDID_LEN); + byte[] cuboidId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN); + byte[] sellerId = Arrays.copyOfRange(encodedKey, RowConstants.ROWKEY_HEADER_LEN, 18 + RowConstants.ROWKEY_HEADER_LEN); + byte[] rest = Arrays.copyOfRange(encodedKey, 18 + RowConstants.ROWKEY_HEADER_LEN, encodedKey.length); + assertEquals(0, Bytes.toShort(shard)); assertTrue(Bytes.toString(sellerId).startsWith("123456789")); assertEquals(511, Bytes.toLong(cuboidId)); assertArrayEquals(new byte[] { -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 }, rest); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java index 684f0ef..91e7e18 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java @@ -29,6 +29,7 @@ import java.util.Map; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.gridtable.CubeCodeSystem; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.NumberDictionaryBuilder; @@ -48,6 +49,7 @@ import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.DataType; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.junit.Before; import org.junit.Test; import com.google.common.collect.Lists; @@ -55,39 +57,120 @@ import com.google.common.collect.Maps; public class DictGridTableTest { + private GridTable table; + private GTInfo info; + private CompareTupleFilter timeComp0; + private CompareTupleFilter timeComp1; + private CompareTupleFilter timeComp2; + private CompareTupleFilter timeComp3; + private CompareTupleFilter timeComp4; + private CompareTupleFilter timeComp5; + private CompareTupleFilter timeComp6; + private CompareTupleFilter ageComp1; + private CompareTupleFilter ageComp2; + private CompareTupleFilter ageComp3; + private CompareTupleFilter ageComp4; + + @Before + public void setup() throws IOException { + table = newTestTable(); + info = table.getInfo(); + + timeComp0 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-14")); + timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); + timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13")); + timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15")); + timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15")); + timeComp5 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-15")); + timeComp6 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-14")); + ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10")); + ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20")); + ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30")); + ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30")); + } + + @Test + public void verifySegmentSkipping() { + + ByteArray segmentStart = enc(info, 0, "2015-01-14"); + ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when partition col is dict encoded, time format will be free + ByteArray segmentEnd = enc(info, 0, "2015-01-15"); + assertEquals(segmentStart, segmentStartX); + + GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0)); + + { + LogicalTupleFilter filter = and(timeComp0, ageComp1); + List<GTScanRange> r = planner.planScanRanges(filter); + assertEquals(1, r.size());//scan range are [close,close] + assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString()); + assertEquals(1, r.get(0).fuzzyKeys.size()); + assertEquals("[[10]]", r.get(0).fuzzyKeys.toString()); + } + { + LogicalTupleFilter filter = and(timeComp2, ageComp1); + List<GTScanRange> r = planner.planScanRanges(filter); + assertEquals(0, r.size()); + } + { + LogicalTupleFilter filter = and(timeComp4, ageComp1); + List<GTScanRange> r = planner.planScanRanges(filter); + assertEquals(0, r.size()); + } + { + LogicalTupleFilter filter = and(timeComp5, ageComp1); + List<GTScanRange> r = planner.planScanRanges(filter); + assertEquals(0, r.size()); + } + { + LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1), and(timeComp6, ageComp1)); + List<GTScanRange> r = planner.planScanRanges(filter); + assertEquals(1, r.size()); + assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString()); + assertEquals("[[10], [1421193600000, 10]]", r.get(0).fuzzyKeys.toString()); + } + { + LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6); + List<GTScanRange> r = planner.planScanRanges(filter); + assertEquals(1, r.size()); + assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString()); + assertEquals(0, r.get(0).fuzzyKeys.size()); + } + } + @Test - public void test() throws IOException { - GridTable table = newTestTable(); - verifyScanRangePlanner(table); - verifyFirstRow(table); - verifyScanWithUnevaluatableFilter(table); - verifyScanWithEvaluatableFilter(table); - verifyConvertFilterConstants1(table); - verifyConvertFilterConstants2(table); - verifyConvertFilterConstants3(table); - verifyConvertFilterConstants4(table); + public void verifySegmentSkipping2() { + ByteArray segmentEnd = enc(info, 0, "2015-01-15"); + GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0)); + + { + LogicalTupleFilter filter = and(timeComp0, ageComp1); + List<GTScanRange> r = planner.planScanRanges(filter); + assertEquals(1, r.size());//scan range are [close,close] + assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString()); + assertEquals(1, r.get(0).fuzzyKeys.size()); + assertEquals("[[10]]", r.get(0).fuzzyKeys.toString()); + } + + { + LogicalTupleFilter filter = and(timeComp5, ageComp1); + List<GTScanRange> r = planner.planScanRanges(filter); + assertEquals(0, r.size());//scan range are [close,close] + } } - private void verifyScanRangePlanner(GridTable table) { - GTInfo info = table.getInfo(); - GTScanRangePlanner planner = new GTScanRangePlanner(info); + @Test + public void verifyScanRangePlanner() { - CompareTupleFilter timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); - CompareTupleFilter timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13")); - CompareTupleFilter timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15")); - CompareTupleFilter timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15")); - CompareTupleFilter ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10")); - CompareTupleFilter ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20")); - CompareTupleFilter ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30")); - CompareTupleFilter ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30")); + GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null); // flatten or-and & hbase fuzzy value { LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2)); List<GTScanRange> r = planner.planScanRanges(filter); assertEquals(1, r.size()); - assertEquals("[1421193600000, 10]-[null, null]", r.get(0).toString()); - assertEquals("[[10], [20]]", r.get(0).hbaseFuzzyKeys.toString()); + assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString()); + assertEquals("[[10], [20]]", r.get(0).fuzzyKeys.toString()); } // pre-evaluate ever false @@ -124,11 +207,13 @@ public class DictGridTableTest { } } - private void verifyFirstRow(GridTable table) throws IOException { + @Test + public void verifyFirstRow() throws IOException { doScanAndVerify(table, new GTScanRequest(table.getInfo()), "[1421193600000, 30, Yang, 10, 10.5]"); } - private void verifyScanWithUnevaluatableFilter(GridTable table) throws IOException { + @Test + public void verifyScanWithUnevaluatableFilter() throws IOException { GTInfo info = table.getInfo(); CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); @@ -144,7 +229,8 @@ public class DictGridTableTest { doScanAndVerify(table, req, "[1421280000000, 20, null, 20, null]"); } - private void verifyScanWithEvaluatableFilter(GridTable table) throws IOException { + @Test + public void verifyScanWithEvaluatableFilter() throws IOException { GTInfo info = table.getInfo(); CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); @@ -159,7 +245,8 @@ public class DictGridTableTest { doScanAndVerify(table, req, "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]"); } - private void verifyConvertFilterConstants1(GridTable table) { + @Test + public void verifyConvertFilterConstants1() { GTInfo info = table.getInfo(); TableDesc extTable = TableDesc.mockup("ext"); @@ -178,7 +265,8 @@ public class DictGridTableTest { assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]", newFilter.toString()); } - private void verifyConvertFilterConstants2(GridTable table) { + @Test + public void verifyConvertFilterConstants2() { GTInfo info = table.getInfo(); TableDesc extTable = TableDesc.mockup("ext"); @@ -198,7 +286,8 @@ public class DictGridTableTest { assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]", newFilter.toString()); } - private void verifyConvertFilterConstants3(GridTable table) { + @Test + public void verifyConvertFilterConstants3() { GTInfo info = table.getInfo(); TableDesc extTable = TableDesc.mockup("ext"); @@ -218,7 +307,8 @@ public class DictGridTableTest { assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], []]", newFilter.toString()); } - private void verifyConvertFilterConstants4(GridTable table) { + @Test + public void verifyConvertFilterConstants4() { GTInfo info = table.getInfo(); TableDesc extTable = TableDesc.mockup("ext"); @@ -252,7 +342,7 @@ public class DictGridTableTest { scanner.close(); } - private Object enc(GTInfo info, int col, String value) { + private ByteArray enc(GTInfo info, int col, String value) { ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength()); info.codeSystem.encodeColumnValue(col, value, buf); return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position()); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java index 8b8fb87..919ede6 100644 --- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java +++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java @@ -18,6 +18,9 @@ package org.apache.kylin.engine; +import static org.apache.kylin.metadata.model.IEngineAware.ID_MR_V1; +import static org.apache.kylin.metadata.model.IEngineAware.ID_MR_V2; + import java.util.HashMap; import java.util.Map; @@ -25,30 +28,29 @@ import org.apache.kylin.common.util.ImplementationSwitch; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.metadata.model.IEngineAware; -import static org.apache.kylin.metadata.model.IEngineAware.*; public class EngineFactory { - - private static ImplementationSwitch batchEngines; - private static ImplementationSwitch streamingEngines; + + private static ImplementationSwitch<IBatchCubingEngine> batchEngines; + private static ImplementationSwitch<IStreamingCubingEngine> streamingEngines; static { Map<Integer, String> impls = new HashMap<>(); impls.put(ID_MR_V1, "org.apache.kylin.engine.mr.MRBatchCubingEngine"); impls.put(ID_MR_V2, "org.apache.kylin.engine.mr.MRBatchCubingEngine2"); - batchEngines = new ImplementationSwitch(impls); - + batchEngines = new ImplementationSwitch<IBatchCubingEngine>(impls, IBatchCubingEngine.class); + impls.clear(); - streamingEngines = new ImplementationSwitch(impls); // TODO + streamingEngines = new ImplementationSwitch<IStreamingCubingEngine>(impls, IStreamingCubingEngine.class); // TODO } - + public static IBatchCubingEngine batchEngine(IEngineAware aware) { - return batchEngines.get(aware.getEngineType(), IBatchCubingEngine.class); + return batchEngines.get(aware.getEngineType()); } - + public static IStreamingCubingEngine streamingEngine(IEngineAware aware) { - return streamingEngines.get(aware.getEngineType(), IStreamingCubingEngine.class); + return streamingEngines.get(aware.getEngineType()); } - + /** Build a new cube segment, typically its time range appends to the end of current cube. */ public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) { return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter); @@ -58,7 +60,7 @@ public class EngineFactory { public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) { return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter); } - + public static Runnable createStreamingCubingBuilder(CubeSegment seg) { return streamingEngine(seg).createStreamingCubingBuilder(seg); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java index c5bd3e0..e456ac1 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java @@ -115,6 +115,17 @@ public abstract class TupleFilter { throw new UnsupportedOperationException(); } + /** + * flatten to OR-AND filter, (A AND B AND ..) OR (C AND D AND ..) OR .. + * flatten filter will ONLY contain AND and OR , no NOT will exist. + * This will help to decide scan ranges. + * + * Notice that the flatten filter will ONLY be used for determining scan ranges, + * The filter that is later pushed down into storage level is still the ORIGINAL + * filter, since the flattened filter will be too "fat" to evaluate + * + * @return + */ public TupleFilter flatFilter() { return flattenInternal(this); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java index a9d785b..7404136 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java @@ -33,7 +33,7 @@ import org.apache.kylin.common.util.BytesUtil; */ public class TupleFilterSerializer { - public static interface Decorator { + public interface Decorator { TupleFilter onSerialize(TupleFilter filter); } @@ -69,20 +69,20 @@ public class TupleFilterSerializer { if (filter.hasChildren()) { // serialize filter+true - serializeFilter(1, filter, decorator, buffer, cs); + serializeFilter(1, filter, buffer, cs); // serialize children for (TupleFilter child : filter.getChildren()) { internalSerialize(child, decorator, buffer, cs); } // serialize none - serializeFilter(-1, filter, decorator, buffer, cs); + serializeFilter(-1, filter, buffer, cs); } else { // serialize filter+false - serializeFilter(0, filter, decorator, buffer, cs); + serializeFilter(0, filter, buffer, cs); } } - private static void serializeFilter(int flag, TupleFilter filter, Decorator decorator, ByteBuffer buffer, IFilterCodeSystem<?> cs) { + private static void serializeFilter(int flag, TupleFilter filter, ByteBuffer buffer, IFilterCodeSystem<?> cs) { if (flag < 0) { BytesUtil.writeVInt(-1, buffer); } else { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java index a771f5e..54068f6 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java @@ -18,25 +18,26 @@ package org.apache.kylin.source; +import static org.apache.kylin.metadata.model.ISourceAware.ID_HIVE; + import java.util.HashMap; import java.util.Map; import org.apache.kylin.common.util.ImplementationSwitch; import org.apache.kylin.metadata.model.ISourceAware; -import static org.apache.kylin.metadata.model.ISourceAware.*; import org.apache.kylin.metadata.model.TableDesc; public class SourceFactory { - private static ImplementationSwitch sources; + private static ImplementationSwitch<ISource> sources; static { Map<Integer, String> impls = new HashMap<>(); impls.put(ID_HIVE, "org.apache.kylin.source.hive.HiveSource"); - sources = new ImplementationSwitch(impls); + sources = new ImplementationSwitch<ISource>(impls, ISource.class); } public static ISource tableSource(ISourceAware aware) { - return sources.get(aware.getSourceType(), ISource.class); + return sources.get(aware.getSourceType()); } public static ReadableTable createReadableTable(TableDesc table) { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java index b26dfdb..271583c 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java @@ -18,7 +18,8 @@ package org.apache.kylin.storage; -import static org.apache.kylin.metadata.model.IStorageAware.*; +import static org.apache.kylin.metadata.model.IStorageAware.ID_HBASE; +import static org.apache.kylin.metadata.model.IStorageAware.ID_HYBRID; import java.util.HashMap; import java.util.Map; @@ -31,22 +32,22 @@ import org.apache.kylin.metadata.realization.IRealization; */ public class StorageFactory { - private static ImplementationSwitch storages; + private static ImplementationSwitch<IStorage> storages; static { Map<Integer, String> impls = new HashMap<>(); impls.put(ID_HBASE, "org.apache.kylin.storage.hbase.HBaseStorage"); impls.put(ID_HYBRID, "org.apache.kylin.storage.hybrid.HybridStorage"); - storages = new ImplementationSwitch(impls); + storages = new ImplementationSwitch<IStorage>(impls, IStorage.class); } - + public static IStorage storage(IStorageAware aware) { - return storages.get(aware.getStorageType(), IStorage.class); + return storages.get(aware.getStorageType()); } - + public static IStorageQuery createQuery(IRealization realization) { return storage(realization).createQuery(realization); } - + public static <T> T createEngineAdapter(IStorageAware aware, Class<T> engineInterface) { return storage(aware).adaptToBuildEngine(engineInterface); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java index 0c30a3c..4049fcd 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java @@ -163,7 +163,7 @@ public class HybridInstance extends RootPersistentEntity implements IRealization @Override public int getCost(SQLDigest digest) { - cost = 100; + cost = Integer.MAX_VALUE; for (IRealization realization : this.getRealizations()) { if (realization.isCapable(digest)) cost = Math.min(cost, realization.getCost(digest)); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java index fbc6d19..1e05eb8 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java @@ -32,23 +32,21 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -/** - * @author yangli9 - * - */ public class FuzzyValueCombination { - private static class Dim { + private static class Dim<E> { TblColRef col; - Set<String> values; + Set<E> values; } - private static final Set<String> SINGLE_NULL_SET = Sets.newHashSet(); + private static final Set SINGLE_NULL_SET = Sets.newHashSet(); + static { SINGLE_NULL_SET.add(null); } - public static List<Map<TblColRef, String>> calculate(Map<TblColRef, Set<String>> fuzzyValues, long cap) { + public static <E> List<Map<TblColRef, E>> calculate(Map<TblColRef, Set<E>> fuzzyValues, long cap) { + Collections.emptyMap(); Dim[] dims = toDims(fuzzyValues); // If a query has many IN clause and each IN clause has many values, then it will easily generate // thousands of fuzzy keys. When there are lots of fuzzy keys, the scan performance is bottle necked @@ -61,9 +59,9 @@ public class FuzzyValueCombination { } @SuppressWarnings("unchecked") - private static List<Map<TblColRef, String>> combination(Dim[] dims) { + private static <E> List<Map<TblColRef, E>> combination(Dim[] dims) { - List<Map<TblColRef, String>> result = Lists.newArrayList(); + List<Map<TblColRef, E>> result = Lists.newArrayList(); int emptyDims = 0; for (Dim dim : dims) { @@ -76,8 +74,8 @@ public class FuzzyValueCombination { return result; } - Map<TblColRef, String> r = Maps.newHashMap(); - Iterator<String>[] iters = new Iterator[dims.length]; + Map<TblColRef, E> r = Maps.newHashMap(); + Iterator<E>[] iters = new Iterator[dims.length]; int level = 0; while (true) { Dim dim = dims[level]; @@ -85,7 +83,7 @@ public class FuzzyValueCombination { iters[level] = dim.values.iterator(); } - Iterator<String> it = iters[level]; + Iterator<E> it = iters[level]; if (it.hasNext() == false) { if (level == 0) break; @@ -97,7 +95,7 @@ public class FuzzyValueCombination { r.put(dim.col, it.next()); if (level == dims.length - 1) { - result.add(new HashMap<TblColRef, String>(r)); + result.add(new HashMap<TblColRef, E>(r)); } else { level++; } @@ -105,10 +103,10 @@ public class FuzzyValueCombination { return result; } - private static Dim[] toDims(Map<TblColRef, Set<String>> fuzzyValues) { + private static <E> Dim[] toDims(Map<TblColRef, Set<E>> fuzzyValues) { Dim[] dims = new Dim[fuzzyValues.size()]; int i = 0; - for (Entry<TblColRef, Set<String>> entry : fuzzyValues.entrySet()) { + for (Entry<TblColRef, Set<E>> entry : fuzzyValues.entrySet()) { dims[i] = new Dim(); dims[i].col = entry.getKey(); dims[i].values = entry.getValue(); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java index 47553ad..bdcd257 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java @@ -119,7 +119,8 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> { } AbstractRowKeyEncoder encoder = AbstractRowKeyEncoder.createInstance(cubeSeg, cuboid); - + encoder.setEncodeShard(false);//will enumerate all possible shards when scanning + encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE); this.startKey = encoder.encode(startValues); @@ -133,7 +134,8 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> { // restore encoder defaults for later reuse (note // AbstractRowKeyEncoder.createInstance() caches instances) encoder.setBlankByte(AbstractRowKeyEncoder.DEFAULT_BLANK_BYTE); - + + encoder.setEncodeShard(true); // always fuzzy match cuboid ID to lock on the selected cuboid this.fuzzyKeys = buildFuzzyKeys(fuzzyValues); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/dev-support/test_all.sh ---------------------------------------------------------------------- diff --git a/dev-support/test_all.sh b/dev-support/test_all.sh new file mode 100644 index 0000000..6a7b887 --- /dev/null +++ b/dev-support/test_all.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +dir=$(dirname ${0}) +cd ${dir} +cd .. + +mvn clean install -DskipTests | tee mci.log +mvn test -Dtest=org.apache.kylin.job.BuildCubeWithEngineTest -DfailIfNoTests=false -P sandbox | tee BuildCubeWithEngineTest.log +mvn test -Dtest=org.apache.kylin.job.BuildIIWithStreamTest -DfailIfNoTests=false -P sandbox | tee BuildIIWithStreamTest.log +mvn test -Dtest=org.apache.kylin.job.BuildCubeWithStreamTest -DfailIfNoTests=false -P sandbox | tee BuildCubeWithStreamTest.log +mvn verify -fae -P sandbox | tee mvnverify.log \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java new file mode 100644 index 0000000..d09e4ec --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.mr.common; + +import java.io.IOException; +import java.util.Map; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.CubeUpdate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +public class CuboidShardUtil { + protected static final Logger logger = LoggerFactory.getLogger(CuboidShardUtil.class); + + public static void saveCuboidShards(CubeSegment segment, Map<Long, Short> cuboidShards, int totalShards) throws IOException { + CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + + Map<Long, Short> filered = Maps.newHashMap(); + for (Map.Entry<Long, Short> entry : cuboidShards.entrySet()) { + if (entry.getValue() <= 1) { + logger.info("Cuboid {} has {} shards, skip saving it as an optimization", entry.getKey(), entry.getValue()); + } else { + logger.info("Cuboid {} has {} shards, saving it", entry.getKey(), entry.getValue()); + filered.put(entry.getKey(), entry.getValue()); + } + } + + segment.setCuboidShardNums(filered); + segment.setTotalShards(totalShards); + + CubeUpdate cubeBuilder = new CubeUpdate(segment.getCubeInstance()); + cubeBuilder.setToUpdateSegs(segment); + cubeManager.updateCube(cubeBuilder); + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java new file mode 100644 index 0000000..10c82c3 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.mr.common; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.cube.kv.RowConstants; + +public class CuboidStatsUtil { + + public static void writeCuboidStatistics(Configuration conf, Path outputPath, Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException { + Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION); + SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class)); + + List<Long> allCuboids = new ArrayList<Long>(); + allCuboids.addAll(cuboidHLLMap.keySet()); + Collections.sort(allCuboids); + + // persist the sample percentage with key 0 + writer.append(new LongWritable(0l), new BytesWritable(Bytes.toBytes(samplingPercentage))); + ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); + try { + for (long i : allCuboids) { + valueBuf.clear(); + cuboidHLLMap.get(i).writeRegisters(valueBuf); + valueBuf.flip(); + writer.append(new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit())); + } + } finally { + writer.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 85312ff..568dd77 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -47,6 +47,7 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.KylinReducer; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.CuboidStatsUtil; import org.apache.kylin.metadata.model.TblColRef; import com.google.common.collect.Lists; @@ -144,7 +145,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text, //output the hll info; if (collectStatistics) { writeMapperAndCuboidStatistics(context); // for human check - writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), cuboidHLLMap, SAMPING_PERCENTAGE); // for CreateHTableJob + CuboidStatsUtil.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), cuboidHLLMap, SAMPING_PERCENTAGE); // for CreateHTableJob } } @@ -202,27 +203,4 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text, } - public static void writeCuboidStatistics(Configuration conf, Path outputPath, Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException { - Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION); - SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class)); - - List<Long> allCuboids = new ArrayList<Long>(); - allCuboids.addAll(cuboidHLLMap.keySet()); - Collections.sort(allCuboids); - - // persist the sample percentage with key 0 - writer.append(new LongWritable(0l), new BytesWritable(Bytes.toBytes(samplingPercentage))); - ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - try { - for (long i : allCuboids) { - valueBuf.clear(); - cuboidHLLMap.get(i).writeRegisters(valueBuf); - valueBuf.flip(); - writer.append(new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit())); - } - } finally { - writer.close(); - } - - } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java index 7bb2e16..4c743fb 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java @@ -7,7 +7,8 @@ import java.util.BitSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.mapreduce.MapContext; -import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.inmemcubing.ICuboidWriter; @@ -37,12 +38,13 @@ public class MapContextGTRecordWriter implements ICuboidWriter { private ByteArrayWritable outputValue = new ByteArrayWritable(); private long cuboidRowCount = 0; + //for shard + public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) { this.mapContext = mapContext; this.cubeDesc = cubeDesc; this.cubeSegment = cubeSegment; this.measureCount = cubeDesc.getMeasures().size(); - } @Override @@ -59,12 +61,20 @@ public class MapContextGTRecordWriter implements ICuboidWriter { } cuboidRowCount++; - int offSet = RowConstants.ROWKEY_CUBOIDID_LEN; + int header = RowConstants.ROWKEY_HEADER_LEN; + int offSet = header; for (int x = 0; x < dimensions; x++) { System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offSet, record.get(x).length()); offSet += record.get(x).length(); } + //fill shard + short cuboidShardNum = cubeSegment.getCuboidShardNum(cuboidId); + short shardOffset = ShardingHash.getShard(keyBuf, header, offSet - header, cuboidShardNum); + short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId); + short finalShard = ShardingHash.normalize(cuboidShardBase, shardOffset, cubeSegment.getTotalShards()); + BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN); + //output measures valueBuf.clear(); record.exportColumns(measureColumnsIndex, valueBuf); @@ -89,7 +99,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter { } private void initVariables(Long cuboidId) { - bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN; + bytesLength = RowConstants.ROWKEY_HEADER_LEN; Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId); for (TblColRef column : cuboid.getColumns()) { bytesLength += cubeSegment.getColumnLength(column); @@ -102,6 +112,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter { measureColumnsIndex[i] = dimensions + i; } - System.arraycopy(Bytes.toBytes(cuboidId), 0, keyBuf, 0, RowConstants.ROWKEY_CUBOIDID_LEN); + //write cuboid id first + BytesUtil.writeLong(cuboidId, keyBuf, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java index 4598673..9b25c97 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java @@ -124,11 +124,16 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By Preconditions.checkState(key.offset() == 0); - long cuboidID = rowKeySplitter.split(key.array(), key.length()); + long cuboidID = rowKeySplitter.split(key.array()); + short shard = rowKeySplitter.getLastSplittedShard(); Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID); SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers(); int bufOffset = 0; + + BytesUtil.writeShort(shard, newKeyBuf, bufOffset, RowConstants.ROWKEY_SHARDID_LEN); + bufOffset += RowConstants.ROWKEY_SHARDID_LEN; + BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN); bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index 45f0d32..6301f3d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -103,7 +103,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255); } - + private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); public static CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) { @@ -111,7 +111,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { String jobID = extractJobIDFromPath(filePath); return findSegmentWithUuid(jobID, cube); } - + private static String extractJobIDFromPath(String path) { Matcher matcher = JOB_NAME_PATTERN.matcher(path); // check the first occurrence @@ -134,11 +134,14 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { @Override public void map(Text key, Text value, Context context) throws IOException, InterruptedException { - long cuboidID = rowKeySplitter.split(key.getBytes(), key.getBytes().length); + long cuboidID = rowKeySplitter.split(key.getBytes()); + short shard = rowKeySplitter.getLastSplittedShard(); Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID); SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers(); int bufOffset = 0; + BytesUtil.writeShort(shard, newKeyBuf, bufOffset, RowConstants.ROWKEY_SHARDID_LEN); + bufOffset += RowConstants.ROWKEY_SHARDID_LEN; BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN); bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java index 5e935eb..67c4416 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java @@ -47,6 +47,7 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.CuboidStatsUtil; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; @@ -126,7 +127,7 @@ public class MergeStatisticsStep extends AbstractExecutable { } } averageSamplingPercentage = averageSamplingPercentage / this.getMergingSegmentIds().size(); - FactDistinctColumnsReducer.writeCuboidStatistics(conf, new Path(getMergedStatisticsPath()), cuboidHLLMap, averageSamplingPercentage); + CuboidStatsUtil.writeCuboidStatistics(conf, new Path(getMergedStatisticsPath()), cuboidHLLMap, averageSamplingPercentage); Path statisticsFilePath = new Path(getMergedStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION); FileSystem fs = statisticsFilePath.getFileSystem(conf); FSDataInputStream is = fs.open(statisticsFilePath); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidJob.java index 40c4dd7..dc8fb3f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidJob.java @@ -20,11 +20,6 @@ package org.apache.kylin.engine.mr.steps; import org.apache.hadoop.util.ToolRunner; -/** - * @author George Song (ysong1) - * - */ - public class NDCuboidJob extends CuboidJob { public NDCuboidJob() { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java index c47d090..2180dd6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java @@ -23,6 +23,8 @@ import java.util.Collection; import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.common.util.SplittedBytes; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -30,6 +32,7 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.common.RowKeySplitter; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; @@ -49,6 +52,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private Text outputKey = new Text(); private String cubeName; private String segmentName; + private CubeSegment cubeSegment; private CubeDesc cubeDesc; private CuboidScheduler cuboidScheduler; @@ -68,7 +72,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); - CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); + cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); cubeDesc = cube.getDescriptor(); // initialize CubiodScheduler @@ -80,16 +84,21 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) { int offset = 0; + //shard id will be filled after other contents + offset += RowConstants.ROWKEY_SHARDID_LEN; + // cuboid id System.arraycopy(childCuboid.getBytes(), 0, keyBuf, offset, childCuboid.getBytes().length); - offset += childCuboid.getBytes().length; + offset += RowConstants.ROWKEY_CUBOIDID_LEN; + + int bodyOffset = offset; // rowkey columns long mask = Long.highestOneBit(parentCuboid.getId()); long parentCuboidId = parentCuboid.getId(); long childCuboidId = childCuboid.getId(); long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId()); - int index = 1; // skip cuboidId + int index = 2; // skip shard and cuboidId for (int i = 0; i < parentCuboidIdActualLength; i++) { if ((mask & parentCuboidId) > 0) {// if the this bit position equals // 1 @@ -103,12 +112,18 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { mask = mask >> 1; } + //fill shard + short cuboidShardNum = cubeSegment.getCuboidShardNum(childCuboidId); + short shardOffset = ShardingHash.getShard(keyBuf, bodyOffset, offset - bodyOffset, cuboidShardNum); + short finalShard = ShardingHash.normalize(cubeSegment.getCuboidBaseShard(childCuboidId), shardOffset, cubeSegment.getTotalShards()); + BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN); + return offset; } @Override public void map(Text key, Text value, Context context) throws IOException, InterruptedException { - long cuboidId = rowKeySplitter.split(key.getBytes(), key.getLength()); + long cuboidId = rowKeySplitter.split(key.getBytes()); Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId); Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java index 165bc13..5f2f100 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java @@ -9,6 +9,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.hll.HyperLogLogPlusCounter; import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.engine.mr.common.CuboidStatsUtil; import org.junit.Test; import com.google.common.collect.Maps; @@ -28,7 +29,7 @@ public class FactDistinctColumnsReducerTest { System.out.println(outputPath); Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap(); - FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, cuboidHLLMap, 100); + CuboidStatsUtil.writeCuboidStatistics(conf, outputPath, cuboidHLLMap, 100); FileSystem.getLocal(conf).delete(outputPath, true); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java index efcb2ba..9e1fc2d 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java @@ -35,6 +35,7 @@ import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver; import org.apache.hadoop.mrunit.types.Pair; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.engine.mr.common.BatchConstants; import org.junit.After; import org.junit.Before; @@ -73,7 +74,7 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase { mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); - byte[] key = { 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; + byte[] key = { 0,0,0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; byte[] value = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 }; Pair<Text, Text> input1 = new Pair<Text, Text>(new Text(key), new Text(value)); @@ -83,7 +84,7 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase { assertEquals(4, result.size()); - byte[] resultKey = { 0, 0, 0, 0, 0, 0, 1, 127, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; + byte[] resultKey = { 0,0,0, 0, 0, 0, 0, 0, 1, 127, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; byte[] resultValue = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 }; Pair<Text, Text> output1 = new Pair<Text, Text>(new Text(resultKey), new Text(resultValue)); @@ -103,7 +104,7 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase { System.out.println(Bytes.toLong(new byte[] { 0, 0, 0, 0, 0, 0, 1, -1 })); for (int i = 0; i < result.size(); i++) { byte[] bytes = new byte[result.get(i).getFirst().getLength()]; - System.arraycopy(result.get(i).getFirst().getBytes(), 0, bytes, 0, result.get(i).getFirst().getLength()); + System.arraycopy(result.get(i).getFirst().getBytes(), RowConstants.ROWKEY_SHARDID_LEN, bytes, 0, result.get(i).getFirst().getLength()-RowConstants.ROWKEY_SHARDID_LEN); System.out.println(Bytes.toLong(bytes)); keySet[i] = Bytes.toLong(bytes); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/test/resources/data/8d_cuboid/part-r-00000 ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/resources/data/8d_cuboid/part-r-00000 b/engine-mr/src/test/resources/data/8d_cuboid/part-r-00000 old mode 100644 new mode 100755 index d277125..c412e3a Binary files a/engine-mr/src/test/resources/data/8d_cuboid/part-r-00000 and b/engine-mr/src/test/resources/data/8d_cuboid/part-r-00000 differ http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-mr/src/test/resources/data/base_cuboid/part-r-00000 ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/resources/data/base_cuboid/part-r-00000 b/engine-mr/src/test/resources/data/base_cuboid/part-r-00000 old mode 100644 new mode 100755 index ed53ffb..9ade717 Binary files a/engine-mr/src/test/resources/data/base_cuboid/part-r-00000 and b/engine-mr/src/test/resources/data/base_cuboid/part-r-00000 differ http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index 7296fec..70b62c0 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -298,7 +298,7 @@ public class SparkCubing extends AbstractSparkApplication { private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId, final byte[][] splitKeys) throws Exception { CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); CubeDesc cubeDesc = cubeInstance.getDescriptor(); - CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); + final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns(); final Map<TblColRef, Integer> columnLengthMap = Maps.newHashMap(); for (TblColRef tblColRef : baseCuboidColumn) { @@ -341,7 +341,7 @@ public class SparkCubing extends AbstractSparkApplication { LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue(); System.out.println("load properties finished"); AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap); - final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeDesc, columnLengthMap)); + final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeSegment, columnLengthMap)); Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter)); try { while (listIterator.hasNext()) { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java index c687b78..986e45e 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java @@ -17,18 +17,21 @@ */ package org.apache.kylin.engine.spark.cube; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.Map; + import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.common.util.ShardingHash; +import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.RowConstants; -import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.metadata.model.TblColRef; -import scala.Tuple2; -import java.nio.ByteBuffer; -import java.util.BitSet; -import java.util.Map; +import scala.Tuple2; /** */ @@ -36,13 +39,13 @@ public final class DefaultTupleConverter implements TupleConverter { private final static ThreadLocal<ByteBuffer> valueBuf = new ThreadLocal<>(); private final static ThreadLocal<int[]> measureColumnsIndex = new ThreadLocal<>(); - private final CubeDesc cubeDesc; + private final CubeSegment segment; private final int measureCount; private final Map<TblColRef, Integer> columnLengthMap; - public DefaultTupleConverter(CubeDesc cubeDesc, Map<TblColRef, Integer> columnLengthMap) { - this.cubeDesc = cubeDesc; - this.measureCount = cubeDesc.getMeasures().size(); + public DefaultTupleConverter(CubeSegment segment, Map<TblColRef, Integer> columnLengthMap) { + this.segment = segment; + this.measureCount = segment.getCubeDesc().getMeasures().size(); this.columnLengthMap = columnLengthMap; } @@ -59,16 +62,16 @@ public final class DefaultTupleConverter implements TupleConverter { } return measureColumnsIndex.get(); } - + @Override public final Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record) { - int bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN; - Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId); + int bytesLength = RowConstants.ROWKEY_HEADER_LEN; + Cuboid cuboid = Cuboid.findById(segment.getCubeDesc(), cuboidId); for (TblColRef column : cuboid.getColumns()) { bytesLength += columnLengthMap.get(column); } - final int dimensions = BitSet.valueOf(new long[]{cuboidId}).cardinality(); + final int dimensions = BitSet.valueOf(new long[] { cuboidId }).cardinality(); int[] measureColumnsIndex = getMeasureColumnsIndex(); for (int i = 0; i < measureCount; i++) { measureColumnsIndex[i] = dimensions + i; @@ -76,13 +79,21 @@ public final class DefaultTupleConverter implements TupleConverter { byte[] key = new byte[bytesLength]; System.arraycopy(Bytes.toBytes(cuboidId), 0, key, 0, RowConstants.ROWKEY_CUBOIDID_LEN); - int offSet = RowConstants.ROWKEY_CUBOIDID_LEN; + int header = RowConstants.ROWKEY_HEADER_LEN; + int offSet = header; for (int x = 0; x < dimensions; x++) { final ByteArray byteArray = record.get(x); System.arraycopy(byteArray.array(), byteArray.offset(), key, offSet, byteArray.length()); offSet += byteArray.length(); } + //fill shard + short cuboidShardNum = segment.getCuboidShardNum(cuboidId); + short shardOffset = ShardingHash.getShard(key, header, offSet - header, cuboidShardNum); + short cuboidShardBase = segment.getCuboidBaseShard(cuboidId); + short finalShard = ShardingHash.normalize(cuboidShardBase, shardOffset, segment.getTotalShards()); + BytesUtil.writeShort(finalShard, key, 0, RowConstants.ROWKEY_SHARDID_LEN); + ByteBuffer valueBuf = getValueBuf(); valueBuf.clear(); record.exportColumns(measureColumnsIndex, valueBuf); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingHash.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingHash.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingHash.java deleted file mode 100644 index f16e9fe..0000000 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingHash.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.invertedindex.index; - -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; - -public class ShardingHash { - - static HashFunction hashFunc = Hashing.murmur3_128(); - - public static long hashInt(int integer) { - return hashFunc.newHasher().putInt(integer).hash().asLong(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java index 817bf01..2521fbf 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java @@ -23,6 +23,7 @@ import java.util.Arrays; import org.apache.commons.lang.ObjectUtils; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.metadata.measure.LongMutable; @@ -153,7 +154,7 @@ public class TableRecord implements Cloneable { public short getShard() { int timestampID = rawRecord.getValueID(info.getTimestampColumn()); - return (short) (Math.abs(ShardingHash.hashInt(timestampID)) % info.getDescriptor().getSharding()); + return ShardingHash.getShard(timestampID, info.getDescriptor().getSharding()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 661672e..7a2313c 100644 --- a/pom.xml +++ b/pom.xml @@ -64,6 +64,7 @@ <commons-cli.version>1.2</commons-cli.version> <commons-lang.version>2.6</commons-lang.version> <commons-lang3.version>3.1</commons-lang3.version> + <commons-math3.version>3.3</commons-math3.version> <commons-io.version>2.4</commons-io.version> <commons-configuration.version>1.9</commons-configuration.version> <commons-daemon.version>1.0.15</commons-daemon.version> @@ -331,6 +332,12 @@ <version>${commons-lang3.version}</version> </dependency> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + <version>${commons-math3.version}</version> + </dependency> + + <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>${commons-io.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java index d80763c..1cd55d4 100644 --- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java +++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java @@ -49,13 +49,10 @@ public abstract class RoutingRule { public static void applyRules(List<IRealization> realizations, OLAPContext olapContext) { for (RoutingRule rule : rules) { - logger.info("Initial realizations order:"); - logger.info(getPrintableText(realizations)); - logger.info("Applying rule " + rule); - + logger.info("Realizations order before: " + getPrintableText(realizations)); + logger.info("Applying rule : " + rule); rule.apply(realizations, olapContext); - - logger.info(getPrintableText(realizations)); + logger.info("Realizations order after: " + getPrintableText(realizations)); logger.info("==================================================="); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/query/src/test/resources/query/debug/query78.sql ---------------------------------------------------------------------- diff --git a/query/src/test/resources/query/debug/query78.sql b/query/src/test/resources/query/debug/query78.sql new file mode 100644 index 0000000..299f1a4 --- /dev/null +++ b/query/src/test/resources/query/debug/query78.sql @@ -0,0 +1,22 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +select count(*) as c,sum(PRICE) as GMV, LSTG_FORMAT_NAME as FORMAT_NAME +from test_kylin_fact +where (LSTG_FORMAT_NAME in ('ABIN')) or (LSTG_FORMAT_NAME>='FP-GTC' and LSTG_FORMAT_NAME<='Others') +group by LSTG_FORMAT_NAME http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/query/src/test/resources/query/sql/query01.sql ---------------------------------------------------------------------- diff --git a/query/src/test/resources/query/sql/query01.sql b/query/src/test/resources/query/sql/query01.sql index 5a53058..9ed1db3 100644 --- a/query/src/test/resources/query/sql/query01.sql +++ b/query/src/test/resources/query/sql/query01.sql @@ -16,5 +16,5 @@ -- limitations under the License. -- -select LSTG_FORMAT_NAME, sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact - group by LSTG_FORMAT_NAME +select LSTG_FORMAT_NAME,slr_segment_cd ,sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact + group by LSTG_FORMAT_NAME ,slr_segment_cd