minor, refine enableStorageLimitIfPossible logic
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/26c03fe2 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/26c03fe2 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/26c03fe2 Branch: refs/heads/ranger Commit: 26c03fe26b665adcf31cefb574f7c389a4daa0a5 Parents: b1b3c22 Author: Hongbin Ma <mahong...@apache.org> Authored: Mon Sep 4 22:39:53 2017 +0800 Committer: Hongbin Ma <m...@kyligence.io> Committed: Tue Sep 5 10:29:20 2017 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/GTForwardingScanner.java | 56 ------ .../kylin/gridtable/GTAggregateScanner.java | 171 ++++++++++++++++--- .../apache/kylin/gridtable/GTFilterScanner.java | 44 +++-- .../kylin/gridtable/GTForwardingScanner.java | 52 ++++++ .../apache/kylin/gridtable/GTScanRequest.java | 61 ++++--- .../kylin/gridtable/GTScanRequestBuilder.java | 18 +- .../gridtable/GTStreamAggregateScanner.java | 1 - .../kylin/gridtable/IGTBypassChecker.java | 23 +++ .../kylin/gridtable/StorageLimitLevel.java | 29 ++++ .../apache/kylin/storage/StorageContext.java | 35 ++-- .../storage/gtrecord/CubeScanRangePlanner.java | 2 +- .../gtrecord/GTCubeStorageQueryBase.java | 70 ++++---- .../SortMergedPartitionResultIterator.java | 20 +-- .../coprocessor/endpoint/CubeVisitService.java | 73 ++++---- 14 files changed, 449 insertions(+), 206 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java b/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java deleted file mode 100644 index de8c88d..0000000 --- a/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin; - -import org.apache.kylin.gridtable.GTInfo; -import org.apache.kylin.gridtable.GTRecord; -import org.apache.kylin.gridtable.IGTScanner; - -import java.io.IOException; -import java.util.Iterator; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * A {@link IGTScanner} which forwards all its method calls to another scanner. - * - * @see <a href="http://en.wikipedia.org/wiki/Decorator_pattern">decorator pattern</a>. - */ -public class GTForwardingScanner implements IGTScanner { - protected IGTScanner delegated; - - protected GTForwardingScanner(IGTScanner delegated) { - this.delegated = checkNotNull(delegated, "delegated"); - } - - @Override - public GTInfo getInfo() { - return delegated.getInfo(); - } - - @Override - public void close() throws IOException { - delegated.close(); - } - - @Override - public Iterator<GTRecord> iterator() { - return delegated.iterator(); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index 92d0fac..cbf4232 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map.Entry; import java.util.PriorityQueue; import java.util.SortedMap; +import java.util.TreeMap; import org.apache.commons.io.IOUtils; import org.apache.kylin.common.exceptions.ResourceLimitExceededException; @@ -55,11 +56,12 @@ import org.apache.kylin.metadata.tuple.ITuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @SuppressWarnings({ "rawtypes", "unchecked" }) -public class GTAggregateScanner implements IGTScanner { +public class GTAggregateScanner implements IGTScanner, IGTBypassChecker { private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class); @@ -73,6 +75,7 @@ public class GTAggregateScanner implements IGTScanner { final AggregationCache aggrCache; final long spillThreshold; // 0 means no memory control && no spill final int storagePushDownLimit;//default to be Int.MAX + final StorageLimitLevel storageLimitLevel; final boolean spillEnabled; final TupleFilter havingFilter; @@ -84,24 +87,34 @@ public class GTAggregateScanner implements IGTScanner { this(inputScanner, req, true); } - public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, boolean spillEnabled) { + public GTAggregateScanner(IGTScanner input, GTScanRequest req, boolean spillEnabled) { if (!req.hasAggregation()) throw new IllegalStateException(); - this.info = inputScanner.getInfo(); + if (input instanceof GTFilterScanner) { + logger.info("setting IGTBypassChecker of child"); + ((GTFilterScanner) input).setChecker(this); + } else { + logger.info("applying a GTFilterScanner with IGTBypassChecker on top child"); + input = new GTFilterScanner(input, null, this); + } + + this.inputScanner = input; + this.info = this.inputScanner.getInfo(); this.dimensions = req.getDimensions(); this.groupBy = req.getAggrGroupBy(); this.metrics = req.getAggrMetrics(); this.metricsAggrFuncs = req.getAggrMetricsFuncs(); - this.inputScanner = inputScanner; this.measureCodec = req.createMeasureCodec(); - this.aggrCache = new AggregationCache(); this.spillThreshold = (long) (req.getAggCacheMemThreshold() * MemoryBudgetController.ONE_GB); this.aggrMask = new boolean[metricsAggrFuncs.length]; this.storagePushDownLimit = req.getStoragePushDownLimit(); + this.storageLimitLevel = req.getStorageLimitLevel(); this.spillEnabled = spillEnabled; this.havingFilter = req.getHavingFilterPushDown(); + this.aggrCache = new AggregationCache(); + Arrays.fill(aggrMask, true); } @@ -147,18 +160,15 @@ public class GTAggregateScanner implements IGTScanner { @Override public Iterator<GTRecord> iterator() { long count = 0; + for (GTRecord r : inputScanner) { - if (getNumOfSpills() == 0) { - //check limit - boolean ret = aggrCache.aggregate(r, storagePushDownLimit); + //check limit + boolean ret = aggrCache.aggregate(r); - if (!ret) { - logger.info("abort reading inputScanner because storage push down limit is hit"); - break;//limit is hit - } - } else {//else if dumps is not empty, it means a lot of row need aggregated, so it's less likely that limit clause is helping - aggrCache.aggregate(r, Integer.MAX_VALUE); + if (!ret) { + logger.info("abort reading inputScanner because storage push down limit is hit"); + break;//limit is hit } count++; @@ -180,11 +190,94 @@ public class GTAggregateScanner implements IGTScanner { return aggrCache.estimatedMemSize(); } + public boolean shouldBypass(GTRecord record) { + return aggrCache.shouldBypass(record); + } + class AggregationCache implements Closeable { + /** + * if a limit construct is provided + * before a gtrecord is sent to filter->aggregate pipeline, + * this check could help to decide if the record should be skipped + * + * e.g. + * limit is three, and current AggregationCache's key set contains (1,2,3), + * when gtrecord with key 4 comes, it should be skipped before sending to filter + */ + class ByPassChecker { + private int aggregateBufferSizeLimit = -1; + private byte[] currentLastKey = null; + private int[] groupOffsetsInLastKey = null; + + private int byPassCounter = 0; + + ByPassChecker(int aggregateBufferSizeLimit) { + this.aggregateBufferSizeLimit = aggregateBufferSizeLimit; + + //init groupOffsetsInLastKey + int p = 0; + int idx = 0; + this.groupOffsetsInLastKey = new int[groupBy.trueBitCount()]; + for (int i = 0; i < dimensions.trueBitCount(); i++) { + int c = dimensions.trueBitAt(i); + int l = info.codeSystem.maxCodeLength(c); + if (groupBy.get(c)) + groupOffsetsInLastKey[idx++] = p; + p += l; + } + } + + /** + * @return true if should bypass this record + */ + boolean shouldByPass(GTRecord record) { + + if (dumps.size() > 0) { + return false; //rare case: limit tends to be small, when limit is applied it's not likely to have dumps + //TODO: what if bypass before dump happens? + } + + Preconditions.checkState(aggBufMap.size() <= aggregateBufferSizeLimit); + + if (aggBufMap.size() == aggregateBufferSizeLimit) { + Preconditions.checkNotNull(currentLastKey); + for (int i = 0; i < groupBy.trueBitCount(); i++) { + int c = groupBy.trueBitAt(i); + ByteArray col = record.get(c); + + int compare = Bytes.compareTo(col.array(), col.offset(), col.length(), currentLastKey, + groupOffsetsInLastKey[i], col.length()); + if (compare > 0) { + byPassCounter++; + return true; + } else if (compare < 0) { + return false; + } + } + } + + return false; + } + + void updateOnBufferChange() { + if (aggBufMap.size() > aggregateBufferSizeLimit) { + aggBufMap.pollLastEntry(); + Preconditions.checkState(aggBufMap.size() == aggregateBufferSizeLimit); + } + + currentLastKey = aggBufMap.lastKey(); + } + + int getByPassCounter() { + return byPassCounter; + } + } + final List<Dump> dumps; final int keyLength; final boolean[] compareMask; boolean compareAll = true; + ByPassChecker byPassChecker = null; final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() { @Override @@ -212,7 +305,7 @@ public class GTAggregateScanner implements IGTScanner { } }; - SortedMap<byte[], MeasureAggregator[]> aggBufMap; + TreeMap<byte[], MeasureAggregator[]> aggBufMap; public AggregationCache() { compareMask = createCompareMask(); @@ -222,6 +315,20 @@ public class GTAggregateScanner implements IGTScanner { keyLength = compareMask.length; dumps = Lists.newArrayList(); aggBufMap = createBuffMap(); + + if (storageLimitLevel == StorageLimitLevel.LIMIT_ON_RETURN_SIZE) { + //ByPassChecker is not free, if LIMIT_ON_SCAN, not worth to as it has better optimization + byPassChecker = new ByPassChecker(storagePushDownLimit); + } + } + + public boolean shouldBypass(GTRecord record) { + if (byPassChecker == null) { + return false; + } + + boolean b = byPassChecker.shouldByPass(record); + return b; } private boolean[] createCompareMask() { @@ -245,7 +352,7 @@ public class GTAggregateScanner implements IGTScanner { return mask; } - private SortedMap<byte[], MeasureAggregator[]> createBuffMap() { + private TreeMap<byte[], MeasureAggregator[]> createBuffMap() { return Maps.newTreeMap(bytesComparator); } @@ -263,7 +370,7 @@ public class GTAggregateScanner implements IGTScanner { return result; } - boolean aggregate(GTRecord r, int stopForLimit) { + boolean aggregate(GTRecord r) { if (++aggregatedRowCount % 100000 == 0) { if (memTracker != null) { memTracker.markHigh(); @@ -272,7 +379,8 @@ public class GTAggregateScanner implements IGTScanner { final long estMemSize = estimatedMemSize(); if (spillThreshold > 0 && estMemSize > spillThreshold) { if (!spillEnabled) { - throw new ResourceLimitExceededException("aggregation's memory consumption " + estMemSize + " exceeds threshold " + spillThreshold); + throw new ResourceLimitExceededException("aggregation's memory consumption " + estMemSize + + " exceeds threshold " + spillThreshold); } spillBuffMap(estMemSize); // spill to disk aggBufMap = createBuffMap(); @@ -284,7 +392,9 @@ public class GTAggregateScanner implements IGTScanner { if (aggrs == null) { //for storage push down limit - if (aggBufMap.size() >= stopForLimit) { + //TODO: what if bypass before dump happens? + if (getNumOfSpills() == 0 && storageLimitLevel == StorageLimitLevel.LIMIT_ON_SCAN + && aggBufMap.size() >= storagePushDownLimit) { return false; } @@ -298,6 +408,11 @@ public class GTAggregateScanner implements IGTScanner { aggrs[i].aggregate(metrics); } } + + if (byPassChecker != null) { + byPassChecker.updateOnBufferChange(); + } + return true; } @@ -314,6 +429,12 @@ public class GTAggregateScanner implements IGTScanner { @Override public void close() throws RuntimeException { try { + logger.info("closing aggrCache"); + if (byPassChecker != null) { + logger.info("AggregationCache byPassChecker helps to skip {} cuboid rows", + byPassChecker.getByPassCounter()); + } + for (Dump dump : dumps) { dump.terminate(); } @@ -356,7 +477,8 @@ public class GTAggregateScanner implements IGTScanner { final ReturningRecord returningRecord = new ReturningRecord(); Entry<byte[], MeasureAggregator[]> returningEntry = null; - final HavingFilterChecker havingFilterChecker = (havingFilter == null) ? null : new HavingFilterChecker(); + final HavingFilterChecker havingFilterChecker = (havingFilter == null) ? null + : new HavingFilterChecker(); @Override public boolean hasNext() { @@ -530,7 +652,8 @@ public class GTAggregateScanner implements IGTScanner { public Iterator<Pair<byte[], byte[]>> iterator() { try { if (dumpedFile == null || !dumpedFile.exists()) { - throw new RuntimeException("Dumped file cannot be found at: " + (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath())); + throw new RuntimeException("Dumped file cannot be found at: " + + (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath())); } dis = new DataInputStream(new FileInputStream(dumpedFile)); @@ -555,7 +678,8 @@ public class GTAggregateScanner implements IGTScanner { dis.read(value); return new Pair<>(key, value); } catch (Exception e) { - throw new RuntimeException("Cannot read AggregationCache from dumped file: " + e.getMessage()); + throw new RuntimeException( + "Cannot read AggregationCache from dumped file: " + e.getMessage()); } } @@ -570,7 +694,8 @@ public class GTAggregateScanner implements IGTScanner { } public void flush() throws IOException { - logger.info("AggregationCache(size={} est_mem_size={} threshold={}) will spill to {}", buffMap.size(), estMemSize, spillThreshold, dumpedFile.getAbsolutePath()); + logger.info("AggregationCache(size={} est_mem_size={} threshold={}) will spill to {}", buffMap.size(), + estMemSize, spillThreshold, dumpedFile.getAbsolutePath()); if (buffMap != null) { DataOutputStream dos = null; http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java index cad0a04..11a23d6 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java @@ -18,14 +18,12 @@ package org.apache.kylin.gridtable; -import java.io.IOException; import java.util.BitSet; import java.util.HashSet; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Set; -import org.apache.kylin.GTForwardingScanner; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; @@ -36,25 +34,35 @@ import org.apache.kylin.metadata.tuple.IEvaluatableTuple; public class GTFilterScanner extends GTForwardingScanner { - final private TupleFilter filter; - final private IFilterCodeSystem<ByteArray> filterCodeSystem; - final private IEvaluatableTuple oneTuple; // avoid instance creation + private TupleFilter filter; + private IFilterCodeSystem<ByteArray> filterCodeSystem; + private IEvaluatableTuple oneTuple; // avoid instance creation private GTRecord next = null; - public GTFilterScanner(IGTScanner delegated, GTScanRequest req) throws IOException { + private IGTBypassChecker checker = null; + + public GTFilterScanner(IGTScanner delegated, GTScanRequest req, IGTBypassChecker checker) { super(delegated); - this.filter = req.getFilterPushDown(); - this.filterCodeSystem = GTUtil.wrap(getInfo().codeSystem.getComparator()); - this.oneTuple = new IEvaluatableTuple() { - @Override - public Object getValue(TblColRef col) { - return next.get(col.getColumnDesc().getZeroBasedIndex()); - } - }; + this.checker = checker; + + if (req != null) { + this.filter = req.getFilterPushDown(); + this.filterCodeSystem = GTUtil.wrap(getInfo().codeSystem.getComparator()); + this.oneTuple = new IEvaluatableTuple() { + @Override + public Object getValue(TblColRef col) { + return next.get(col.getColumnDesc().getZeroBasedIndex()); + } + }; - if (!TupleFilter.isEvaluableRecursively(filter)) - throw new IllegalArgumentException(); + if (!TupleFilter.isEvaluableRecursively(filter)) + throw new IllegalArgumentException(); + } + } + + public void setChecker(IGTBypassChecker checker) { + this.checker = checker; } @Override @@ -81,6 +89,10 @@ public class GTFilterScanner extends GTForwardingScanner { } private boolean evaluate() { + if (checker != null && checker.shouldBypass(next)) { + return false; + } + if (filter == null) return true; http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/gridtable/GTForwardingScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTForwardingScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTForwardingScanner.java new file mode 100644 index 0000000..be840ea --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTForwardingScanner.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.gridtable; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.util.Iterator; + +/** + * A {@link IGTScanner} which forwards all its method calls to another scanner. + * + * @see <a href="http://en.wikipedia.org/wiki/Decorator_pattern">decorator pattern</a>. + */ +public class GTForwardingScanner implements IGTScanner { + protected IGTScanner delegated; + + protected GTForwardingScanner(IGTScanner delegated) { + this.delegated = checkNotNull(delegated, "delegated"); + } + + @Override + public GTInfo getInfo() { + return delegated.getInfo(); + } + + @Override + public void close() throws IOException { + delegated.close(); + } + + @Override + public Iterator<GTRecord> iterator() { + return delegated.iterator(); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index e65d2b8..523814b 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -73,15 +73,19 @@ public class GTScanRequest { private boolean allowStorageAggregation; private double aggCacheMemThreshold; private int storageScanRowNumThreshold; + //valid value iff GTCubeStorageQueryBase.enableStorageLimitIfPossible is true private int storagePushDownLimit; + private StorageLimitLevel storageLimitLevel; // runtime computed fields private transient boolean doingStorageAggregation = false; GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, // - ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, TupleFilter havingFilterPushDown, // + ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, + TupleFilter havingFilterPushDown, // boolean allowStorageAggregation, double aggCacheMemThreshold, int storageScanRowNumThreshold, // - int storagePushDownLimit, String storageBehavior, long startTime, long timeout) { + int storagePushDownLimit, StorageLimitLevel storageLimitLevel, String storageBehavior, long startTime, + long timeout) { this.info = info; if (ranges == null) { this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info))); @@ -103,6 +107,7 @@ public class GTScanRequest { this.aggCacheMemThreshold = aggCacheMemThreshold; this.storageScanRowNumThreshold = storageScanRowNumThreshold; this.storagePushDownLimit = storagePushDownLimit; + this.storageLimitLevel = storageLimitLevel; validate(info); } @@ -173,14 +178,16 @@ public class GTScanRequest { * * Refer to CoprocessorBehavior for explanation */ - public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn) throws IOException { + public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn) + throws IOException { return decorateScanner(scanner, filterToggledOn, aggrToggledOn, false, true); } /** * hasPreFiltered indicate the data has been filtered before scanning */ - public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn, boolean hasPreFiltered, boolean spillEnabled) throws IOException { + public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn, + boolean hasPreFiltered, boolean spillEnabled) throws IOException { IGTScanner result = scanner; if (!filterToggledOn) { //Skip reading this section if you're not profiling! lookAndForget(result); @@ -188,7 +195,9 @@ public class GTScanRequest { } else { if (this.hasFilterPushDown() && !hasPreFiltered) { - result = new GTFilterScanner(result, this); + result = new GTFilterScanner(result, this, null); + } else { + result = new GTForwardingScanner(result);//need its check function } if (!aggrToggledOn) {//Skip reading this section if you're not profiling! @@ -207,7 +216,6 @@ public class GTScanRequest { } return result; } - } public BufferedMeasureCodec createMeasureCodec() { @@ -281,7 +289,7 @@ public class GTScanRequest { public TupleFilter getHavingFilterPushDown() { return havingFilterPushDown; } - + public ImmutableBitSet getDimensions() { return this.getColumns().andNot(this.getAggrMetrics()); } @@ -321,6 +329,10 @@ public class GTScanRequest { return this.storagePushDownLimit; } + public StorageLimitLevel getStorageLimitLevel() { + return storageLimitLevel; + } + public String getStorageBehavior() { return storageBehavior; } @@ -335,7 +347,9 @@ public class GTScanRequest { @Override public String toString() { - return "GTScanRequest [range=" + ranges + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]"; + return "GTScanRequest [range=" + ranges + ", columns=" + columns + ", filterPushDown=" + filterPushDown + + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + + Arrays.toString(aggrMetricsFuncs) + "]"; } public byte[] toByteArray() { @@ -350,12 +364,12 @@ public class GTScanRequest { private static final int SERIAL_0_BASE = 0; private static final int SERIAL_1_HAVING_FILTER = 1; - + public static final BytesSerializer<GTScanRequest> serializer = new BytesSerializer<GTScanRequest>() { @Override public void serialize(GTScanRequest value, ByteBuffer out) { final int serialLevel = KylinConfig.getInstanceFromEnv().getGTScanRequestSerializationLevel(); - + GTInfo.serializer.serialize(value.info, out); BytesUtil.writeVInt(value.ranges.size(), out); @@ -370,9 +384,10 @@ public class GTScanRequest { ImmutableBitSet.serializer.serialize(value.columns, out); BytesUtil.writeByteArray(GTUtil.serializeGTFilter(value.filterPushDown, value.info), out); - + if (serialLevel >= SERIAL_1_HAVING_FILTER) { - BytesUtil.writeByteArray(TupleFilterSerializer.serialize(value.havingFilterPushDown, StringCodeSystem.INSTANCE), out); + BytesUtil.writeByteArray( + TupleFilterSerializer.serialize(value.havingFilterPushDown, StringCodeSystem.INSTANCE), out); } ImmutableBitSet.serializer.serialize(value.aggrGroupBy, out); @@ -380,6 +395,7 @@ public class GTScanRequest { BytesUtil.writeAsciiStringArray(value.aggrMetricsFuncs, out); BytesUtil.writeVInt(value.allowStorageAggregation ? 1 : 0, out); out.putDouble(value.aggCacheMemThreshold); + BytesUtil.writeUTFString(value.getStorageLimitLevel().name(), out); BytesUtil.writeVInt(value.storageScanRowNumThreshold, out); BytesUtil.writeVInt(value.storagePushDownLimit, out); BytesUtil.writeVLong(value.startTime, out); @@ -390,7 +406,7 @@ public class GTScanRequest { @Override public GTScanRequest deserialize(ByteBuffer in) { final int serialLevel = KylinConfig.getInstanceFromEnv().getGTScanRequestSerializationLevel(); - + GTInfo sInfo = GTInfo.serializer.deserialize(in); List<GTScanRange> sRanges = Lists.newArrayList(); @@ -409,10 +425,11 @@ public class GTScanRequest { ImmutableBitSet sColumns = ImmutableBitSet.serializer.deserialize(in); TupleFilter sGTFilter = GTUtil.deserializeGTFilter(BytesUtil.readByteArray(in), sInfo); - + TupleFilter sGTHavingFilter = null; if (serialLevel >= SERIAL_1_HAVING_FILTER) { - sGTHavingFilter = TupleFilterSerializer.deserialize(BytesUtil.readByteArray(in), StringCodeSystem.INSTANCE); + sGTHavingFilter = TupleFilterSerializer.deserialize(BytesUtil.readByteArray(in), + StringCodeSystem.INSTANCE); } ImmutableBitSet sAggGroupBy = ImmutableBitSet.serializer.deserialize(in); @@ -420,17 +437,21 @@ public class GTScanRequest { String[] sAggrMetricFuncs = BytesUtil.readAsciiStringArray(in); boolean sAllowPreAggr = (BytesUtil.readVInt(in) == 1); double sAggrCacheGB = in.getDouble(); + StorageLimitLevel storageLimitLevel = StorageLimitLevel.valueOf(BytesUtil.readUTFString(in)); int storageScanRowNumThreshold = BytesUtil.readVInt(in); int storagePushDownLimit = BytesUtil.readVInt(in); long startTime = BytesUtil.readVLong(in); long timeout = BytesUtil.readVLong(in); String storageBehavior = BytesUtil.readUTFString(in); - return new GTScanRequestBuilder().setInfo(sInfo).setRanges(sRanges).setDimensions(sColumns).// - setAggrGroupBy(sAggGroupBy).setAggrMetrics(sAggrMetrics).setAggrMetricsFuncs(sAggrMetricFuncs).// - setFilterPushDown(sGTFilter).setHavingFilterPushDown(sGTHavingFilter).setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB).// - setStorageScanRowNumThreshold(storageScanRowNumThreshold).setStoragePushDownLimit(storagePushDownLimit).// - setStartTime(startTime).setTimeout(timeout).setStorageBehavior(storageBehavior).createGTScanRequest(); + return new GTScanRequestBuilder().setInfo(sInfo).setRanges(sRanges).setDimensions(sColumns) + .setAggrGroupBy(sAggGroupBy).setAggrMetrics(sAggrMetrics).setAggrMetricsFuncs(sAggrMetricFuncs) + .setFilterPushDown(sGTFilter).setHavingFilterPushDown(sGTHavingFilter) + .setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB) + .setStorageScanRowNumThreshold(storageScanRowNumThreshold) + .setStoragePushDownLimit(storagePushDownLimit).setStorageLimitLevel(storageLimitLevel) + .setStartTime(startTime).setTimeout(timeout).setStorageBehavior(storageBehavior) + .createGTScanRequest(); } private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) { http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java index ba1fdbc..28fe40a 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java @@ -38,6 +38,7 @@ public class GTScanRequestBuilder { private double aggCacheMemThreshold = 0; private int storageScanRowNumThreshold = Integer.MAX_VALUE;// storage should terminate itself when $storageScanRowNumThreshold cuboid rows are scanned, and throw exception. private int storagePushDownLimit = Integer.MAX_VALUE;// storage can quit scanning safely when $toragePushDownLimit aggregated rows are produced. + private StorageLimitLevel storageLimitLevel = StorageLimitLevel.NO_LIMIT; private long startTime = -1; private long timeout = -1; private String storageBehavior = null; @@ -61,7 +62,7 @@ public class GTScanRequestBuilder { this.havingFilterPushDown = havingFilterPushDown; return this; } - + public GTScanRequestBuilder setDimensions(ImmutableBitSet dimensions) { this.dimensions = dimensions; return this; @@ -102,6 +103,11 @@ public class GTScanRequestBuilder { return this; } + public GTScanRequestBuilder setStorageLimitLevel(StorageLimitLevel storageLimitLevel) { + this.storageLimitLevel = storageLimitLevel; + return this; + } + public GTScanRequestBuilder setStartTime(long startTime) { this.startTime = startTime; return this; @@ -131,12 +137,16 @@ public class GTScanRequestBuilder { } if (storageBehavior == null) { - storageBehavior = BackdoorToggles.getCoprocessorBehavior() == null ? StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior(); + storageBehavior = BackdoorToggles.getCoprocessorBehavior() == null + ? StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() + : BackdoorToggles.getCoprocessorBehavior(); } this.startTime = startTime == -1 ? System.currentTimeMillis() : startTime; this.timeout = timeout == -1 ? 300000 : timeout; - return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, havingFilterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit, storageBehavior, startTime, timeout); + return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, + havingFilterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, + storagePushDownLimit, storageLimitLevel, storageBehavior, startTime, timeout); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java index 4eb5791..61163d3 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java @@ -21,7 +21,6 @@ package org.apache.kylin.gridtable; import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; -import org.apache.kylin.GTForwardingScanner; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.MeasureAggregator; http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/gridtable/IGTBypassChecker.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTBypassChecker.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTBypassChecker.java new file mode 100644 index 0000000..55f36d5 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTBypassChecker.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.gridtable; + +public interface IGTBypassChecker { + boolean shouldBypass(GTRecord record); +} http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/gridtable/StorageLimitLevel.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/StorageLimitLevel.java b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageLimitLevel.java new file mode 100644 index 0000000..5313a1f --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageLimitLevel.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.gridtable; + +public enum StorageLimitLevel { + NO_LIMIT, + //even if enableStorageLimitIfPossible() is false, + //(meaning we have to scan through all the cuboid rows) + //we can still take advantage of the fact that we don't need all of the agg keys + LIMIT_ON_RETURN_SIZE, + //iff enableStorageLimitIfPossible() is true + LIMIT_ON_SCAN +} http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index 78cf97c..a2e2869 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.gridtable.StorageLimitLevel; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase; import org.slf4j.Logger; @@ -41,6 +42,7 @@ public class StorageContext { private boolean overlookOuterLimit = false; private int offset = 0; private int finalPushDownLimit = Integer.MAX_VALUE; + private StorageLimitLevel storageLimitLevel = StorageLimitLevel.NO_LIMIT; private boolean hasSort = false; private boolean acceptPartialResult = false; private long deadline; @@ -68,7 +70,8 @@ public class StorageContext { //the limit here correspond to the limit concept in SQL //also take into consideration Statement.setMaxRows in JDBC private int getLimit() { - if (overlookOuterLimit || BackdoorToggles.getStatementMaxRows() == null || BackdoorToggles.getStatementMaxRows() == 0) { + if (overlookOuterLimit || BackdoorToggles.getStatementMaxRows() == null + || BackdoorToggles.getStatementMaxRows() == 0) { return limit; } else { return Math.min(limit, BackdoorToggles.getStatementMaxRows()); @@ -77,7 +80,8 @@ public class StorageContext { public void setLimit(int l) { if (limit != Integer.MAX_VALUE) { - logger.warn("Setting limit to {} but in current olap context, the limit is already {}, won't apply", l, limit); + logger.warn("Setting limit to {} but in current olap context, the limit is already {}, won't apply", l, + limit); } else { limit = l; } @@ -107,7 +111,7 @@ public class StorageContext { return isValidPushDownLimit(finalPushDownLimit); } - public static boolean isValidPushDownLimit(int finalPushDownLimit) { + public static boolean isValidPushDownLimit(long finalPushDownLimit) { return finalPushDownLimit < Integer.MAX_VALUE && finalPushDownLimit > 0; } @@ -115,20 +119,31 @@ public class StorageContext { return finalPushDownLimit; } - public void setFinalPushDownLimit(IRealization realization) { + public StorageLimitLevel getStorageLimitLevel() { + return storageLimitLevel; + } + + public void applyLimitPushDown(IRealization realization, StorageLimitLevel storageLimitLevel) { - if (!isValidPushDownLimit(this.getLimit())) { + if (storageLimitLevel == StorageLimitLevel.NO_LIMIT) { return; } - int tempPushDownLimit = this.getOffset() + this.getLimit(); - if (!realization.supportsLimitPushDown()) { logger.warn("Not enabling limit push down because cube storage type not supported"); - } else { - this.finalPushDownLimit = tempPushDownLimit; - logger.info("Enable limit (storage push down limit) :" + tempPushDownLimit); + return; } + + long temp = this.getOffset() + this.getLimit(); + + if (!isValidPushDownLimit(temp)) { + logger.warn("Not enabling limit push down because current limit is invalid: " + this.getLimit()); + return; + } + + this.finalPushDownLimit = (int) temp; + this.storageLimitLevel = storageLimitLevel; + logger.info("Enabling limit push down: {} at level: {}", temp, storageLimitLevel); } public boolean mergeSortPartitionResults() { http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java index cecea85..14af5ac 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java @@ -148,7 +148,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase { scanRequest = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions).// setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs).setFilterPushDown(gtFilter).// setAllowStorageAggregation(context.isNeedStorageAggregation()).setAggCacheMemThreshold(cubeSegment.getConfig().getQueryCoprocessorMemGB()).// - setStoragePushDownLimit(context.getFinalPushDownLimit()).setHavingFilterPushDown(havingFilter).createGTScanRequest(); + setStoragePushDownLimit(context.getFinalPushDownLimit()).setStorageLimitLevel(context.getStorageLimitLevel()).setHavingFilterPushDown(havingFilter).createGTScanRequest(); } else { scanRequest = null; } http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index a3af511..a0c6ea2 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -35,6 +35,7 @@ import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; import org.apache.kylin.dict.lookup.LookupStringTable; +import org.apache.kylin.gridtable.StorageLimitLevel; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.metadata.filter.CaseTupleFilter; import org.apache.kylin.metadata.filter.ColumnTupleFilter; @@ -139,7 +140,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD)); // exactAggregation mean: needn't aggregation at storage and query engine both. - boolean exactAggregation = isExactAggregation(context, cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation, sqlDigest.aggregations); + boolean exactAggregation = isExactAggregation(context, cuboid, groups, otherDimsD, singleValuesD, + derivedPostAggregation, sqlDigest.aggregations); context.setExactAggregation(exactAggregation); // replace derived columns in filter with host columns; columns on loosened condition must be added to group by @@ -161,9 +163,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { TupleFilter havingFilter = checkHavingCanPushDown(sqlDigest.havingFilter, groupsD, sqlDigest.aggregations, metrics); - logger.info("Cuboid identified: cube={}, cuboidId={}, groupsD={}, filterD={}, limitPushdown={}, storageAggr={}", + logger.info( + "Cuboid identified: cube={}, cuboidId={}, groupsD={}, filterD={}, limitPushdown={}, limitLevel={}, storageAggr={}", cubeInstance.getName(), cuboid.getId(), groupsD, filterColumnD, context.getFinalPushDownLimit(), - context.isNeedStorageAggregation()); + context.getStorageLimitLevel(), context.isNeedStorageAggregation()); return new GTCubeStorageQueryRequest(cuboid, dimensionsD, groupsD, filterColumnD, metrics, filterD, havingFilter, context); @@ -365,50 +368,50 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Set<TblColRef> loosenedColumnD, Collection<FunctionDesc> functionDescs, StorageContext context) { - boolean possible = true; - if (!TupleFilter.isEvaluableRecursively(filter)) { - possible = false; - logger.debug("Storage limit push down is impossible because the filter isn't evaluable"); - } + StorageLimitLevel storageLimitLevel = StorageLimitLevel.LIMIT_ON_SCAN; - if (!loosenedColumnD.isEmpty()) { // KYLIN-2173 - possible = false; - logger.debug("Storage limit push down is impossible because filter is loosened: " + loosenedColumnD); - } - - if (context.hasSort()) { - possible = false; - logger.debug("Storage limit push down is impossible because the query has order by"); + //if groupsD is clustered at "head" of the rowkey, then limit push down is possible + int size = groupsD.size(); + if (!groupsD.containsAll(cuboid.getColumns().subList(0, size))) { + storageLimitLevel = StorageLimitLevel.LIMIT_ON_RETURN_SIZE; + logger.debug( + "storageLimitLevel set to LIMIT_ON_RETURN_SIZE because groupD is not clustered at head, groupsD: " + + groupsD // + + " with cuboid columns: " + cuboid.getColumns()); } // derived aggregation is bad, unless expanded columns are already in group by if (!groups.containsAll(derivedPostAggregation)) { - possible = false; - logger.debug("Storage limit push down is impossible because derived column require post aggregation: " + storageLimitLevel = StorageLimitLevel.NO_LIMIT; + logger.debug("storageLimitLevel set to NO_LIMIT because derived column require post aggregation: " + derivedPostAggregation); } - //if groupsD is clustered at "head" of the rowkey, then limit push down is possible - int size = groupsD.size(); - if (!groupsD.containsAll(cuboid.getColumns().subList(0, size))) { - possible = false; - logger.debug( - "Storage limit push down is impossible because groupD is not clustered at head, groupsD: " + groupsD // - + " with cuboid columns: " + cuboid.getColumns()); + if (!TupleFilter.isEvaluableRecursively(filter)) { + storageLimitLevel = StorageLimitLevel.NO_LIMIT; + logger.debug("storageLimitLevel set to NO_LIMIT because the filter isn't evaluable"); + } + + if (!loosenedColumnD.isEmpty()) { // KYLIN-2173 + storageLimitLevel = StorageLimitLevel.NO_LIMIT; + logger.debug("storageLimitLevel set to NO_LIMIT because filter is loosened: " + loosenedColumnD); + } + + if (context.hasSort()) { + storageLimitLevel = StorageLimitLevel.NO_LIMIT; + logger.debug("storageLimitLevel set to NO_LIMIT because the query has order by"); } //if exists measures like max(cal_dt), then it's not a perfect cuboid match, cannot apply limit for (FunctionDesc functionDesc : functionDescs) { if (functionDesc.isDimensionAsMetric()) { - possible = false; - logger.debug("Storage limit push down is impossible because {} isDimensionAsMetric ", functionDesc); + storageLimitLevel = StorageLimitLevel.NO_LIMIT; + logger.debug("storageLimitLevel set to NO_LIMIT because {} isDimensionAsMetric ", functionDesc); } } - if (possible) { - context.setFinalPushDownLimit(cubeInstance); - } + context.applyLimitPushDown(cubeInstance, storageLimitLevel); } private void enableStreamAggregateIfBeneficial(Cuboid cuboid, Set<TblColRef> groupsD, StorageContext context) { @@ -493,7 +496,9 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { return havingFilter; } - private boolean isExactAggregation(StorageContext context, Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation, Collection<FunctionDesc> functionDescs) { + private boolean isExactAggregation(StorageContext context, Cuboid cuboid, Collection<TblColRef> groups, + Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation, + Collection<FunctionDesc> functionDescs) { if (context.isNeedStorageAggregation()) { logger.info("exactAggregation is false because need storage aggregation"); return false; @@ -506,7 +511,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { // derived aggregation is bad, unless expanded columns are already in group by if (groups.containsAll(derivedPostAggregation) == false) { - logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation); + logger.info("exactAggregation is false because derived column require post aggregation: " + + derivedPostAggregation); return false; } http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java index 21e61e3..14e0d86 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java @@ -18,28 +18,28 @@ package org.apache.kylin.storage.gtrecord; -import com.google.common.collect.Iterators; -import com.google.common.collect.PeekingIterator; -import com.google.common.collect.UnmodifiableIterator; -import org.apache.kylin.gridtable.GTInfo; -import org.apache.kylin.gridtable.GTRecord; - import java.util.Comparator; import java.util.List; import java.util.NoSuchElementException; import java.util.PriorityQueue; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; + +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; +import com.google.common.collect.UnmodifiableIterator; + /** * Merge-sort {@code GTRecord}s in all partitions, assume each partition contains sorted elements. */ public class SortMergedPartitionResultIterator extends UnmodifiableIterator<GTRecord> { - final GTRecord record ; // reuse to avoid object creation + final GTRecord record; // reuse to avoid object creation PriorityQueue<PeekingIterator<GTRecord>> heap; - SortMergedPartitionResultIterator( - List<PartitionResultIterator> partitionResults, - GTInfo info, final Comparator<GTRecord> comparator) { + SortMergedPartitionResultIterator(List<PartitionResultIterator> partitionResults, GTInfo info, + final Comparator<GTRecord> comparator) { this.record = new GTRecord(info); Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() { http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 3791e63..d94b547 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -52,6 +52,7 @@ import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTStore; +import org.apache.kylin.gridtable.StorageLimitLevel; import org.apache.kylin.gridtable.StorageSideBehavior; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.metadata.realization.IRealizationConstants; @@ -147,8 +148,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement private long rowCount; private long rowBytes; - ResourceTrackingCellListIterator(Iterator<List<Cell>> delegate, - long rowCountLimit, long bytesLimit, long timeout) { + ResourceTrackingCellListIterator(Iterator<List<Cell>> delegate, long rowCountLimit, long bytesLimit, + long timeout) { this.delegate = delegate; this.rowCountLimit = rowCountLimit; this.bytesLimit = bytesLimit; @@ -162,7 +163,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement throw new ResourceLimitExceededException("scanned row count exceeds threshold " + rowCountLimit); } if (rowBytes > bytesLimit) { - throw new ResourceLimitExceededException("scanned bytes " + rowBytes + " exceeds threshold " + bytesLimit); + throw new ResourceLimitExceededException( + "scanned bytes " + rowBytes + " exceeds threshold " + bytesLimit); } if ((rowCount % GTScanRequest.terminateCheckInterval == 1) && System.currentTimeMillis() > deadline) { throw new KylinTimeoutException("coprocessor timeout after " + timeout + " ms"); @@ -193,7 +195,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement if (shardLength == 0) { return; } - byte[] regionStartKey = ArrayUtils.isEmpty(region.getRegionInfo().getStartKey()) ? new byte[shardLength] : region.getRegionInfo().getStartKey(); + byte[] regionStartKey = ArrayUtils.isEmpty(region.getRegionInfo().getStartKey()) ? new byte[shardLength] + : region.getRegionInfo().getStartKey(); Bytes.putBytes(rawScan.startKey, 0, regionStartKey, 0, shardLength); Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength); } @@ -218,7 +221,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement @SuppressWarnings("checkstyle:methodlength") @Override - public void visitCube(final RpcController controller, final CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) { + public void visitCube(final RpcController controller, final CubeVisitProtos.CubeVisitRequest request, + RpcCallback<CubeVisitProtos.CubeVisitResponse> done) { List<RegionScanner> regionScanners = Lists.newArrayList(); HRegion region = null; @@ -232,7 +236,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement try (SetThreadName ignored = new SetThreadName("Query %s", queryId)) { final long serviceStartTime = System.currentTimeMillis(); - region = (HRegion)env.getRegion(); + region = (HRegion) env.getRegion(); region.startRegionOperation(); // if user change kylin.properties on kylin server, need to manually redeploy coprocessor jar to update KylinConfig of Env. @@ -241,13 +245,15 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement debugGitTag = region.getTableDesc().getValue(IRealizationConstants.HTableGitTag); - final GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()))); + final GTScanRequest scanReq = GTScanRequest.serializer + .deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()))); List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList(); for (IntList intList : request.getHbaseColumnsToGTList()) { hbaseColumnsToGT.add(intList.getIntsList()); } StorageSideBehavior behavior = StorageSideBehavior.valueOf(scanReq.getStorageBehavior()); - final List<RawScan> hbaseRawScans = deserializeRawScans(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()))); + final List<RawScan> hbaseRawScans = deserializeRawScans( + ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()))); appendProfileInfo(sb, "start latency: " + (serviceStartTime - scanReq.getStartTime()), serviceStartTime); @@ -256,7 +262,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement for (RawScan hbaseRawScan : hbaseRawScans) { if (request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN > 0) { //if has shard, fill region shard to raw scan start/end - updateRawScanByCurrentRegion(hbaseRawScan, region, request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN); + updateRawScanByCurrentRegion(hbaseRawScan, region, + request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN); } Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan); @@ -287,16 +294,18 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement final long storagePushDownLimit = scanReq.getStoragePushDownLimit(); - ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator( - allCellLists, + ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator(allCellLists, scanReq.getStorageScanRowNumThreshold(), // for old client (scan threshold) !request.hasMaxScanBytes() ? Long.MAX_VALUE : request.getMaxScanBytes(), // for new client scanReq.getTimeout()); - IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn(), request.getIsExactAggregate()); + IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, + hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn(), + request.getIsExactAggregate()); IGTScanner rawScanner = store.scan(scanReq); - IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(), behavior.aggrToggledOn(), false, request.getSpillEnabled()); + IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(), + behavior.aggrToggledOn(), false, request.getSpillEnabled()); ByteBuffer buffer = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); @@ -318,7 +327,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement finalRowCount++; //if it's doing storage aggr, then should rely on GTAggregateScanner's limit check - if (!scanReq.isDoingStorageAggregation() && finalRowCount >= storagePushDownLimit) { + if (!scanReq.isDoingStorageAggregation() + && (scanReq.getStorageLimitLevel() != StorageLimitLevel.NO_LIMIT + && finalRowCount >= storagePushDownLimit)) { //read one more record than limit logger.info("The finalScanner aborted because storagePushDownLimit is satisfied"); break; @@ -327,22 +338,20 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } catch (KylinTimeoutException e) { logger.info("Abort scan: {}", e.getMessage()); errorInfo = CubeVisitProtos.CubeVisitResponse.ErrorInfo.newBuilder() - .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.TIMEOUT) - .setMessage(e.getMessage()) + .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.TIMEOUT).setMessage(e.getMessage()) .build(); } catch (ResourceLimitExceededException e) { logger.info("Abort scan: {}", e.getMessage()); errorInfo = CubeVisitProtos.CubeVisitResponse.ErrorInfo.newBuilder() .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.RESOURCE_LIMIT_EXCEEDED) - .setMessage(e.getMessage()) - .build(); + .setMessage(e.getMessage()).build(); } finally { finalScanner.close(); } appendProfileInfo(sb, "agg done", serviceStartTime); - logger.info("Total scanned {} rows and {} bytes", - cellListIterator.getTotalScannedRowCount(), cellListIterator.getTotalScannedRowBytes()); + logger.info("Total scanned {} rows and {} bytes", cellListIterator.getTotalScannedRowCount(), + cellListIterator.getTotalScannedRowBytes()); //outputStream.close() is not necessary byte[] compressedAllRows; @@ -360,7 +369,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement appendProfileInfo(sb, "compress done", serviceStartTime); logger.info("Size of final result = {} ({} before compressing)", compressedAllRows.length, allRows.length); - OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); + OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory + .getOperatingSystemMXBean(); double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad(); double freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize(); double freeSwapSpaceSize = operatingSystemMXBean.getFreeSwapSpaceSize(); @@ -374,18 +384,15 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } done.run(responseBuilder.// setCompressedRows(HBaseZeroCopyByteString.wrap(compressedAllRows)).//too many array copies - setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder(). - setAggregatedRowCount(cellListIterator.getTotalScannedRowCount() - finalRowCount). - setScannedRowCount(cellListIterator.getTotalScannedRowCount()). - setScannedBytes(cellListIterator.getTotalScannedRowBytes()). - setServiceStartTime(serviceStartTime). - setServiceEndTime(System.currentTimeMillis()). - setSystemCpuLoad(systemCpuLoad). - setFreePhysicalMemorySize(freePhysicalMemorySize). - setFreeSwapSpaceSize(freeSwapSpaceSize). - setHostname(InetAddress.getLocalHost().getHostName()). - setEtcMsg(sb.toString()). - setNormalComplete(errorInfo == null ? 1 : 0).build()) + setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder() + .setAggregatedRowCount(cellListIterator.getTotalScannedRowCount() - finalRowCount) + .setScannedRowCount(cellListIterator.getTotalScannedRowCount()) + .setScannedBytes(cellListIterator.getTotalScannedRowBytes()) + .setServiceStartTime(serviceStartTime).setServiceEndTime(System.currentTimeMillis()) + .setSystemCpuLoad(systemCpuLoad).setFreePhysicalMemorySize(freePhysicalMemorySize) + .setFreeSwapSpaceSize(freeSwapSpaceSize) + .setHostname(InetAddress.getLocalHost().getHostName()).setEtcMsg(sb.toString()) + .setNormalComplete(errorInfo == null ? 1 : 0).build()) .build()); } catch (IOException ioe) {