http://git-wip-us.apache.org/repos/asf/kylin/blob/5ee76e8b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java index 0f96e3c..7ec24b2 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java @@ -53,19 +53,19 @@ public class CubeTupleConverter { final CubeSegment cubeSeg; final Cuboid cuboid; final TupleInfo tupleInfo; - final List<IDerivedColumnFiller> derivedColFillers; + private final List<IDerivedColumnFiller> derivedColFillers; - final int[] gtColIdx; - final int[] tupleIdx; - final Object[] gtValues; - final MeasureType<?>[] measureTypes; + private final int[] gtColIdx; + private final int[] tupleIdx; + private final Object[] gtValues; + private final MeasureType<?>[] measureTypes; - final List<IAdvMeasureFiller> advMeasureFillers; - final List<Integer> advMeasureIndexInGTValues; + private final List<IAdvMeasureFiller> advMeasureFillers; + private final List<Integer> advMeasureIndexInGTValues; - final int nSelectedDims; + private final int nSelectedDims; - final int[] dimensionIndexOnTuple; + private final int[] dimensionIndexOnTuple; public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, // Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) { @@ -102,44 +102,44 @@ public class CubeTupleConverter { //////////// - int iii = 0; + int i = 0; // pre-calculate dimension index mapping to tuple for (TblColRef dim : selectedDimensions) { - int i = mapping.getIndexOf(dim); - gtColIdx[iii] = i; - tupleIdx[iii] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1; + int dimIndex = mapping.getIndexOf(dim); + gtColIdx[i] = dimIndex; + tupleIdx[i] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1; // if (tupleIdx[iii] == -1) { // throw new IllegalStateException("dim not used in tuple:" + dim); // } - iii++; + i++; } for (FunctionDesc metric : selectedMetrics) { - int i = mapping.getIndexOf(metric); - gtColIdx[iii] = i; + int metricIndex = mapping.getIndexOf(metric); + gtColIdx[i] = metricIndex; if (metric.needRewrite()) { String rewriteFieldName = metric.getRewriteFieldName(); - tupleIdx[iii] = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1; + tupleIdx[i] = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1; } else { // a non-rewrite metrics (like sum, or dimension playing as metrics) is like a dimension column TblColRef col = metric.getParameter().getColRefs().get(0); - tupleIdx[iii] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1; + tupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1; } MeasureType<?> measureType = metric.getMeasureType(); if (measureType.needAdvancedTupleFilling()) { Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(metric)); advMeasureFillers.add(measureType.getAdvancedTupleFiller(metric, returnTupleInfo, dictionaryMap)); - advMeasureIndexInGTValues.add(iii); + advMeasureIndexInGTValues.add(i); } else { - measureTypes[iii] = measureType; + measureTypes[i] = measureType; } - iii++; + i++; } // prepare derived columns and filler
http://git-wip-us.apache.org/repos/asf/kylin/blob/5ee76e8b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index ae5240b..bacd293 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -127,7 +127,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { if (scanners.isEmpty()) return ITupleIterator.EMPTY_TUPLE_ITERATOR; - return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context); + return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context, cubeDesc.supportsLimitPushDown()); } protected boolean skipZeroInputSegment(CubeSegment cubeSegment) { http://git-wip-us.apache.org/repos/asf/kylin/blob/5ee76e8b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java index 2cff76c..7059473 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java @@ -54,7 +54,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator { private int scanCountDelta; public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, // - Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) { + Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context, boolean supportLimitPushDown) { this.context = context; this.scanners = scanners; @@ -63,8 +63,6 @@ public class SequentialCubeTupleIterator implements ITupleIterator { segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context)); } - boolean supportLimitPushDown = scanners.get(0).getSegment().getCubeDesc().supportsLimitPushDown(); - this.storagePushDownLimit = context.getStoragePushDownLimit(); if (!supportLimitPushDown || storagePushDownLimit > KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax()) { //normal case http://git-wip-us.apache.org/repos/asf/kylin/blob/5ee76e8b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java index 5db4342..dcacb06 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java @@ -51,12 +51,12 @@ public class ITKylinQueryTest extends KylinTestBase { @BeforeClass public static void setUp() throws Exception { + printInfo("setUp in ITKylinQueryTest"); Map<RealizationType, Integer> priorities = Maps.newHashMap(); priorities.put(RealizationType.HYBRID, 0); priorities.put(RealizationType.CUBE, 0); Candidate.setPriorities(priorities); - printInfo("setUp in ITKylinQueryTest"); joinType = "left"; setupAll(); @@ -64,8 +64,8 @@ public class ITKylinQueryTest extends KylinTestBase { @AfterClass public static void tearDown() throws Exception { - Candidate.restorePriorities(); printInfo("tearDown in ITKylinQueryTest"); + Candidate.restorePriorities(); clean(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5ee76e8b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0a82bf8..ea29958 100644 --- a/pom.xml +++ b/pom.xml @@ -366,6 +366,12 @@ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <version>${hadoop2.version}</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-annotations</artifactId> <version>${hadoop2.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/kylin/blob/5ee76e8b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java index bf93300..96e75f3 100644 --- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java +++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java @@ -25,6 +25,7 @@ import org.apache.calcite.DataContext; import org.apache.calcite.jdbc.CalciteConnection; import org.apache.calcite.linq4j.Enumerator; import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.metadata.filter.CompareTupleFilter; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.FunctionDesc; @@ -162,8 +163,14 @@ public class OLAPEnumerator implements Enumerator<Object[]> { // Hack no-group-by query for better results private void hackNoGroupByAggregation(SQLDigest sqlDigest) { - if (!sqlDigest.groupbyColumns.isEmpty() || !sqlDigest.metricColumns.isEmpty()) + if (!(olapContext.realization instanceof CubeInstance)) { + //the hack only makes sense for cubes return; + } + + if (!sqlDigest.isRawQuery()) { + return; + } // If no group by and metric found, then it's simple query like select ... from ... where ..., // But we have no raw data stored, in order to return better results, we hack to output sum of metric column http://git-wip-us.apache.org/repos/asf/kylin/blob/5ee76e8b/query/src/main/java/org/apache/kylin/query/routing/Candidate.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/Candidate.java b/query/src/main/java/org/apache/kylin/query/routing/Candidate.java index ab7884a..9ea8961 100644 --- a/query/src/main/java/org/apache/kylin/query/routing/Candidate.java +++ b/query/src/main/java/org/apache/kylin/query/routing/Candidate.java @@ -35,6 +35,7 @@ public class Candidate implements Comparable<Candidate> { static { DEFAULT_PRIORITIES.put(RealizationType.HYBRID, 0); DEFAULT_PRIORITIES.put(RealizationType.CUBE, 1); + DEFAULT_PRIORITIES.put(RealizationType.INVERTED_INDEX, 1); } /** for test only */ http://git-wip-us.apache.org/repos/asf/kylin/blob/5ee76e8b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 200c040..07a3cc3 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -40,7 +40,7 @@ import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.LoggableCachedThreadPool; import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.ISegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTScanRange; @@ -67,8 +67,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { private static ExecutorService executorService = new LoggableCachedThreadPool(); - public CubeHBaseEndpointRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) { - super(cubeSeg, cuboid, fullGTInfo); + public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) { + super(segment, cuboid, fullGTInfo); } private byte[] getByteArrayForShort(short v) { http://git-wip-us.apache.org/repos/asf/kylin/blob/5ee76e8b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 687ee83..c318cba 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -31,6 +31,7 @@ import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.ISegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.FuzzyKeyEncoder; import org.apache.kylin.cube.kv.FuzzyMaskEncoder; @@ -60,8 +61,10 @@ public abstract class CubeHBaseRPC implements IGTStorage { final private RowKeyEncoder fuzzyKeyEncoder; final private RowKeyEncoder fuzzyMaskEncoder; - public CubeHBaseRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) { - this.cubeSeg = cubeSeg; + public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) { + Preconditions.checkArgument(segment instanceof CubeSegment, "segment must be CubeSegment"); + + this.cubeSeg = (CubeSegment) segment; this.cuboid = cuboid; this.fullGTInfo = fullGTInfo; http://git-wip-us.apache.org/repos/asf/kylin/blob/5ee76e8b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index 238939c..a359d19 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.ShardingHash; -import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.ISegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.dimension.DimensionEncoding; @@ -89,8 +89,8 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { } } - public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, final GTInfo fullGTInfo) { - super(cubeSeg, cuboid, fullGTInfo); + public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo) { + super(segment, cuboid, fullGTInfo); MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() { @Override public DimensionEncoding getDimEnc(TblColRef col) {