minor, refine enableStorageLimitIfPossible logic

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/26c03fe2
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/26c03fe2
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/26c03fe2

Branch: refs/heads/ranger
Commit: 26c03fe26b665adcf31cefb574f7c389a4daa0a5
Parents: b1b3c22
Author: Hongbin Ma <mahong...@apache.org>
Authored: Mon Sep 4 22:39:53 2017 +0800
Committer: Hongbin Ma <m...@kyligence.io>
Committed: Tue Sep 5 10:29:20 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/GTForwardingScanner.java   |  56 ------
 .../kylin/gridtable/GTAggregateScanner.java     | 171 ++++++++++++++++---
 .../apache/kylin/gridtable/GTFilterScanner.java |  44 +++--
 .../kylin/gridtable/GTForwardingScanner.java    |  52 ++++++
 .../apache/kylin/gridtable/GTScanRequest.java   |  61 ++++---
 .../kylin/gridtable/GTScanRequestBuilder.java   |  18 +-
 .../gridtable/GTStreamAggregateScanner.java     |   1 -
 .../kylin/gridtable/IGTBypassChecker.java       |  23 +++
 .../kylin/gridtable/StorageLimitLevel.java      |  29 ++++
 .../apache/kylin/storage/StorageContext.java    |  35 ++--
 .../storage/gtrecord/CubeScanRangePlanner.java  |   2 +-
 .../gtrecord/GTCubeStorageQueryBase.java        |  70 ++++----
 .../SortMergedPartitionResultIterator.java      |  20 +--
 .../coprocessor/endpoint/CubeVisitService.java  |  73 ++++----
 14 files changed, 449 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java b/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java
deleted file mode 100644
index de8c88d..0000000
--- a/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin;
-
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.IGTScanner;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A {@link IGTScanner} which forwards all its method calls to another scanner.
- *
- * @see <a href="http://en.wikipedia.org/wiki/Decorator_pattern">decorator pattern</a>.
- */
-public class GTForwardingScanner implements IGTScanner {
-    protected IGTScanner delegated;
-
-    protected GTForwardingScanner(IGTScanner delegated) {
-        this.delegated = checkNotNull(delegated, "delegated");
-    }
-
-    @Override
-    public GTInfo getInfo() {
-        return delegated.getInfo();
-    }
-
-    @Override
-    public void close() throws IOException {
-        delegated.close();
-    }
-
-    @Override
-    public Iterator<GTRecord> iterator() {
-        return delegated.iterator();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 92d0fac..cbf4232 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.PriorityQueue;
 import java.util.SortedMap;
+import java.util.TreeMap;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
@@ -55,11 +56,12 @@ import org.apache.kylin.metadata.tuple.ITuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
-public class GTAggregateScanner implements IGTScanner {
+public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
 
     private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
 
@@ -73,6 +75,7 @@ public class GTAggregateScanner implements IGTScanner {
     final AggregationCache aggrCache;
     final long spillThreshold; // 0 means no memory control && no spill
     final int storagePushDownLimit;//default to be Int.MAX
+    final StorageLimitLevel storageLimitLevel;
     final boolean spillEnabled;
     final TupleFilter havingFilter;
 
@@ -84,24 +87,34 @@ public class GTAggregateScanner implements IGTScanner {
         this(inputScanner, req, true);
     }
 
-    public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, boolean spillEnabled) {
+    public GTAggregateScanner(IGTScanner input, GTScanRequest req, boolean spillEnabled) {
         if (!req.hasAggregation())
             throw new IllegalStateException();
 
-        this.info = inputScanner.getInfo();
+        if (input instanceof GTFilterScanner) {
+            logger.info("setting IGTBypassChecker of child");
+            ((GTFilterScanner) input).setChecker(this);
+        } else {
+            logger.info("applying a GTFilterScanner with IGTBypassChecker on top child");
+            input = new GTFilterScanner(input, null, this);
+        }
+
+        this.inputScanner = input;
+        this.info = this.inputScanner.getInfo();
         this.dimensions = req.getDimensions();
         this.groupBy = req.getAggrGroupBy();
         this.metrics = req.getAggrMetrics();
         this.metricsAggrFuncs = req.getAggrMetricsFuncs();
-        this.inputScanner = inputScanner;
         this.measureCodec = req.createMeasureCodec();
-        this.aggrCache = new AggregationCache();
         this.spillThreshold = (long) (req.getAggCacheMemThreshold() * MemoryBudgetController.ONE_GB);
         this.aggrMask = new boolean[metricsAggrFuncs.length];
         this.storagePushDownLimit = req.getStoragePushDownLimit();
+        this.storageLimitLevel = req.getStorageLimitLevel();
         this.spillEnabled = spillEnabled;
         this.havingFilter = req.getHavingFilterPushDown();
 
+        this.aggrCache = new AggregationCache();
+
         Arrays.fill(aggrMask, true);
     }
 
@@ -147,18 +160,15 @@ public class GTAggregateScanner implements IGTScanner {
     @Override
     public Iterator<GTRecord> iterator() {
         long count = 0;
+
         for (GTRecord r : inputScanner) {
 
-            if (getNumOfSpills() == 0) {
-                //check limit
-                boolean ret = aggrCache.aggregate(r, storagePushDownLimit);
+            //check limit
+            boolean ret = aggrCache.aggregate(r);
 
-                if (!ret) {
-                    logger.info("abort reading inputScanner because storage push down limit is hit");
-                    break;//limit is hit
-                }
-            } else {//else if dumps is not empty, it means a lot of row need aggregated, so it's less likely that limit clause is helping 
-                aggrCache.aggregate(r, Integer.MAX_VALUE);
+            if (!ret) {
+                logger.info("abort reading inputScanner because storage push down limit is hit");
+                break;//limit is hit
             }
 
             count++;
@@ -180,11 +190,94 @@ public class GTAggregateScanner implements IGTScanner {
         return aggrCache.estimatedMemSize();
     }
 
+    public boolean shouldBypass(GTRecord record) {
+        return aggrCache.shouldBypass(record);
+    }
+
     class AggregationCache implements Closeable {
+        /**
+         * if a limit construct is provided
+         * before a gtrecord is sent to filter->aggregate pipeline, 
+         * this check could help to decide if the record should be skipped
+         *
+         * e.g. 
+         * limit is three, and current AggregationCache's key set contains (1,2,3),
+         * when gtrecord with key 4 comes, it should be skipped before sending to filter 
+         */
+        class ByPassChecker {
+            private int aggregateBufferSizeLimit = -1;
+            private byte[] currentLastKey = null;
+            private int[] groupOffsetsInLastKey = null;
+
+            private int byPassCounter = 0;
+
+            ByPassChecker(int aggregateBufferSizeLimit) {
+                this.aggregateBufferSizeLimit = aggregateBufferSizeLimit;
+
+                //init groupOffsetsInLastKey
+                int p = 0;
+                int idx = 0;
+                this.groupOffsetsInLastKey = new int[groupBy.trueBitCount()];
+                for (int i = 0; i < dimensions.trueBitCount(); i++) {
+                    int c = dimensions.trueBitAt(i);
+                    int l = info.codeSystem.maxCodeLength(c);
+                    if (groupBy.get(c))
+                        groupOffsetsInLastKey[idx++] = p;
+                    p += l;
+                }
+            }
+
+            /**
+             * @return true if should bypass this record
+             */
+            boolean shouldByPass(GTRecord record) {
+
+                if (dumps.size() > 0) {
+                    return false; //rare case: limit tends to be small, when limit is applied it's not likely to have dumps
+                    //TODO: what if bypass before dump happens?
+                }
+
+                Preconditions.checkState(aggBufMap.size() <= aggregateBufferSizeLimit);
+
+                if (aggBufMap.size() == aggregateBufferSizeLimit) {
+                    Preconditions.checkNotNull(currentLastKey);
+                    for (int i = 0; i < groupBy.trueBitCount(); i++) {
+                        int c = groupBy.trueBitAt(i);
+                        ByteArray col = record.get(c);
+
+                        int compare = Bytes.compareTo(col.array(), col.offset(), col.length(), currentLastKey,
+                                groupOffsetsInLastKey[i], col.length());
+                        if (compare > 0) {
+                            byPassCounter++;
+                            return true;
+                        } else if (compare < 0) {
+                            return false;
+                        }
+                    }
+                }
+
+                return false;
+            }
+
+            void updateOnBufferChange() {
+                if (aggBufMap.size() > aggregateBufferSizeLimit) {
+                    aggBufMap.pollLastEntry();
+                    Preconditions.checkState(aggBufMap.size() == aggregateBufferSizeLimit);
+                }
+
+                currentLastKey = aggBufMap.lastKey();
+            }
+
+            int getByPassCounter() {
+                return byPassCounter;
+            }
+        }
+
         final List<Dump> dumps;
         final int keyLength;
         final boolean[] compareMask;
         boolean compareAll = true;
+        ByPassChecker byPassChecker = null;
 
         final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() {
             @Override
@@ -212,7 +305,7 @@ public class GTAggregateScanner implements IGTScanner {
             }
         };
 
-        SortedMap<byte[], MeasureAggregator[]> aggBufMap;
+        TreeMap<byte[], MeasureAggregator[]> aggBufMap;
 
         public AggregationCache() {
             compareMask = createCompareMask();
@@ -222,6 +315,20 @@ public class GTAggregateScanner implements IGTScanner {
             keyLength = compareMask.length;
             dumps = Lists.newArrayList();
             aggBufMap = createBuffMap();
+
+            if (storageLimitLevel == StorageLimitLevel.LIMIT_ON_RETURN_SIZE) {
+                //ByPassChecker is not free, if LIMIT_ON_SCAN, not worth to as it has better optimization
+                byPassChecker = new ByPassChecker(storagePushDownLimit);
+            }
+        }
+
+        public boolean shouldBypass(GTRecord record) {
+            if (byPassChecker == null) {
+                return false;
+            }
+
+            boolean b = byPassChecker.shouldByPass(record);
+            return b;
         }
 
         private boolean[] createCompareMask() {
@@ -245,7 +352,7 @@ public class GTAggregateScanner implements IGTScanner {
             return mask;
         }
 
-        private SortedMap<byte[], MeasureAggregator[]> createBuffMap() {
+        private TreeMap<byte[], MeasureAggregator[]> createBuffMap() {
             return Maps.newTreeMap(bytesComparator);
         }
 
@@ -263,7 +370,7 @@ public class GTAggregateScanner implements IGTScanner {
             return result;
         }
 
-        boolean aggregate(GTRecord r, int stopForLimit) {
+        boolean aggregate(GTRecord r) {
             if (++aggregatedRowCount % 100000 == 0) {
                 if (memTracker != null) {
                     memTracker.markHigh();
@@ -272,7 +379,8 @@ public class GTAggregateScanner implements IGTScanner {
                 final long estMemSize = estimatedMemSize();
                 if (spillThreshold > 0 && estMemSize > spillThreshold) {
                     if (!spillEnabled) {
-                        throw new ResourceLimitExceededException("aggregation's memory consumption " + estMemSize + " exceeds threshold " + spillThreshold);
+                        throw new ResourceLimitExceededException("aggregation's memory consumption " + estMemSize
+                                + " exceeds threshold " + spillThreshold);
                     }
                     spillBuffMap(estMemSize); // spill to disk
                     aggBufMap = createBuffMap();
@@ -284,7 +392,9 @@ public class GTAggregateScanner implements IGTScanner {
             if (aggrs == null) {
 
                 //for storage push down limit
-                if (aggBufMap.size() >= stopForLimit) {
+                //TODO: what if bypass before dump happens?
+                if (getNumOfSpills() == 0 && storageLimitLevel == StorageLimitLevel.LIMIT_ON_SCAN
+                        && aggBufMap.size() >= storagePushDownLimit) {
                     return false;
                 }
 
@@ -298,6 +408,11 @@ public class GTAggregateScanner implements IGTScanner {
                     aggrs[i].aggregate(metrics);
                 }
             }
+
+            if (byPassChecker != null) {
+                byPassChecker.updateOnBufferChange();
+            }
+
             return true;
         }
 
@@ -314,6 +429,12 @@ public class GTAggregateScanner implements IGTScanner {
         @Override
         public void close() throws RuntimeException {
             try {
+                logger.info("closing aggrCache");
+                if (byPassChecker != null) {
+                    logger.info("AggregationCache byPassChecker helps to skip {} cuboid rows",
+                            byPassChecker.getByPassCounter());
+                }
+
                 for (Dump dump : dumps) {
                     dump.terminate();
                 }
@@ -356,7 +477,8 @@ public class GTAggregateScanner implements IGTScanner {
 
                 final ReturningRecord returningRecord = new ReturningRecord();
                 Entry<byte[], MeasureAggregator[]> returningEntry = null;
-                final HavingFilterChecker havingFilterChecker = (havingFilter == null) ? null : new HavingFilterChecker();
+                final HavingFilterChecker havingFilterChecker = (havingFilter == null) ? null
+                        : new HavingFilterChecker();
 
                 @Override
                 public boolean hasNext() {
@@ -530,7 +652,8 @@ public class GTAggregateScanner implements IGTScanner {
             public Iterator<Pair<byte[], byte[]>> iterator() {
                 try {
                     if (dumpedFile == null || !dumpedFile.exists()) {
-                        throw new RuntimeException("Dumped file cannot be found at: " + (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath()));
+                        throw new RuntimeException("Dumped file cannot be found at: "
+                                + (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath()));
                     }
 
                     dis = new DataInputStream(new FileInputStream(dumpedFile));
@@ -555,7 +678,8 @@ public class GTAggregateScanner implements IGTScanner {
                                 dis.read(value);
                                 return new Pair<>(key, value);
                             } catch (Exception e) {
-                                throw new RuntimeException("Cannot read AggregationCache from dumped file: " + e.getMessage());
+                                throw new RuntimeException(
+                                        "Cannot read AggregationCache from dumped file: " + e.getMessage());
                             }
                         }
 
@@ -570,7 +694,8 @@ public class GTAggregateScanner implements IGTScanner {
             }
 
             public void flush() throws IOException {
-                logger.info("AggregationCache(size={} est_mem_size={} threshold={}) will spill to {}", buffMap.size(), estMemSize, spillThreshold, dumpedFile.getAbsolutePath());
+                logger.info("AggregationCache(size={} est_mem_size={} threshold={}) will spill to {}", buffMap.size(),
+                        estMemSize, spillThreshold, dumpedFile.getAbsolutePath());
 
                 if (buffMap != null) {
                     DataOutputStream dos = null;
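
The ByPassChecker introduced above is in essence a bounded, sorted aggregation buffer: once the buffer already holds storagePushDownLimit group keys, a record whose group key sorts after the current last key can never appear in the result, so it is skipped before it even reaches the filter/aggregate pipeline. A small standalone sketch of that idea in plain Java (hypothetical names, string keys and long measures for brevity, not the Kylin classes):

import java.util.TreeMap;

// Standalone sketch of the bounded buffer behind ByPassChecker (hypothetical names).
public class BoundedGroupBuffer {
    private final int limit;
    private final TreeMap<String, Long> buffer = new TreeMap<>();

    BoundedGroupBuffer(int limit) {
        this.limit = limit;
    }

    // true once the buffer is full and the key sorts after the current last key
    boolean shouldBypass(String key) {
        return buffer.size() >= limit && key.compareTo(buffer.lastKey()) > 0;
    }

    void aggregate(String key, long value) {
        buffer.merge(key, value, Long::sum);
        if (buffer.size() > limit) {
            buffer.pollLastEntry(); // evict the group that fell out of the window
        }
    }

    public static void main(String[] args) {
        BoundedGroupBuffer buf = new BoundedGroupBuffer(3);
        for (String key : new String[] { "1", "2", "3" }) {
            buf.aggregate(key, 1L);
        }
        System.out.println(buf.shouldBypass("4")); // true: sorts after the current last key "3"
        System.out.println(buf.shouldBypass("0")); // false: could still displace an existing group
    }
}

The real checker compares the encoded group-by columns byte by byte against the current last key and disables itself as soon as spills start, but the comparison idea is the same.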

http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
index cad0a04..11a23d6 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
@@ -18,14 +18,12 @@
 
 package org.apache.kylin.gridtable;
 
-import java.io.IOException;
 import java.util.BitSet;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
-import org.apache.kylin.GTForwardingScanner;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;
@@ -36,25 +34,35 @@ import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
 
 public class GTFilterScanner extends GTForwardingScanner {
 
-    final private TupleFilter filter;
-    final private IFilterCodeSystem<ByteArray> filterCodeSystem;
-    final private IEvaluatableTuple oneTuple; // avoid instance creation
+    private TupleFilter filter;
+    private IFilterCodeSystem<ByteArray> filterCodeSystem;
+    private IEvaluatableTuple oneTuple; // avoid instance creation
 
     private GTRecord next = null;
 
-    public GTFilterScanner(IGTScanner delegated, GTScanRequest req) throws IOException {
+    private IGTBypassChecker checker = null;
+
+    public GTFilterScanner(IGTScanner delegated, GTScanRequest req, IGTBypassChecker checker) {
         super(delegated);
-        this.filter = req.getFilterPushDown();
-        this.filterCodeSystem = GTUtil.wrap(getInfo().codeSystem.getComparator());
-        this.oneTuple = new IEvaluatableTuple() {
-            @Override
-            public Object getValue(TblColRef col) {
-                return next.get(col.getColumnDesc().getZeroBasedIndex());
-            }
-        };
+        this.checker = checker;
+
+        if (req != null) {
+            this.filter = req.getFilterPushDown();
+            this.filterCodeSystem = GTUtil.wrap(getInfo().codeSystem.getComparator());
+            this.oneTuple = new IEvaluatableTuple() {
+                @Override
+                public Object getValue(TblColRef col) {
+                    return next.get(col.getColumnDesc().getZeroBasedIndex());
+                }
+            };
 
-        if (!TupleFilter.isEvaluableRecursively(filter))
-            throw new IllegalArgumentException();
+            if (!TupleFilter.isEvaluableRecursively(filter))
+                throw new IllegalArgumentException();
+        }
+    }
+
+    public void setChecker(IGTBypassChecker checker) {
+        this.checker = checker;
     }
 
     @Override
@@ -81,6 +89,10 @@ public class GTFilterScanner extends GTForwardingScanner {
             }
 
             private boolean evaluate() {
+                if (checker != null && checker.shouldBypass(next)) {
+                    return false;
+                }
+
                 if (filter == null)
                     return true;
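
With this change GTFilterScanner becomes a hook point: before evaluating the pushed-down filter it asks an optional IGTBypassChecker whether the record can be dropped outright, which is how GTAggregateScanner vetoes records early (it either registers itself on an existing GTFilterScanner via setChecker, or wraps its input in a new GTFilterScanner with a null filter). A minimal standalone sketch of that checker-before-filter wiring (hypothetical types, not the Kylin API):

import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Standalone sketch: a filtering stage that consults an optional bypass checker
// before running its own (possibly expensive) predicate.
public class BypassAwareFilter {
    interface BypassChecker {
        boolean shouldBypass(String record);
    }

    private final Predicate<String> filter;
    private BypassChecker checker; // may be injected later by a downstream stage

    BypassAwareFilter(Predicate<String> filter, BypassChecker checker) {
        this.filter = filter;
        this.checker = checker;
    }

    void setChecker(BypassChecker checker) {
        this.checker = checker;
    }

    boolean accept(String record) {
        if (checker != null && checker.shouldBypass(record)) {
            return false; // vetoed before the filter is evaluated
        }
        return filter == null || filter.test(record);
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a2", "b1", "c2");
        BypassAwareFilter stage = new BypassAwareFilter(r -> r.endsWith("2"), r -> r.startsWith("c"));
        records.forEach(r -> System.out.println(r + " -> " + stage.accept(r)));
        // a2 -> true, b1 -> false (filter), c2 -> false (bypassed)
    }
}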
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/gridtable/GTForwardingScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTForwardingScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTForwardingScanner.java
new file mode 100644
index 0000000..be840ea
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTForwardingScanner.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * A {@link IGTScanner} which forwards all its method calls to another scanner.
+ *
+ * @see <a href="http://en.wikipedia.org/wiki/Decorator_pattern">decorator pattern</a>.
+ */
+public class GTForwardingScanner implements IGTScanner {
+    protected IGTScanner delegated;
+
+    protected GTForwardingScanner(IGTScanner delegated) {
+        this.delegated = checkNotNull(delegated, "delegated");
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return delegated.getInfo();
+    }
+
+    @Override
+    public void close() throws IOException {
+        delegated.close();
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        return delegated.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index e65d2b8..523814b 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -73,15 +73,19 @@ public class GTScanRequest {
     private boolean allowStorageAggregation;
     private double aggCacheMemThreshold;
     private int storageScanRowNumThreshold;
+    //valid value iff GTCubeStorageQueryBase.enableStorageLimitIfPossible is true
     private int storagePushDownLimit;
+    private StorageLimitLevel storageLimitLevel;
 
     // runtime computed fields
     private transient boolean doingStorageAggregation = false;
 
     GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
-            ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, TupleFilter havingFilterPushDown, // 
+            ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown,
+            TupleFilter havingFilterPushDown, // 
             boolean allowStorageAggregation, double aggCacheMemThreshold, int storageScanRowNumThreshold, //
-            int storagePushDownLimit, String storageBehavior, long startTime, long timeout) {
+            int storagePushDownLimit, StorageLimitLevel storageLimitLevel, String storageBehavior, long startTime,
+            long timeout) {
         this.info = info;
         if (ranges == null) {
             this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info)));
@@ -103,6 +107,7 @@ public class GTScanRequest {
         this.aggCacheMemThreshold = aggCacheMemThreshold;
         this.storageScanRowNumThreshold = storageScanRowNumThreshold;
         this.storagePushDownLimit = storagePushDownLimit;
+        this.storageLimitLevel = storageLimitLevel;
 
         validate(info);
     }
@@ -173,14 +178,16 @@ public class GTScanRequest {
      * 
      * Refer to CoprocessorBehavior for explanation
      */
-    public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn) throws IOException {
+    public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn)
+            throws IOException {
         return decorateScanner(scanner, filterToggledOn, aggrToggledOn, false, true);
     }
 
     /**
      * hasPreFiltered indicate the data has been filtered before scanning
      */
-    public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn, boolean hasPreFiltered, boolean spillEnabled) throws IOException {
+    public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn,
+            boolean hasPreFiltered, boolean spillEnabled) throws IOException {
         IGTScanner result = scanner;
         if (!filterToggledOn) { //Skip reading this section if you're not profiling! 
             lookAndForget(result);
@@ -188,7 +195,9 @@ public class GTScanRequest {
         } else {
 
             if (this.hasFilterPushDown() && !hasPreFiltered) {
-                result = new GTFilterScanner(result, this);
+                result = new GTFilterScanner(result, this, null);
+            } else {
+                result = new GTForwardingScanner(result);//need its check function
             }
 
             if (!aggrToggledOn) {//Skip reading this section if you're not profiling! 
@@ -207,7 +216,6 @@ public class GTScanRequest {
             }
             return result;
         }
-
     }
 
     public BufferedMeasureCodec createMeasureCodec() {
@@ -281,7 +289,7 @@ public class GTScanRequest {
     public TupleFilter getHavingFilterPushDown() {
         return havingFilterPushDown;
     }
-    
+
     public ImmutableBitSet getDimensions() {
         return this.getColumns().andNot(this.getAggrMetrics());
     }
@@ -321,6 +329,10 @@ public class GTScanRequest {
         return this.storagePushDownLimit;
     }
 
+    public StorageLimitLevel getStorageLimitLevel() {
+        return storageLimitLevel;
+    }
+
     public String getStorageBehavior() {
         return storageBehavior;
     }
@@ -335,7 +347,9 @@ public class GTScanRequest {
 
     @Override
     public String toString() {
-        return "GTScanRequest [range=" + ranges + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]";
+        return "GTScanRequest [range=" + ranges + ", columns=" + columns + ", filterPushDown=" + filterPushDown
+                + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs="
+                + Arrays.toString(aggrMetricsFuncs) + "]";
     }
 
     public byte[] toByteArray() {
@@ -350,12 +364,12 @@ public class GTScanRequest {
 
     private static final int SERIAL_0_BASE = 0;
     private static final int SERIAL_1_HAVING_FILTER = 1;
-    
+
     public static final BytesSerializer<GTScanRequest> serializer = new BytesSerializer<GTScanRequest>() {
         @Override
         public void serialize(GTScanRequest value, ByteBuffer out) {
             final int serialLevel = KylinConfig.getInstanceFromEnv().getGTScanRequestSerializationLevel();
-            
+
             GTInfo.serializer.serialize(value.info, out);
 
             BytesUtil.writeVInt(value.ranges.size(), out);
@@ -370,9 +384,10 @@ public class GTScanRequest {
 
             ImmutableBitSet.serializer.serialize(value.columns, out);
             
BytesUtil.writeByteArray(GTUtil.serializeGTFilter(value.filterPushDown, 
value.info), out);
-            
+
             if (serialLevel >= SERIAL_1_HAVING_FILTER) {
-                BytesUtil.writeByteArray(TupleFilterSerializer.serialize(value.havingFilterPushDown, StringCodeSystem.INSTANCE), out);
+                BytesUtil.writeByteArray(
+                        TupleFilterSerializer.serialize(value.havingFilterPushDown, StringCodeSystem.INSTANCE), out);
             }
 
             ImmutableBitSet.serializer.serialize(value.aggrGroupBy, out);
@@ -380,6 +395,7 @@ public class GTScanRequest {
             BytesUtil.writeAsciiStringArray(value.aggrMetricsFuncs, out);
             BytesUtil.writeVInt(value.allowStorageAggregation ? 1 : 0, out);
             out.putDouble(value.aggCacheMemThreshold);
+            BytesUtil.writeUTFString(value.getStorageLimitLevel().name(), out);
             BytesUtil.writeVInt(value.storageScanRowNumThreshold, out);
             BytesUtil.writeVInt(value.storagePushDownLimit, out);
             BytesUtil.writeVLong(value.startTime, out);
@@ -390,7 +406,7 @@ public class GTScanRequest {
         @Override
         public GTScanRequest deserialize(ByteBuffer in) {
             final int serialLevel = KylinConfig.getInstanceFromEnv().getGTScanRequestSerializationLevel();
-            
+
             GTInfo sInfo = GTInfo.serializer.deserialize(in);
 
             List<GTScanRange> sRanges = Lists.newArrayList();
@@ -409,10 +425,11 @@ public class GTScanRequest {
 
             ImmutableBitSet sColumns = ImmutableBitSet.serializer.deserialize(in);
             TupleFilter sGTFilter = GTUtil.deserializeGTFilter(BytesUtil.readByteArray(in), sInfo);
-            
+
             TupleFilter sGTHavingFilter = null;
             if (serialLevel >= SERIAL_1_HAVING_FILTER) {
-                sGTHavingFilter = TupleFilterSerializer.deserialize(BytesUtil.readByteArray(in), StringCodeSystem.INSTANCE);
+                sGTHavingFilter = TupleFilterSerializer.deserialize(BytesUtil.readByteArray(in),
+                        StringCodeSystem.INSTANCE);
             }
 
             ImmutableBitSet sAggGroupBy = ImmutableBitSet.serializer.deserialize(in);
@@ -420,17 +437,21 @@ public class GTScanRequest {
             String[] sAggrMetricFuncs = BytesUtil.readAsciiStringArray(in);
             boolean sAllowPreAggr = (BytesUtil.readVInt(in) == 1);
             double sAggrCacheGB = in.getDouble();
+            StorageLimitLevel storageLimitLevel = StorageLimitLevel.valueOf(BytesUtil.readUTFString(in));
             int storageScanRowNumThreshold = BytesUtil.readVInt(in);
             int storagePushDownLimit = BytesUtil.readVInt(in);
             long startTime = BytesUtil.readVLong(in);
             long timeout = BytesUtil.readVLong(in);
             String storageBehavior = BytesUtil.readUTFString(in);
 
-            return new GTScanRequestBuilder().setInfo(sInfo).setRanges(sRanges).setDimensions(sColumns).//
-            setAggrGroupBy(sAggGroupBy).setAggrMetrics(sAggrMetrics).setAggrMetricsFuncs(sAggrMetricFuncs).//
-            setFilterPushDown(sGTFilter).setHavingFilterPushDown(sGTHavingFilter).setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB).//
-            setStorageScanRowNumThreshold(storageScanRowNumThreshold).setStoragePushDownLimit(storagePushDownLimit).//
-            setStartTime(startTime).setTimeout(timeout).setStorageBehavior(storageBehavior).createGTScanRequest();
+            return new GTScanRequestBuilder().setInfo(sInfo).setRanges(sRanges).setDimensions(sColumns)
+                    .setAggrGroupBy(sAggGroupBy).setAggrMetrics(sAggrMetrics).setAggrMetricsFuncs(sAggrMetricFuncs)
+                    .setFilterPushDown(sGTFilter).setHavingFilterPushDown(sGTHavingFilter)
+                    .setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB)
+                    .setStorageScanRowNumThreshold(storageScanRowNumThreshold)
+                    .setStoragePushDownLimit(storagePushDownLimit).setStorageLimitLevel(storageLimitLevel)
+                    .setStartTime(startTime).setTimeout(timeout).setStorageBehavior(storageBehavior)
+                    .createGTScanRequest();
         }
 
         private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) {
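
One detail worth noting in the serializer change above: storageLimitLevel is written with BytesUtil.writeUTFString and read back with readUTFString at the same position relative to the surrounding fields, because the byte layout is purely positional. A standalone sketch of that order-sensitive contract (own enum and ByteBuffer plumbing, not the Kylin serializer):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Standalone sketch of a positional binary layout: deserialize() must read
// fields in exactly the order serialize() wrote them.
public class PositionalCodec {
    enum Level { NO_LIMIT, LIMIT_ON_RETURN_SIZE, LIMIT_ON_SCAN }

    static ByteBuffer serialize(double memThreshold, Level level, int pushDownLimit) {
        ByteBuffer out = ByteBuffer.allocate(64);
        out.putDouble(memThreshold);                          // field 1
        byte[] name = level.name().getBytes(StandardCharsets.UTF_8);
        out.putInt(name.length);                              // field 2: level, length-prefixed
        out.put(name);
        out.putInt(pushDownLimit);                            // field 3
        out.flip();
        return out;
    }

    static void deserialize(ByteBuffer in) {
        double memThreshold = in.getDouble();                 // field 1
        byte[] name = new byte[in.getInt()];                  // field 2
        in.get(name);
        Level level = Level.valueOf(new String(name, StandardCharsets.UTF_8));
        int pushDownLimit = in.getInt();                      // field 3
        System.out.println(memThreshold + " " + level + " " + pushDownLimit);
    }

    public static void main(String[] args) {
        deserialize(serialize(3.0, Level.LIMIT_ON_SCAN, 500));
    }
}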

http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
index ba1fdbc..28fe40a 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
@@ -38,6 +38,7 @@ public class GTScanRequestBuilder {
     private double aggCacheMemThreshold = 0;
     private int storageScanRowNumThreshold = Integer.MAX_VALUE;// storage should terminate itself when $storageScanRowNumThreshold cuboid rows are scanned, and throw exception.
     private int storagePushDownLimit = Integer.MAX_VALUE;// storage can quit scanning safely when $toragePushDownLimit aggregated rows are produced. 
+    private StorageLimitLevel storageLimitLevel = StorageLimitLevel.NO_LIMIT;
     private long startTime = -1;
     private long timeout = -1;
     private String storageBehavior = null;
@@ -61,7 +62,7 @@ public class GTScanRequestBuilder {
         this.havingFilterPushDown = havingFilterPushDown;
         return this;
     }
-    
+
     public GTScanRequestBuilder setDimensions(ImmutableBitSet dimensions) {
         this.dimensions = dimensions;
         return this;
@@ -102,6 +103,11 @@ public class GTScanRequestBuilder {
         return this;
     }
 
+    public GTScanRequestBuilder setStorageLimitLevel(StorageLimitLevel storageLimitLevel) {
+        this.storageLimitLevel = storageLimitLevel;
+        return this;
+    }
+
     public GTScanRequestBuilder setStartTime(long startTime) {
         this.startTime = startTime;
         return this;
@@ -131,12 +137,16 @@ public class GTScanRequestBuilder {
         }
 
         if (storageBehavior == null) {
-            storageBehavior = BackdoorToggles.getCoprocessorBehavior() == null ? StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior();
+            storageBehavior = BackdoorToggles.getCoprocessorBehavior() == null
+                    ? StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString()
+                    : BackdoorToggles.getCoprocessorBehavior();
         }
 
         this.startTime = startTime == -1 ? System.currentTimeMillis() : startTime;
         this.timeout = timeout == -1 ? 300000 : timeout;
 
-        return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, havingFilterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit, storageBehavior, startTime, timeout);
+        return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown,
+                havingFilterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold,
+                storagePushDownLimit, storageLimitLevel, storageBehavior, startTime, timeout);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
index 4eb5791..61163d3 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
@@ -21,7 +21,6 @@ package org.apache.kylin.gridtable;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
-import org.apache.kylin.GTForwardingScanner;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregator;

http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/gridtable/IGTBypassChecker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTBypassChecker.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTBypassChecker.java
new file mode 100644
index 0000000..55f36d5
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTBypassChecker.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+public interface IGTBypassChecker {
+    boolean shouldBypass(GTRecord record);
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-cube/src/main/java/org/apache/kylin/gridtable/StorageLimitLevel.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/StorageLimitLevel.java b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageLimitLevel.java
new file mode 100644
index 0000000..5313a1f
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageLimitLevel.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+public enum StorageLimitLevel {
+    NO_LIMIT,
+    //even if enableStorageLimitIfPossible() is false,
+    //(meaning we have to scan through all the cuboid rows)
+    //we can still take advantage of the fact that we don't need all of the agg keys
+    LIMIT_ON_RETURN_SIZE,
+    //iff enableStorageLimitIfPossible() is true
+    LIMIT_ON_SCAN
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 78cf97c..a2e2869 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.StorageLimitLevel;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
 import org.slf4j.Logger;
@@ -41,6 +42,7 @@ public class StorageContext {
     private boolean overlookOuterLimit = false;
     private int offset = 0;
     private int finalPushDownLimit = Integer.MAX_VALUE;
+    private StorageLimitLevel storageLimitLevel = StorageLimitLevel.NO_LIMIT;
     private boolean hasSort = false;
     private boolean acceptPartialResult = false;
     private long deadline;
@@ -68,7 +70,8 @@ public class StorageContext {
     //the limit here correspond to the limit concept in SQL
     //also take into consideration Statement.setMaxRows in JDBC
     private int getLimit() {
-        if (overlookOuterLimit || BackdoorToggles.getStatementMaxRows() == null || BackdoorToggles.getStatementMaxRows() == 0) {
+        if (overlookOuterLimit || BackdoorToggles.getStatementMaxRows() == null
+                || BackdoorToggles.getStatementMaxRows() == 0) {
             return limit;
         } else {
             return Math.min(limit, BackdoorToggles.getStatementMaxRows());
@@ -77,7 +80,8 @@ public class StorageContext {
 
     public void setLimit(int l) {
         if (limit != Integer.MAX_VALUE) {
-            logger.warn("Setting limit to {} but in current olap context, the limit is already {}, won't apply", l, limit);
+            logger.warn("Setting limit to {} but in current olap context, the limit is already {}, won't apply", l,
+                    limit);
         } else {
             limit = l;
         }
@@ -107,7 +111,7 @@ public class StorageContext {
         return isValidPushDownLimit(finalPushDownLimit);
     }
 
-    public static boolean isValidPushDownLimit(int finalPushDownLimit) {
+    public static boolean isValidPushDownLimit(long finalPushDownLimit) {
         return finalPushDownLimit < Integer.MAX_VALUE && finalPushDownLimit > 0;
     }
 
@@ -115,20 +119,31 @@ public class StorageContext {
         return finalPushDownLimit;
     }
 
-    public void setFinalPushDownLimit(IRealization realization) {
+    public StorageLimitLevel getStorageLimitLevel() {
+        return storageLimitLevel;
+    }
+
+    public void applyLimitPushDown(IRealization realization, StorageLimitLevel storageLimitLevel) {
 
-        if (!isValidPushDownLimit(this.getLimit())) {
+        if (storageLimitLevel == StorageLimitLevel.NO_LIMIT) {
             return;
         }
 
-        int tempPushDownLimit = this.getOffset() + this.getLimit();
-
         if (!realization.supportsLimitPushDown()) {
             logger.warn("Not enabling limit push down because cube storage type not supported");
-        } else {
-            this.finalPushDownLimit = tempPushDownLimit;
-            logger.info("Enable limit (storage push down limit) :" + tempPushDownLimit);
+            return;
         }
+
+        long temp = this.getOffset() + this.getLimit();
+
+        if (!isValidPushDownLimit(temp)) {
+            logger.warn("Not enabling limit push down because current limit is invalid: " + this.getLimit());
+            return;
+        }
+
+        this.finalPushDownLimit = (int) temp;
+        this.storageLimitLevel = storageLimitLevel;
+        logger.info("Enabling limit push down: {} at level: {}", temp, storageLimitLevel);
     }
 
     public boolean mergeSortPartitionResults() {
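
isValidPushDownLimit is widened to take a long because applyLimitPushDown now computes offset + limit and validates it before casting back to int; anything non-positive or at Integer.MAX_VALUE (the "unlimited" sentinel) is treated as having nothing useful to push down. A tiny standalone illustration of that check (the numbers are made up):

// Standalone illustration of the widened push-down limit check.
public class PushDownLimitCheck {
    // mirrors isValidPushDownLimit(long): only a positive value strictly
    // below Integer.MAX_VALUE is a usable push-down limit
    static boolean isValidPushDownLimit(long finalPushDownLimit) {
        return finalPushDownLimit < Integer.MAX_VALUE && finalPushDownLimit > 0;
    }

    public static void main(String[] args) {
        long offset = 100;
        long noLimit = Integer.MAX_VALUE;                           // "unlimited" sentinel
        System.out.println(isValidPushDownLimit(offset + noLimit)); // false: nothing to push down
        System.out.println(isValidPushDownLimit(offset + 500));     // true: 600 can be pushed down
    }
}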

http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index cecea85..14af5ac 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -148,7 +148,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
             scanRequest = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions).//
                     setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs).setFilterPushDown(gtFilter).//
                     setAllowStorageAggregation(context.isNeedStorageAggregation()).setAggCacheMemThreshold(cubeSegment.getConfig().getQueryCoprocessorMemGB()).//
-                    setStoragePushDownLimit(context.getFinalPushDownLimit()).setHavingFilterPushDown(havingFilter).createGTScanRequest();
+                    setStoragePushDownLimit(context.getFinalPushDownLimit()).setStorageLimitLevel(context.getStorageLimitLevel()).setHavingFilterPushDown(havingFilter).createGTScanRequest();
         } else {
             scanRequest = null;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index a3af511..a0c6ea2 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -35,6 +35,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.gridtable.StorageLimitLevel;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.filter.CaseTupleFilter;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
@@ -139,7 +140,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD));
 
         // exactAggregation mean: needn't aggregation at storage and query engine both.
-        boolean exactAggregation = isExactAggregation(context, cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation, sqlDigest.aggregations);
+        boolean exactAggregation = isExactAggregation(context, cuboid, groups, otherDimsD, singleValuesD,
+                derivedPostAggregation, sqlDigest.aggregations);
         context.setExactAggregation(exactAggregation);
 
         // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
@@ -161,9 +163,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         TupleFilter havingFilter = checkHavingCanPushDown(sqlDigest.havingFilter, groupsD, sqlDigest.aggregations,
                 metrics);
 
-        logger.info("Cuboid identified: cube={}, cuboidId={}, groupsD={}, filterD={}, limitPushdown={}, storageAggr={}",
+        logger.info(
+                "Cuboid identified: cube={}, cuboidId={}, groupsD={}, filterD={}, limitPushdown={}, limitLevel={}, storageAggr={}",
                 cubeInstance.getName(), cuboid.getId(), groupsD, filterColumnD, context.getFinalPushDownLimit(),
-                context.isNeedStorageAggregation());
+                context.getStorageLimitLevel(), context.isNeedStorageAggregation());
 
         return new GTCubeStorageQueryRequest(cuboid, dimensionsD, groupsD, filterColumnD, metrics, filterD,
                 havingFilter, context);
@@ -365,50 +368,50 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
     private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups,
             Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter,
             Set<TblColRef> loosenedColumnD, Collection<FunctionDesc> functionDescs, StorageContext context) {
-        boolean possible = true;
 
-        if (!TupleFilter.isEvaluableRecursively(filter)) {
-            possible = false;
-            logger.debug("Storage limit push down is impossible because the filter isn't evaluable");
-        }
+        StorageLimitLevel storageLimitLevel = StorageLimitLevel.LIMIT_ON_SCAN;
 
-        if (!loosenedColumnD.isEmpty()) { // KYLIN-2173
-            possible = false;
-            logger.debug("Storage limit push down is impossible because filter is loosened: " + loosenedColumnD);
-        }
-
-        if (context.hasSort()) {
-            possible = false;
-            logger.debug("Storage limit push down is impossible because the query has order by");
+        //if groupsD is clustered at "head" of the rowkey, then limit push down is possible
+        int size = groupsD.size();
+        if (!groupsD.containsAll(cuboid.getColumns().subList(0, size))) {
+            storageLimitLevel = StorageLimitLevel.LIMIT_ON_RETURN_SIZE;
+            logger.debug(
+                    "storageLimitLevel set to LIMIT_ON_RETURN_SIZE because groupD is not clustered at head, groupsD: "
+                            + groupsD //
+                            + " with cuboid columns: " + cuboid.getColumns());
         }
 
         // derived aggregation is bad, unless expanded columns are already in group by
         if (!groups.containsAll(derivedPostAggregation)) {
-            possible = false;
-            logger.debug("Storage limit push down is impossible because derived column require post aggregation: "
+            storageLimitLevel = StorageLimitLevel.NO_LIMIT;
+            logger.debug("storageLimitLevel set to NO_LIMIT because derived column require post aggregation: "
                     + derivedPostAggregation);
         }
 
-        //if groupsD is clustered at "head" of the rowkey, then limit push down is possible
-        int size = groupsD.size();
-        if (!groupsD.containsAll(cuboid.getColumns().subList(0, size))) {
-            possible = false;
-            logger.debug(
-                    "Storage limit push down is impossible because groupD is not clustered at head, groupsD: " + groupsD //
-                            + " with cuboid columns: " + cuboid.getColumns());
+        if (!TupleFilter.isEvaluableRecursively(filter)) {
+            storageLimitLevel = StorageLimitLevel.NO_LIMIT;
+            logger.debug("storageLimitLevel set to NO_LIMIT because the filter isn't evaluable");
+        }
+
+        if (!loosenedColumnD.isEmpty()) { // KYLIN-2173
+            storageLimitLevel = StorageLimitLevel.NO_LIMIT;
+            logger.debug("storageLimitLevel set to NO_LIMIT because filter is loosened: " + loosenedColumnD);
+        }
+
+        if (context.hasSort()) {
+            storageLimitLevel = StorageLimitLevel.NO_LIMIT;
+            logger.debug("storageLimitLevel set to NO_LIMIT because the query has order by");
         }
 
         //if exists measures like max(cal_dt), then it's not a perfect cuboid match, cannot apply limit
         for (FunctionDesc functionDesc : functionDescs) {
             if (functionDesc.isDimensionAsMetric()) {
-                possible = false;
-                logger.debug("Storage limit push down is impossible because {} isDimensionAsMetric ", functionDesc);
+                storageLimitLevel = StorageLimitLevel.NO_LIMIT;
+                logger.debug("storageLimitLevel set to NO_LIMIT because {} isDimensionAsMetric ", functionDesc);
             }
         }
 
-        if (possible) {
-            context.setFinalPushDownLimit(cubeInstance);
-        }
+        context.applyLimitPushDown(cubeInstance, storageLimitLevel);
     }
 
     private void enableStreamAggregateIfBeneficial(Cuboid cuboid, Set<TblColRef> groupsD, StorageContext context) {
@@ -493,7 +496,9 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         return havingFilter;
     }
 
-    private boolean isExactAggregation(StorageContext context, Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation, Collection<FunctionDesc> functionDescs) {
+    private boolean isExactAggregation(StorageContext context, Cuboid cuboid, Collection<TblColRef> groups,
+            Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation,
+            Collection<FunctionDesc> functionDescs) {
         if (context.isNeedStorageAggregation()) {
             logger.info("exactAggregation is false because need storage aggregation");
             return false;
@@ -506,7 +511,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
 
         // derived aggregation is bad, unless expanded columns are already in group by
         if (groups.containsAll(derivedPostAggregation) == false) {
-            logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
+            logger.info("exactAggregation is false because derived column require post aggregation: "
+                    + derivedPostAggregation);
             return false;
         }
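
Net effect of the rewrite: instead of a single possible flag, enableStorageLimitIfPossible starts from the most aggressive level and only downgrades it; LIMIT_ON_SCAN when the group-by columns sit at the head of the cuboid rowkey, LIMIT_ON_RETURN_SIZE when they do not, and NO_LIMIT whenever any condition makes push-down unsafe (derived post-aggregation, a non-evaluable or loosened filter, order by, dimension-as-metric). applyLimitPushDown is then always called and simply ignores NO_LIMIT. A condensed standalone sketch of that ladder, with plain booleans standing in for the real checks:

// Condensed sketch of the decision ladder in enableStorageLimitIfPossible;
// each boolean stands in for one of the real checks.
public class LimitLevelLadder {
    enum StorageLimitLevel { NO_LIMIT, LIMIT_ON_RETURN_SIZE, LIMIT_ON_SCAN }

    static StorageLimitLevel decide(boolean groupsAtRowkeyHead, boolean derivedNeedsPostAggr,
            boolean filterEvaluable, boolean filterLoosened, boolean hasSort, boolean dimensionAsMetric) {
        StorageLimitLevel level = StorageLimitLevel.LIMIT_ON_SCAN;  // most aggressive by default

        if (!groupsAtRowkeyHead) {
            level = StorageLimitLevel.LIMIT_ON_RETURN_SIZE;         // can still cap the returned groups
        }
        if (derivedNeedsPostAggr || !filterEvaluable || filterLoosened || hasSort || dimensionAsMetric) {
            level = StorageLimitLevel.NO_LIMIT;                     // push-down unsafe
        }
        return level;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, false, true, false, false, false));  // LIMIT_ON_SCAN
        System.out.println(decide(false, false, true, false, false, false)); // LIMIT_ON_RETURN_SIZE
        System.out.println(decide(false, false, true, false, true, false));  // NO_LIMIT (order by)
    }
}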
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
index 21e61e3..14e0d86 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
@@ -18,28 +18,28 @@
 
 package org.apache.kylin.storage.gtrecord;
 
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
-import com.google.common.collect.UnmodifiableIterator;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-
 import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
 
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.UnmodifiableIterator;
+
 /**
  * Merge-sort {@code GTRecord}s in all partitions, assume each partition contains sorted elements.
  */
 public class SortMergedPartitionResultIterator extends UnmodifiableIterator<GTRecord> {
 
-    final GTRecord record ; // reuse to avoid object creation
+    final GTRecord record; // reuse to avoid object creation
     PriorityQueue<PeekingIterator<GTRecord>> heap;
 
-    SortMergedPartitionResultIterator(
-            List<PartitionResultIterator> partitionResults,
-            GTInfo info, final Comparator<GTRecord> comparator) {
+    SortMergedPartitionResultIterator(List<PartitionResultIterator> partitionResults, GTInfo info,
+            final Comparator<GTRecord> comparator) {
 
         this.record = new GTRecord(info);
         Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() {
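The iterator above merges per-partition record streams that are each already sorted. Independent of GTRecord, the idea is a priority queue of peeking iterators ordered by the element each would return next. A self-contained sketch of that k-way merge, written against plain Iterator<T> instead of PartitionResultIterator, with Guava providing PeekingIterator:

    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.PriorityQueue;

    import com.google.common.collect.Iterators;
    import com.google.common.collect.PeekingIterator;

    // Sketch of the merge idea only; the real class also reuses one GTRecord to avoid allocation.
    class KWayMerge<T> implements Iterator<T> {
        private final PriorityQueue<PeekingIterator<T>> heap;

        KWayMerge(List<Iterator<T>> sortedInputs, final Comparator<T> cmp) {
            // order iterators by the element they would return next
            Comparator<PeekingIterator<T>> byHead = (a, b) -> cmp.compare(a.peek(), b.peek());
            heap = new PriorityQueue<>(Math.max(1, sortedInputs.size()), byHead);
            for (Iterator<T> it : sortedInputs) {
                if (it.hasNext())
                    heap.add(Iterators.peekingIterator(it));
            }
        }

        @Override
        public boolean hasNext() {
            return !heap.isEmpty();
        }

        @Override
        public T next() {
            if (heap.isEmpty())
                throw new NoSuchElementException();
            PeekingIterator<T> top = heap.poll(); // smallest head among all partitions
            T result = top.next();
            if (top.hasNext())
                heap.add(top);                    // re-insert with its new head
            return result;
        }
    }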

http://git-wip-us.apache.org/repos/asf/kylin/blob/26c03fe2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 3791e63..d94b547 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -52,6 +52,7 @@ import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.IGTStore;
+import org.apache.kylin.gridtable.StorageLimitLevel;
 import org.apache.kylin.gridtable.StorageSideBehavior;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
@@ -147,8 +148,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
         private long rowCount;
         private long rowBytes;
 
-        ResourceTrackingCellListIterator(Iterator<List<Cell>> delegate,
-                                         long rowCountLimit, long bytesLimit, long timeout) {
+        ResourceTrackingCellListIterator(Iterator<List<Cell>> delegate, long rowCountLimit, long bytesLimit,
+                long timeout) {
             this.delegate = delegate;
             this.rowCountLimit = rowCountLimit;
             this.bytesLimit = bytesLimit;
@@ -162,7 +163,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 throw new ResourceLimitExceededException("scanned row count exceeds threshold " + rowCountLimit);
             }
             if (rowBytes > bytesLimit) {
-                throw new ResourceLimitExceededException("scanned bytes " + rowBytes + " exceeds threshold " + bytesLimit);
+                throw new ResourceLimitExceededException(
+                        "scanned bytes " + rowBytes + " exceeds threshold " + bytesLimit);
             }
             if ((rowCount % GTScanRequest.terminateCheckInterval == 1) && System.currentTimeMillis() > deadline) {
                 throw new KylinTimeoutException("coprocessor timeout after " + timeout + " ms");
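ResourceTrackingCellListIterator (constructor and checks above) wraps the raw cell iterator and aborts the scan once scanned rows, scanned bytes, or the deadline is exceeded. A simplified, self-contained sketch of the same wrapper pattern follows; the Kylin-specific ResourceLimitExceededException and KylinTimeoutException are replaced by IllegalStateException, and the per-row byte estimate is a caller-supplied placeholder.

    import java.util.Iterator;
    import java.util.function.ToLongFunction;

    // Sketch of the guard pattern only, not the coprocessor class itself.
    class LimitedIterator<T> implements Iterator<T> {
        private final Iterator<T> delegate;
        private final ToLongFunction<T> sizeOf; // byte estimate per row, e.g. summing cell lengths
        private final long rowCountLimit, bytesLimit, deadline;
        private long rowCount, rowBytes;

        LimitedIterator(Iterator<T> delegate, ToLongFunction<T> sizeOf,
                long rowCountLimit, long bytesLimit, long timeoutMillis) {
            this.delegate = delegate;
            this.sizeOf = sizeOf;
            this.rowCountLimit = rowCountLimit;
            this.bytesLimit = bytesLimit;
            this.deadline = System.currentTimeMillis() + timeoutMillis;
        }

        @Override
        public boolean hasNext() {
            if (rowCount > rowCountLimit)
                throw new IllegalStateException("scanned row count exceeds threshold " + rowCountLimit);
            if (rowBytes > bytesLimit)
                throw new IllegalStateException("scanned bytes " + rowBytes + " exceeds threshold " + bytesLimit);
            if (System.currentTimeMillis() > deadline)
                throw new IllegalStateException("scan timed out");
            return delegate.hasNext();
        }

        @Override
        public T next() {
            T row = delegate.next();
            rowCount++;
            rowBytes += sizeOf.applyAsLong(row);
            return row;
        }
    }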
@@ -193,7 +195,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
         if (shardLength == 0) {
             return;
         }
-        byte[] regionStartKey = ArrayUtils.isEmpty(region.getRegionInfo().getStartKey()) ? new byte[shardLength] : region.getRegionInfo().getStartKey();
+        byte[] regionStartKey = ArrayUtils.isEmpty(region.getRegionInfo().getStartKey()) ? new byte[shardLength]
+                : region.getRegionInfo().getStartKey();
         Bytes.putBytes(rawScan.startKey, 0, regionStartKey, 0, shardLength);
         Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength);
     }
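updateRawScanByCurrentRegion (above) copies the current region's start key into the first shardLength bytes of the raw scan's start and end keys, so each region only visits its own shard; an empty region start key (the first region) falls back to zero bytes. A plain-array sketch of that rewrite, with System.arraycopy standing in for HBase's Bytes.putBytes:

    // Sketch of the shard-prefix rewrite; arguments mirror the hunk above, the helper name is illustrative.
    static void fillShardPrefix(byte[] scanStartKey, byte[] scanEndKey, byte[] regionStartKey, int shardLength) {
        if (shardLength == 0)
            return; // rowkeys are not sharded
        byte[] prefix = (regionStartKey == null || regionStartKey.length == 0)
                ? new byte[shardLength]   // first region: empty start key, use zero bytes
                : regionStartKey;
        System.arraycopy(prefix, 0, scanStartKey, 0, shardLength);
        System.arraycopy(prefix, 0, scanEndKey, 0, shardLength);
    }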
@@ -218,7 +221,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
     @SuppressWarnings("checkstyle:methodlength")
     @Override
-    public void visitCube(final RpcController controller, final CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
+    public void visitCube(final RpcController controller, final CubeVisitProtos.CubeVisitRequest request,
+            RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
         List<RegionScanner> regionScanners = Lists.newArrayList();
         HRegion region = null;
 
@@ -232,7 +236,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
         try (SetThreadName ignored = new SetThreadName("Query %s", queryId)) {
             final long serviceStartTime = System.currentTimeMillis();
 
-            region = (HRegion)env.getRegion();
+            region = (HRegion) env.getRegion();
             region.startRegionOperation();
 
             // if user change kylin.properties on kylin server, need to manually redeploy coprocessor jar to update KylinConfig of Env.
@@ -241,13 +245,15 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
             debugGitTag = region.getTableDesc().getValue(IRealizationConstants.HTableGitTag);
 
-            final GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
+            final GTScanRequest scanReq = GTScanRequest.serializer
+                    .deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
             List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
             for (IntList intList : request.getHbaseColumnsToGTList()) {
                 hbaseColumnsToGT.add(intList.getIntsList());
             }
             StorageSideBehavior behavior = StorageSideBehavior.valueOf(scanReq.getStorageBehavior());
-            final List<RawScan> hbaseRawScans = deserializeRawScans(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
+            final List<RawScan> hbaseRawScans = deserializeRawScans(
+                    ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
 
             appendProfileInfo(sb, "start latency: " + (serviceStartTime - scanReq.getStartTime()), serviceStartTime);
 
@@ -256,7 +262,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             for (RawScan hbaseRawScan : hbaseRawScans) {
                 if (request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN > 0) {
                     //if has shard, fill region shard to raw scan start/end
-                    updateRawScanByCurrentRegion(hbaseRawScan, region, request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN);
+                    updateRawScanByCurrentRegion(hbaseRawScan, region,
+                            request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN);
                 }
 
                 Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
@@ -287,16 +294,18 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
             final long storagePushDownLimit = scanReq.getStoragePushDownLimit();
 
-            ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator(
-                    allCellLists,
+            ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator(allCellLists,
                     scanReq.getStorageScanRowNumThreshold(), // for old client (scan threshold)
                     !request.hasMaxScanBytes() ? Long.MAX_VALUE : request.getMaxScanBytes(), // for new client
                     scanReq.getTimeout());
 
-            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn(), request.getIsExactAggregate());
+            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns,
+                    hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn(),
+                    request.getIsExactAggregate());
 
             IGTScanner rawScanner = store.scan(scanReq);
-            IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(), behavior.aggrToggledOn(), false, request.getSpillEnabled());
+            IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(),
+                    behavior.aggrToggledOn(), false, request.getSpillEnabled());
 
             ByteBuffer buffer = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
 
@@ -318,7 +327,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                     finalRowCount++;
 
                     //if it's doing storage aggr, then should rely on GTAggregateScanner's limit check
-                    if (!scanReq.isDoingStorageAggregation() && finalRowCount >= storagePushDownLimit) {
+                    if (!scanReq.isDoingStorageAggregation()
+                            && (scanReq.getStorageLimitLevel() != StorageLimitLevel.NO_LIMIT
+                                    && finalRowCount >= storagePushDownLimit)) {
                         //read one more record than limit
                         logger.info("The finalScanner aborted because storagePushDownLimit is satisfied");
                         break;
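With this change the early break in the row loop is additionally gated on the scan request's storage limit level: when it is NO_LIMIT the coprocessor keeps emitting rows and leaves any limiting to the layers above, and when aggregation happens in storage the limit is enforced by GTAggregateScanner instead. A condition-only sketch of the new check; the helper name is illustrative, while the accessors are the ones used in the hunk above.

    // Returns true when the coprocessor may stop after reading one more record than the limit.
    static boolean shouldStopEarly(GTScanRequest scanReq, long finalRowCount, long storagePushDownLimit) {
        return !scanReq.isDoingStorageAggregation()                              // aggregation path has its own check
                && scanReq.getStorageLimitLevel() != StorageLimitLevel.NO_LIMIT  // NO_LIMIT: never stop early
                && finalRowCount >= storagePushDownLimit;
    }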
@@ -327,22 +338,20 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             } catch (KylinTimeoutException e) {
                 logger.info("Abort scan: {}", e.getMessage());
                 errorInfo = CubeVisitProtos.CubeVisitResponse.ErrorInfo.newBuilder()
-                        .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.TIMEOUT)
-                        .setMessage(e.getMessage())
+                        .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.TIMEOUT).setMessage(e.getMessage())
                         .build();
             } catch (ResourceLimitExceededException e) {
                 logger.info("Abort scan: {}", e.getMessage());
                 errorInfo = CubeVisitProtos.CubeVisitResponse.ErrorInfo.newBuilder()
                         .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.RESOURCE_LIMIT_EXCEEDED)
-                        .setMessage(e.getMessage())
-                        .build();
+                        .setMessage(e.getMessage()).build();
             } finally {
                 finalScanner.close();
             }
 
             appendProfileInfo(sb, "agg done", serviceStartTime);
-            logger.info("Total scanned {} rows and {} bytes",
-                    cellListIterator.getTotalScannedRowCount(), cellListIterator.getTotalScannedRowBytes());
+            logger.info("Total scanned {} rows and {} bytes", cellListIterator.getTotalScannedRowCount(),
+                    cellListIterator.getTotalScannedRowBytes());
 
             //outputStream.close() is not necessary
             byte[] compressedAllRows;
@@ -360,7 +369,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             appendProfileInfo(sb, "compress done", serviceStartTime);
             logger.info("Size of final result = {} ({} before compressing)", compressedAllRows.length, allRows.length);
 
-            OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
+            OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory
+                    .getOperatingSystemMXBean();
             double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad();
             double freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize();
             double freeSwapSpaceSize = operatingSystemMXBean.getFreeSwapSpaceSize();
@@ -374,18 +384,15 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             }
             done.run(responseBuilder.//
                     setCompressedRows(HBaseZeroCopyByteString.wrap(compressedAllRows)).//too many array copies
-                    setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder().
-                            setAggregatedRowCount(cellListIterator.getTotalScannedRowCount() - finalRowCount).
-                            setScannedRowCount(cellListIterator.getTotalScannedRowCount()).
-                            setScannedBytes(cellListIterator.getTotalScannedRowBytes()).
-                            setServiceStartTime(serviceStartTime).
-                            setServiceEndTime(System.currentTimeMillis()).
-                            setSystemCpuLoad(systemCpuLoad).
-                            setFreePhysicalMemorySize(freePhysicalMemorySize).
-                            setFreeSwapSpaceSize(freeSwapSpaceSize).
-                            setHostname(InetAddress.getLocalHost().getHostName()).
-                            setEtcMsg(sb.toString()).
-                            setNormalComplete(errorInfo == null ? 1 : 0).build())
+                    setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder()
+                            .setAggregatedRowCount(cellListIterator.getTotalScannedRowCount() - finalRowCount)
+                            .setScannedRowCount(cellListIterator.getTotalScannedRowCount())
+                            .setScannedBytes(cellListIterator.getTotalScannedRowBytes())
+                            .setServiceStartTime(serviceStartTime).setServiceEndTime(System.currentTimeMillis())
+                            .setSystemCpuLoad(systemCpuLoad).setFreePhysicalMemorySize(freePhysicalMemorySize)
+                            .setFreeSwapSpaceSize(freeSwapSpaceSize)
+                            .setHostname(InetAddress.getLocalHost().getHostName()).setEtcMsg(sb.toString())
+                            .setNormalComplete(errorInfo == null ? 1 : 0).build())
                     .build());
 
         } catch (IOException ioe) {
