KYLIN-2603, push down having clause when possible

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e1bc72e3
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e1bc72e3
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e1bc72e3

Branch: refs/heads/master
Commit: e1bc72e36d83a42d364e6d9b0ee893fbc018d498
Parents: 83fb144
Author: Li Yang <liy...@apache.org>
Authored: Wed May 10 21:01:53 2017 +0800
Committer: hongbin ma <m...@kyligence.io>
Committed: Tue May 23 20:18:17 2017 +0800

----------------------------------------------------------------------
 .../cube/gridtable/ScanRangePlannerBase.java    |   3 +-
 .../org/apache/kylin/cube/model/RowKeyDesc.java |  16 +-
 .../kylin/gridtable/GTAggregateScanner.java     | 223 ++++++++++++++-----
 .../apache/kylin/gridtable/GTScanRequest.java   |  19 +-
 .../kylin/gridtable/GTScanRequestBuilder.java   |   8 +-
 .../java/org/apache/kylin/gridtable/GTUtil.java |  33 ++-
 .../apache/kylin/metadata/model/ColumnDesc.java |   2 +
 .../apache/kylin/metadata/model/TblColRef.java  |   2 +
 .../kylin/metadata/realization/SQLDigest.java   |  36 ++-
 .../storage/gtrecord/CubeScanRangePlanner.java  |  12 +-
 .../storage/gtrecord/CubeSegmentScanner.java    |  10 +-
 .../gtrecord/GTCubeStorageQueryBase.java        |  51 ++++-
 .../gtrecord/GTCubeStorageQueryRequest.java     |  14 +-
 .../kylin/storage/gtrecord/ScannerWorker.java   |   4 +
 .../storage/gtrecord/DictGridTableTest.java     |  14 ++
 .../kylin/storage/hbase/ITStorageTest.java      |   6 +-
 .../kylin/query/relnode/OLAPAggregateRel.java   |   6 +-
 .../apache/kylin/query/relnode/OLAPContext.java |   9 +-
 .../kylin/query/relnode/OLAPFilterRel.java      |   9 +-
 19 files changed, 370 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
index d938f2b..ed0a77a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
@@ -44,14 +44,15 @@ import com.google.common.collect.Sets;
 public abstract class ScanRangePlannerBase {
 
     //GT 
-    protected TupleFilter gtFilter;
     protected GTInfo gtInfo;
+    protected TupleFilter gtFilter;
     protected Pair<ByteArray, ByteArray> gtStartAndEnd;
     protected TblColRef gtPartitionCol;
     protected ImmutableBitSet gtDimensions;
     protected ImmutableBitSet gtAggrGroups;
     protected ImmutableBitSet gtAggrMetrics;
     protected String[] gtAggrFuncs;
+    protected TupleFilter havingFilter;
     protected boolean isPartitionColUsingDatetimeEncoding = true;
 
     protected RecordComparator rangeStartComparator;

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java
index 00557c5..124f126 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java
@@ -18,21 +18,23 @@
 
 package org.apache.kylin.cube.model;
 
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Objects;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.kylin.metadata.model.TblColRef;
-
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Objects;
+
 /**
  */
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class RowKeyDesc implements java.io.Serializable {
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 0dd6fa9..45a9148 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -30,9 +30,9 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.PriorityQueue;
 import java.util.SortedMap;
-import java.util.Map.Entry;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
@@ -40,11 +40,15 @@ import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.metadata.filter.IFilterCodeSystem;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +71,7 @@ public class GTAggregateScanner implements IGTScanner {
     final long spillThreshold; // 0 means no memory control && no spill
     final int storagePushDownLimit;//default to be Int.MAX
     final boolean spillEnabled;
+    final TupleFilter havingFilter;
 
     private int aggregatedRowCount = 0;
     private MemoryWaterLevel memTracker;
@@ -92,6 +97,7 @@ public class GTAggregateScanner implements IGTScanner {
         this.aggrMask = new boolean[metricsAggrFuncs.length];
         this.storagePushDownLimit = req.getStoragePushDownLimit();
         this.spillEnabled = spillEnabled;
+        this.havingFilter = req.getHavingFilterPushDown();
 
         Arrays.fill(aggrMask, true);
     }
@@ -327,62 +333,143 @@ public class GTAggregateScanner implements IGTScanner {
         }
 
         public Iterator<GTRecord> iterator() {
+            Iterator<Entry<byte[], MeasureAggregator[]>> it = null;
+
             if (dumps.isEmpty()) {
                 // the all-in-mem case
+                it = aggBufMap.entrySet().iterator();
+            } else {
+                // the spill case
+                if (!aggBufMap.isEmpty()) {
+                    spillBuffMap(getEstimateSizeOfAggrCache()); // TODO allow merge in-mem map with spilled dumps
+                }
+                DumpMerger merger = new DumpMerger(dumps);
+                it = merger.iterator();
+            }
 
-                return new Iterator<GTRecord>() {
+            final Iterator<Entry<byte[], MeasureAggregator[]>> input = it;
 
-                    final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
-                    final ReturningRecord returningRecord = new ReturningRecord();
+            return new Iterator<GTRecord>() {
 
-                    @Override
-                    public boolean hasNext() {
-                        return it.hasNext();
-                    }
+                final ReturningRecord returningRecord = new ReturningRecord();
+                Entry<byte[], MeasureAggregator[]> returningEntry = null;
+                final HavingFilterChecker havingFilterChecker = (havingFilter == null) ? null : new HavingFilterChecker();
 
-                    @Override
-                    public GTRecord next() {
-                        Entry<byte[], MeasureAggregator[]> entry = it.next();
-                        returningRecord.load(entry.getKey(), entry.getValue());
-                        return returningRecord.record;
+                @Override
+                public boolean hasNext() {
+                    while (returningEntry == null && input.hasNext()) {
+                        returningEntry = input.next();
+                        if (havingFilterChecker != null)
+                            returningEntry = havingFilterChecker.check(returningEntry);
                     }
+                    return returningEntry != null;
+                }
 
-                    @Override
-                    public void remove() {
-                        throw new UnsupportedOperationException();
-                    }
-                };
-            } else {
-                // the spill case
-                if (!aggBufMap.isEmpty()) {
-                    this.spillBuffMap(getEstimateSizeOfAggrCache()); // TODO allow merge in-mem map with spilled dumps
+                @Override
+                public GTRecord next() {
+                    returningRecord.load(returningEntry.getKey(), returningEntry.getValue());
+                    returningEntry = null;
+                    return returningRecord.record;
                 }
 
-                return new Iterator<GTRecord>() {
-                    final DumpMerger merger = new DumpMerger(dumps);
-                    final Iterator<Pair<byte[], MeasureAggregator[]>> it = merger.iterator();
-                    final ReturningRecord returningRecord = new ReturningRecord();
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
 
-                    @Override
-                    public boolean hasNext() {
-                        return it.hasNext();
-                    }
+        class HavingFilterChecker {
 
-                    @Override
-                    public GTRecord next() {
-                        Pair<byte[], MeasureAggregator[]> entry = it.next();
-                        returningRecord.load(entry.getKey(), entry.getValue());
-                        return returningRecord.record;
-                    }
+            final HavingFilterTuple tuple = new HavingFilterTuple();
+            final IFilterCodeSystem cs = new HavingFilterCodeSys();
 
-                    @Override
-                    public void remove() {
-                        throw new UnsupportedOperationException();
-                    }
-                };
+            HavingFilterChecker() {
+                logger.info("Evaluating 'having' filter -- " + havingFilter);
+            }
+
+            public Entry<byte[], MeasureAggregator[]> check(Entry<byte[], MeasureAggregator[]> returningEntry) {
+                tuple.aggrValues = returningEntry.getValue();
+                boolean pass = havingFilter.evaluate(tuple, cs);
+                return pass ? returningEntry : null;
+            }
+        }
+
+        private class HavingFilterCodeSys implements IFilterCodeSystem {
+
+            Object o2Cache;
+            double n2Cache;
+
+            @Override
+            public int compare(Object o1, Object o2) {
+                if (o1 == null && o2 == null)
+                    return 0;
+
+                if (o1 == null) // null is bigger to align with CubeCodeSystem
+                    return 1;
+
+                if (o2 == null) // null is bigger to align with CubeCodeSystem
+                    return -1;
+
+                // for the 'having clause', we only concern numbers and BigDecimal
+                // we try to cache the o2, which should be a constant according to CompareTupleFilter.evaluate()
+
+                double n1 = ((Number) o1).doubleValue();
+                double n2 = (o2Cache == o2) ? n2Cache : Double.parseDouble((String) o2);
+                
+                if (o2Cache == null) {
+                    o2Cache = o2;
+                    n2Cache = n2;
+                }
+
+                return Double.compare(n1, n2);
+            }
+
+            @Override
+            public boolean isNull(Object code) {
+                return code == null;
+            }
+
+            @Override
+            public void serialize(Object code, ByteBuffer buf) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public Object deserialize(ByteBuffer buf) {
+                throw new UnsupportedOperationException();
             }
         }
 
+        private class HavingFilterTuple implements ITuple {
+            MeasureAggregator[] aggrValues;
+
+            @Override
+            public Object getValue(TblColRef col) {
+                return aggrValues[col.getColumnDesc().getZeroBasedIndex()].getState();
+            }
+
+            @Override
+            public List<String> getAllFields() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public List<TblColRef> getAllColumns() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public Object[] getAllValues() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public ITuple makeCopy() {
+                throw new UnsupportedOperationException();
+            }
+        };
+
         class ReturningRecord {
             final GTRecord record = new GTRecord(info);
             final Object[] tmpValues = new Object[metrics.trueBitCount()];
@@ -468,8 +555,7 @@ public class GTAggregateScanner implements IGTScanner {
             }
 
             public void flush() throws IOException {
-                logger.info("AggregationCache(size={} est_mem_size={} threshold={}) will spill to {}",
-                        buffMap.size(), estMemSize, spillThreshold, dumpedFile.getAbsolutePath());
+                logger.info("AggregationCache(size={} est_mem_size={} threshold={}) will spill to {}", buffMap.size(), estMemSize, spillThreshold, dumpedFile.getAbsolutePath());
 
                 if (buffMap != null) {
                     DataOutputStream dos = null;
@@ -503,18 +589,18 @@ public class GTAggregateScanner implements IGTScanner {
             }
         }
 
-        class DumpMerger implements Iterable<Pair<byte[], MeasureAggregator[]>> {
-            final PriorityQueue<Pair<byte[], Integer>> minHeap;
+        class DumpMerger implements Iterable<Entry<byte[], MeasureAggregator[]>> {
+            final PriorityQueue<Entry<byte[], Integer>> minHeap;
             final List<Iterator<Pair<byte[], byte[]>>> dumpIterators;
             final List<Object[]> dumpCurrentValues;
             final MeasureAggregator[] resultMeasureAggregators = newAggregators();
             final MeasureAggregators resultAggrs = new MeasureAggregators(resultMeasureAggregators);
 
             public DumpMerger(List<Dump> dumps) {
-                minHeap = new PriorityQueue<>(dumps.size(), new Comparator<Pair<byte[], Integer>>() {
+                minHeap = new PriorityQueue<>(dumps.size(), new Comparator<Entry<byte[], Integer>>() {
                     @Override
-                    public int compare(Pair<byte[], Integer> o1, Pair<byte[], Integer> o2) {
-                        return bytesComparator.compare(o1.getFirst(), o2.getFirst());
+                    public int compare(Entry<byte[], Integer> o1, Entry<byte[], Integer> o2) {
+                        return bytesComparator.compare(o1.getKey(), o2.getKey());
                     }
                 });
                 dumpIterators = Lists.newArrayListWithCapacity(dumps.size());
@@ -536,7 +622,7 @@ public class GTAggregateScanner implements IGTScanner {
             private void enqueueFromDump(int index) {
                 if (dumpIterators.get(index) != null && dumpIterators.get(index).hasNext()) {
                     Pair<byte[], byte[]> pair = dumpIterators.get(index).next();
-                    minHeap.offer(new Pair(pair.getKey(), index));
+                    minHeap.offer(new SimpleEntry(pair.getKey(), index));
                     Object[] metricValues = new Object[metrics.trueBitCount()];
                     measureCodec.decode(ByteBuffer.wrap(pair.getValue()), metricValues);
                     dumpCurrentValues.set(index, metricValues);
@@ -544,21 +630,21 @@ public class GTAggregateScanner implements IGTScanner {
             }
 
             @Override
-            public Iterator<Pair<byte[], MeasureAggregator[]>> iterator() {
-                return new Iterator<Pair<byte[], MeasureAggregator[]>>() {
+            public Iterator<Entry<byte[], MeasureAggregator[]>> iterator() {
+                return new Iterator<Entry<byte[], MeasureAggregator[]>>() {
                     @Override
                     public boolean hasNext() {
                         return !minHeap.isEmpty();
                     }
 
                     private void internalAggregate() {
-                        Pair<byte[], Integer> peekEntry = minHeap.poll();
+                        Entry<byte[], Integer> peekEntry = minHeap.poll();
                         resultAggrs.aggregate(dumpCurrentValues.get(peekEntry.getValue()));
                         enqueueFromDump(peekEntry.getValue());
                     }
 
                     @Override
-                    public Pair<byte[], MeasureAggregator[]> next() {
+                    public Entry<byte[], MeasureAggregator[]> next() {
                         // Use minimum heap to merge sort the keys,
                         // also do aggregation for measures with same keys in different dumps
                         resultAggrs.reset();
@@ -570,7 +656,7 @@ public class GTAggregateScanner implements IGTScanner {
                             internalAggregate();
                         }
 
-                        return new Pair(peekKey, resultMeasureAggregators);
+                        return new SimpleEntry(peekKey, resultMeasureAggregators);
                     }
 
                     @Override
@@ -581,4 +667,31 @@ public class GTAggregateScanner implements IGTScanner {
             }
         }
     }
+
+    private static class SimpleEntry<K, V> implements Entry<K, V> {
+        K k;
+        V v;
+
+        SimpleEntry(K k, V v) {
+            this.k = k;
+            this.v = v;
+        }
+
+        @Override
+        public K getKey() {
+            return k;
+        }
+
+        @Override
+        public V getValue() {
+            return v;
+        }
+
+        @Override
+        public V setValue(V value) {
+            V oldV = v;
+            this.v = value;
+            return oldV;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index ae35d2b..9b6b2a6 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -33,7 +33,9 @@ import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.SerializeToByteBuffer;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.filter.StringCodeSystem;
 import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilterSerializer;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +58,7 @@ public class GTScanRequest {
 
     // optional filtering
     private TupleFilter filterPushDown;
+    private TupleFilter havingFilterPushDown;
 
     // optional aggregation
     private ImmutableBitSet aggrGroupBy;
@@ -75,8 +78,9 @@ public class GTScanRequest {
     private transient boolean doingStorageAggregation = false;
 
     GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
-            ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowStorageAggregation, //
-            double aggCacheMemThreshold, int storageScanRowNumThreshold, int storagePushDownLimit, String storageBehavior, long startTime, long timeout) {
+            ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, TupleFilter havingFilterPushDown, // 
+            boolean allowStorageAggregation, double aggCacheMemThreshold, int storageScanRowNumThreshold, //
+            int storagePushDownLimit, String storageBehavior, long startTime, long timeout) {
         this.info = info;
         if (ranges == null) {
             this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info)));
@@ -85,6 +89,7 @@ public class GTScanRequest {
         }
         this.columns = dimensions;
         this.filterPushDown = filterPushDown;
+        this.havingFilterPushDown = havingFilterPushDown;
 
         this.aggrGroupBy = aggrGroupBy;
         this.aggrMetrics = aggrMetrics;
@@ -272,10 +277,10 @@ public class GTScanRequest {
         return filterPushDown;
     }
 
-    public void setFilterPushDown(TupleFilter filter) {
-        filterPushDown = filter;
+    public TupleFilter getHavingFilterPushDown() {
+        return havingFilterPushDown;
     }
-
+    
     public ImmutableBitSet getDimensions() {
         return this.getColumns().andNot(this.getAggrMetrics());
     }
@@ -359,6 +364,7 @@ public class GTScanRequest {
 
             ImmutableBitSet.serializer.serialize(value.columns, out);
             BytesUtil.writeByteArray(GTUtil.serializeGTFilter(value.filterPushDown, value.info), out);
+            BytesUtil.writeByteArray(TupleFilterSerializer.serialize(value.havingFilterPushDown, StringCodeSystem.INSTANCE), out);
 
             ImmutableBitSet.serializer.serialize(value.aggrGroupBy, out);
             ImmutableBitSet.serializer.serialize(value.aggrMetrics, out);
@@ -392,6 +398,7 @@ public class GTScanRequest {
 
             ImmutableBitSet sColumns = ImmutableBitSet.serializer.deserialize(in);
             TupleFilter sGTFilter = GTUtil.deserializeGTFilter(BytesUtil.readByteArray(in), sInfo);
+            TupleFilter sGTHavingFilter = TupleFilterSerializer.deserialize(BytesUtil.readByteArray(in), StringCodeSystem.INSTANCE);
 
             ImmutableBitSet sAggGroupBy = ImmutableBitSet.serializer.deserialize(in);
             ImmutableBitSet sAggrMetrics = ImmutableBitSet.serializer.deserialize(in);
@@ -406,7 +413,7 @@ public class GTScanRequest {
 
             return new GTScanRequestBuilder().setInfo(sInfo).setRanges(sRanges).setDimensions(sColumns).//
             setAggrGroupBy(sAggGroupBy).setAggrMetrics(sAggrMetrics).setAggrMetricsFuncs(sAggrMetricFuncs).//
-            setFilterPushDown(sGTFilter).setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB).//
+            setFilterPushDown(sGTFilter).setHavingFilterPushDown(sGTHavingFilter).setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB).//
             setStorageScanRowNumThreshold(storageScanRowNumThreshold).setStoragePushDownLimit(storagePushDownLimit).//
             setStartTime(startTime).setTimeout(timeout).setStorageBehavior(storageBehavior).createGTScanRequest();
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
index bcec1f4..ba1fdbc 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
@@ -29,6 +29,7 @@ public class GTScanRequestBuilder {
     private GTInfo info;
     private List<GTScanRange> ranges;
     private TupleFilter filterPushDown;
+    private TupleFilter havingFilterPushDown;
     private ImmutableBitSet dimensions;
     private ImmutableBitSet aggrGroupBy = null;
     private ImmutableBitSet aggrMetrics = null;
@@ -56,6 +57,11 @@ public class GTScanRequestBuilder {
         return this;
     }
 
+    public GTScanRequestBuilder setHavingFilterPushDown(TupleFilter havingFilterPushDown) {
+        this.havingFilterPushDown = havingFilterPushDown;
+        return this;
+    }
+    
     public GTScanRequestBuilder setDimensions(ImmutableBitSet dimensions) {
         this.dimensions = dimensions;
         return this;
@@ -131,6 +137,6 @@ public class GTScanRequestBuilder {
         this.startTime = startTime == -1 ? System.currentTimeMillis() : startTime;
         this.timeout = timeout == -1 ? 300000 : timeout;
 
-        return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit, storageBehavior, startTime, timeout);
+        return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, havingFilterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit, storageBehavior, startTime, timeout);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
index 7496778..7a7e4e6 100755
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
@@ -20,7 +20,9 @@ package org.apache.kylin.gridtable;
 
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.kylin.common.util.ByteArray;
@@ -61,18 +63,28 @@ public class GTUtil {
 
     public static TupleFilter convertFilterColumnsAndConstants(TupleFilter rootFilter, GTInfo info, //
             List<TblColRef> colMapping, Set<TblColRef> unevaluatableColumnCollector) {
-        return convertFilter(rootFilter, info, colMapping, true, unevaluatableColumnCollector);
+        Map<TblColRef, Integer> map = colListToMap(colMapping);
+        return convertFilter(rootFilter, info, map, true, unevaluatableColumnCollector);
+    }
+
+    protected static Map<TblColRef, Integer> colListToMap(List<TblColRef> colMapping) {
+        Map<TblColRef, Integer> map = new HashMap<>();
+        for (int i = 0; i < colMapping.size(); i++) {
+            map.put(colMapping.get(i), i);
+        }
+        return map;
     }
 
     // converts TblColRef to GridTable column, encode constants, drop unEvaluatable parts
     private static TupleFilter convertFilter(TupleFilter rootFilter, final GTInfo info, //
-            final List<TblColRef> colMapping, final boolean encodeConstants, //
+            final Map<TblColRef, Integer> colMapping, final boolean encodeConstants, //
             final Set<TblColRef> unevaluatableColumnCollector) {
 
         IFilterCodeSystem<ByteArray> filterCodeSystem = wrap(info.codeSystem.getComparator());
+        
+        GTConvertDecorator decorator = new 
GTConvertDecorator(unevaluatableColumnCollector, colMapping, info, 
encodeConstants);
 
-        byte[] bytes = TupleFilterSerializer.serialize(rootFilter, new GTConvertDecorator(unevaluatableColumnCollector, colMapping, info, encodeConstants), filterCodeSystem);
-
+        byte[] bytes = TupleFilterSerializer.serialize(rootFilter, decorator, filterCodeSystem);
         return TupleFilterSerializer.deserialize(bytes, filterCodeSystem);
     }
 
@@ -106,17 +118,22 @@ public class GTUtil {
 
     protected static class GTConvertDecorator implements TupleFilterSerializer.Decorator {
         protected final Set<TblColRef> unevaluatableColumnCollector;
-        protected final List<TblColRef> colMapping;
+        protected final Map<TblColRef, Integer> colMapping;
         protected final GTInfo info;
         protected final boolean encodeConstants;
 
-        public GTConvertDecorator(Set<TblColRef> unevaluatableColumnCollector, List<TblColRef> colMapping, GTInfo info, boolean encodeConstants) {
+        public GTConvertDecorator(Set<TblColRef> unevaluatableColumnCollector, Map<TblColRef, Integer> colMapping, GTInfo info, boolean encodeConstants) {
             this.unevaluatableColumnCollector = unevaluatableColumnCollector;
             this.colMapping = colMapping;
             this.info = info;
             this.encodeConstants = encodeConstants;
             buf = ByteBuffer.allocate(info.getMaxColumnLength());
         }
+        
+        protected int mapCol(TblColRef col) {
+            Integer i = colMapping.get(col);
+            return i == null ? -1 : i;
+        }
 
         @Override
         public TupleFilter onSerialize(TupleFilter filter) {
@@ -140,7 +157,7 @@ public class GTUtil {
             // map to column onto grid table
             if (colMapping != null && filter instanceof ColumnTupleFilter) {
                 ColumnTupleFilter colFilter = (ColumnTupleFilter) filter;
-                int gtColIdx = colMapping.indexOf(colFilter.getColumn());
+                int gtColIdx = mapCol(colFilter.getColumn());
                 return new ColumnTupleFilter(info.colRef(gtColIdx));
             }
 
@@ -174,7 +191,7 @@ public class GTUtil {
             //with normal ConstantTupleFilter
 
             Object firstValue = constValues.iterator().next();
-            int col = colMapping == null ? externalCol.getColumnDesc().getZeroBasedIndex() : colMapping.indexOf(externalCol);
+            int col = colMapping == null ? externalCol.getColumnDesc().getZeroBasedIndex() : mapCol(externalCol);
 
             TupleFilter result;
             ByteArray code;

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
index 2cd4964..5d15d56 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
@@ -133,6 +133,8 @@ public class ColumnDesc implements Serializable {
 
     public void setId(String id) {
         this.id = id;
+        if (id != null)
+            zeroBasedIndex = Integer.parseInt(id) - 1;
     }
 
     public String getName() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index da62a75..aa4c056 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -202,6 +202,8 @@ public class TblColRef implements Serializable {
             return false;
         if ((table == null ? other.table == null : table.equals(other.table)) == false)
             return false;
+        if (this.isInnerColumn() != other.isInnerColumn())
+            return false;
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
index 36f303b..03ff3ff 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
@@ -46,34 +46,52 @@ public class SQLDigest {
         }
     }
 
+    // model
     public String factTable;
-    public TupleFilter filter;
-    public List<JoinDesc> joinDescs;
     public Set<TblColRef> allColumns;
+    public List<JoinDesc> joinDescs;
+
+    // group by
     public List<TblColRef> groupbyColumns;
     public Set<TblColRef> subqueryJoinParticipants;
-    public Set<TblColRef> filterColumns;
+
+    // aggregation
     public Set<TblColRef> metricColumns;
     public List<FunctionDesc> aggregations; // storage level measure type, on top of which various sql aggr function may apply
     public List<SQLCall> aggrSqlCalls; // sql level aggregation function call
+
+    // filter
+    public Set<TblColRef> filterColumns;
+    public TupleFilter filter;
+    public TupleFilter havingFilter;
+
+    // sort & limit
     public List<TblColRef> sortColumns;
     public List<OrderEnum> sortOrders;
     public boolean isRawQuery;
     public boolean limitPrecedesAggr;
 
-    public SQLDigest(String factTable, TupleFilter filter, List<JoinDesc> joinDescs, Set<TblColRef> allColumns, //
-            List<TblColRef> groupbyColumns, Set<TblColRef> subqueryJoinParticipants, Set<TblColRef> filterColumns, Set<TblColRef> metricColumns, //
-            List<FunctionDesc> aggregations, List<SQLCall> aggrSqlCalls, List<TblColRef> sortColumns, List<OrderEnum> sortOrders, boolean limitPrecedesAggr) {
+    public SQLDigest(String factTable, Set<TblColRef> allColumns, List<JoinDesc> joinDescs, // model
+            List<TblColRef> groupbyColumns, Set<TblColRef> subqueryJoinParticipants, // group by
+            Set<TblColRef> metricColumns, List<FunctionDesc> aggregations, List<SQLCall> aggrSqlCalls, // aggregation
+            Set<TblColRef> filterColumns, TupleFilter filter, TupleFilter havingFilter, // filter
+            List<TblColRef> sortColumns, List<OrderEnum> sortOrders, boolean limitPrecedesAggr // sort & limit
+    ) {
         this.factTable = factTable;
-        this.filter = filter;
-        this.joinDescs = joinDescs;
         this.allColumns = allColumns;
+        this.joinDescs = joinDescs;
+
         this.groupbyColumns = groupbyColumns;
         this.subqueryJoinParticipants = subqueryJoinParticipants;
-        this.filterColumns = filterColumns;
+
         this.metricColumns = metricColumns;
         this.aggregations = aggregations;
         this.aggrSqlCalls = aggrSqlCalls;
+
+        this.filterColumns = filterColumns;
+        this.filter = filter;
+        this.havingFilter = havingFilter;
+
         this.sortColumns = sortColumns;
         this.sortOrders = sortOrders;
         this.isRawQuery = isRawQuery();

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index c3cc858..cecea85 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -73,7 +73,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
     protected StorageContext context;
 
     public CubeScanRangePlanner(CubeSegment cubeSegment, Cuboid cuboid, TupleFilter filter, Set<TblColRef> dimensions, Set<TblColRef> groupByDims, //
-            Collection<FunctionDesc> metrics, StorageContext context) {
+            Collection<FunctionDesc> metrics, TupleFilter havingFilter, StorageContext context) {
         this.context = context;
 
         this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
@@ -100,6 +100,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         //replace the constant values in filter to dictionary codes
         Set<TblColRef> groupByPushDown = Sets.newHashSet(groupByDims);
         this.gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getCuboidDimensionsInGTOrder(), groupByPushDown);
+        this.havingFilter = havingFilter;
 
         this.gtDimensions = mapping.makeGridTableColumns(dimensions);
         this.gtAggrGroups = mapping.makeGridTableColumns(replaceDerivedColumns(groupByPushDown, cubeSegment.getCubeDesc()));
@@ -115,15 +116,10 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
                 this.gtPartitionCol = gtInfo.colRef(index);
             }
         }
-
     }
 
     /**
-     * constrcut GTScanRangePlanner with incomplete information. only be used 
for UT  
-     * @param info
-     * @param gtStartAndEnd
-     * @param gtPartitionCol
-     * @param gtFilter
+     * Construct  GTScanRangePlanner with incomplete information. For UT only.
      */
     public CubeScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) {
 
@@ -152,7 +148,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
             scanRequest = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions).//
                     setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs).setFilterPushDown(gtFilter).//
                     setAllowStorageAggregation(context.isNeedStorageAggregation()).setAggCacheMemThreshold(cubeSegment.getConfig().getQueryCoprocessorMemGB()).//
-                    setStoragePushDownLimit(context.getFinalPushDownLimit()).createGTScanRequest();
+                    setStoragePushDownLimit(context.getFinalPushDownLimit()).setHavingFilterPushDown(havingFilter).createGTScanRequest();
         } else {
             scanRequest = null;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index 31a9f99..ee12743 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -51,7 +51,7 @@ public class CubeSegmentScanner implements IGTScanner {
     final GTScanRequest scanRequest;
 
     public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
-            Collection<FunctionDesc> metrics, TupleFilter originalfilter, StorageContext context) {
+            Collection<FunctionDesc> metrics, TupleFilter originalfilter, TupleFilter havingFilter, StorageContext context) {
         
         logger.info("Init CubeSegmentScanner for segment {}", cubeSeg.getName());
         
@@ -70,16 +70,22 @@ public class CubeSegmentScanner implements IGTScanner {
 
         CubeScanRangePlanner scanRangePlanner;
         try {
-            scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, metrics, context);
+            scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, metrics, havingFilter, context);
         } catch (RuntimeException e) {
             throw e;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
+        
         scanRequest = scanRangePlanner.planScanRequest();
+        
         String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage();
         scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage, context);
     }
+    
+    public boolean isSegmentSkipped() {
+        return scanner.isSegmentSkipped();
+    }
 
     @Override
     public Iterator<GTRecord> iterator() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 0f942f0..5faa098 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -44,6 +44,7 @@ import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
@@ -83,8 +84,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
                 continue;
             }
 
-            scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), request.getGroups(), request.getMetrics(), request.getFilter(), request.getContext());
-            scanners.add(scanner);
+            scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), request.getGroups(), request.getMetrics(), request.getFilter(), request.getHavingFilter(), request.getContext());
+            
+            if (!scanner.isSegmentSkipped())
+                scanners.add(scanner);
         }
 
         if (scanners.isEmpty())
@@ -147,10 +150,13 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         enableStreamAggregateIfBeneficial(cuboid, groupsD, context);
         // set query deadline
         context.setDeadline(cubeInstance);
-
+        
+        // push down having clause filter if possible
+        TupleFilter havingFilter = checkHavingCanPushDown(sqlDigest.havingFilter, groupsD, sqlDigest.aggregations, metrics);
+        
         logger.info("Cuboid identified: cube={}, cuboidId={}, groupsD={}, filterD={}, limitPushdown={}, storageAggr={}", cubeInstance.getName(), cuboid.getId(), groupsD, filterColumnD, context.getFinalPushDownLimit(), context.isNeedStorageAggregation());
 
-        return new GTCubeStorageQueryRequest(cuboid, dimensionsD, groupsD, filterColumnD, metrics, filterD, context);
+        return new GTCubeStorageQueryRequest(cuboid, dimensionsD, groupsD, filterColumnD, metrics, filterD, havingFilter, context);
     }
 
     protected abstract String getGTStorage();
@@ -416,4 +422,41 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         }
     }
 
+    private TupleFilter checkHavingCanPushDown(TupleFilter havingFilter, Set<TblColRef> groupsD, List<FunctionDesc> aggregations, Set<FunctionDesc> metrics) {
+        // must have only one segment
+        Segments<CubeSegment> readySegs = cubeInstance.getSegments(SegmentStatusEnum.READY);
+        if (readySegs.size() != 1)
+            return null;
+        
+        // sharded-by column must on group by
+        CubeDesc desc = cubeInstance.getDescriptor();
+        Set<TblColRef> shardBy = desc.getShardByColumns();
+        if (groupsD == null || shardBy.isEmpty() || !groupsD.containsAll(shardBy))
+            return null;
+        
+        // OK, push down
+        logger.info("Push down having filter " + havingFilter);
+        
+        // convert columns in the filter
+        Set<TblColRef> aggrOutCols = new HashSet<>();
+        TupleFilter.collectColumns(havingFilter, aggrOutCols);
+        
+        for (TblColRef aggrOutCol : aggrOutCols) {
+            int aggrIdxOnSql = aggrOutCol.getColumnDesc().getZeroBasedIndex(); // aggr index marked in OLAPAggregateRel
+            FunctionDesc aggrFunc = aggregations.get(aggrIdxOnSql);
+            
+            // calculate the index of this aggr among all the metrics that is sending to storage
+            int aggrIdxAmongMetrics = 0;
+            for (MeasureDesc m : cubeDesc.getMeasures()) {
+                if (aggrFunc.equals(m.getFunction()))
+                    break;
+                if (metrics.contains(m.getFunction()))
+                    aggrIdxAmongMetrics++;
+            }
+            aggrOutCol.getColumnDesc().setId("" + (aggrIdxAmongMetrics + 1));
+        }
+        
+        return havingFilter;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java
index 68f755c..7793515 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java
@@ -27,6 +27,7 @@ import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.StorageContext;
 
+@SuppressWarnings("serial")
 public class GTCubeStorageQueryRequest implements Serializable {
     private Cuboid cuboid;
     private Set<TblColRef> dimensions;
@@ -34,15 +35,18 @@ public class GTCubeStorageQueryRequest implements Serializable {
     private Set<TblColRef> filterCols;
     private Set<FunctionDesc> metrics;
     private TupleFilter filter;
+    private TupleFilter havingFilter;
     private StorageContext context;
 
-    public GTCubeStorageQueryRequest(Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, Set<TblColRef> filterCols, Set<FunctionDesc> metrics, TupleFilter filter, StorageContext context) {
+    public GTCubeStorageQueryRequest(Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
+            Set<TblColRef> filterCols, Set<FunctionDesc> metrics, TupleFilter filter, TupleFilter havingFilter, StorageContext context) {
         this.cuboid = cuboid;
         this.dimensions = dimensions;
         this.groups = groups;
         this.filterCols = filterCols;
         this.metrics = metrics;
         this.filter = filter;
+        this.havingFilter = havingFilter;
         this.context = context;
     }
 
@@ -86,6 +90,14 @@ public class GTCubeStorageQueryRequest implements Serializable {
         this.filter = filter;
     }
 
+    public TupleFilter getHavingFilter() {
+        return havingFilter;
+    }
+
+    public void setHavingFilter(TupleFilter havingFilter) {
+        this.havingFilter = havingFilter;
+    }
+
     public StorageContext getContext() {
         return context;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
index fe22e9c..8f64bd1 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
@@ -55,6 +55,10 @@ public class ScannerWorker {
             throw new RuntimeException(e);
         }
     }
+    
+    public boolean isSegmentSkipped() {
+        return internal instanceof EmptyGTScanner;
+    }
 
     public Iterator<GTRecord> iterator() {
         return internal.iterator();

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
index 7500b00..672f3e0 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
@@ -61,6 +61,7 @@ import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.TblColRef.InnerDataTypeEnum;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -321,6 +322,19 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
     }
 
     @Test
+    public void verifyAggregateAndHavingFilter() throws IOException {
+        GTInfo info = table.getInfo();
+        
+        TblColRef havingCol = TblColRef.newInnerColumn("SUM_OF_BIGDECIMAL", InnerDataTypeEnum.LITERAL);
+        havingCol.getColumnDesc().setId("1"); // point to the first aggregated measure
+        CompareTupleFilter havingFilter = compare(havingCol, FilterOperatorEnum.GT, "20");
+        
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(1)).setAggrMetrics(setOf(4)).setAggrMetricsFuncs(new String[] { "sum" }).setHavingFilterPushDown(havingFilter).createGTScanRequest();
+        
+        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[null, 20, null, null, 42.0]", "[null, 30, null, null, 52.5]");
+    }
+    
+    @Test
     public void testFilterScannerPerf() throws IOException {
         GridTable table = newTestPerfTable();
         GTInfo info = table.getInfo();

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
index 847baf8..24589a8 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
@@ -135,7 +135,11 @@ public class ITStorageTest extends HBaseMetadataTestCase {
         int count = 0;
         ITupleIterator iterator = null;
         try {
-            SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Sets.<TblColRef> newHashSet(), Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations, Collections.<SQLCall> emptyList(), new ArrayList<TblColRef>(), new ArrayList<SQLDigest.OrderEnum>(), false);
+            SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", /*allCol*/ Collections.<TblColRef> emptySet(), /*join*/ null, //
+                    groups, /*subqueryJoinParticipants*/ Sets.<TblColRef> newHashSet(), //
+                    /*metricCol*/ Collections.<TblColRef> emptySet(), aggregations, /*aggrSqlCalls*/ Collections.<SQLCall> emptyList(), //
+                    /*filter col*/ Collections.<TblColRef> emptySet(), filter, null, //
+                    /*sortCol*/ new ArrayList<TblColRef>(), new ArrayList<SQLDigest.OrderEnum>(), false);
             iterator = storageEngine.search(context, sqlDigest, mockup.newTupleInfo(groups, aggregations));
             while (iterator.hasNext()) {
                 ITuple tuple = iterator.next();

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 24711d3..adb145a 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -167,6 +167,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         if (!this.afterAggregate) {
             addToContextGroupBy(this.groups);
             this.context.aggregations.addAll(this.aggregations);
+            this.context.aggrOutCols.addAll(columnRowType.getAllColumns().subList(groups.size(), columnRowType.getAllColumns().size()));
             this.context.afterAggregate = true;
 
             if (this.context.afterLimit) {
@@ -209,14 +210,15 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         for (int i = 0; i < this.aggregations.size(); i++) {
             FunctionDesc aggFunc = this.aggregations.get(i);
             String aggOutName;
-            if (aggFunc != null && aggFunc.needRewriteField()) {
+            if (aggFunc != null) {
                 aggOutName = aggFunc.getRewriteFieldName();
             } else {
                 AggregateCall aggCall = this.rewriteAggCalls.get(i);
                 int index = aggCall.getArgList().get(0);
-                aggOutName = getSqlFuncName(aggCall) + "_" + inputColumnRowType.getColumnByIndex(index).getIdentity() + "_";
+                aggOutName = getSqlFuncName(aggCall) + "_" + inputColumnRowType.getColumnByIndex(index).getIdentity().replace('.', '_') + "_";
             }
             TblColRef aggOutCol = TblColRef.newInnerColumn(aggOutName, TblColRef.InnerDataTypeEnum.LITERAL);
+            aggOutCol.getColumnDesc().setId("" + (i + 1)); // mark the index of aggregation
             columns.add(aggOutCol);
         }
         return new ColumnRowType(columns);

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
index db5f2eb..31ed075 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
@@ -126,9 +126,11 @@ public class OLAPContext {
     public Set<TblColRef> subqueryJoinParticipants = new HashSet<TblColRef>();//subqueryJoinParticipants will be added to groupByColumns(only when other group by co-exists) and allColumns
     public Set<TblColRef> metricsColumns = new HashSet<>();
     public List<FunctionDesc> aggregations = new ArrayList<>(); // storage level measure type, on top of which various sql aggr function may apply
+    public List<TblColRef> aggrOutCols = new ArrayList<>(); // aggregation output (inner) columns
     public List<SQLCall> aggrSqlCalls = new ArrayList<>(); // sql level aggregation function call
     public Set<TblColRef> filterColumns = new HashSet<>();
     public TupleFilter filter;
+    public TupleFilter havingFilter;
     public List<JoinDesc> joins = new LinkedList<>();
     public JoinsTree joinsTree;
     private List<TblColRef> sortColumns;
@@ -150,7 +152,12 @@ public class OLAPContext {
 
     public SQLDigest getSQLDigest() {
         if (sqlDigest == null)
-            sqlDigest = new SQLDigest(firstTableScan.getTableName(), filter, joins, allColumns, groupByColumns, subqueryJoinParticipants, filterColumns, metricsColumns, aggregations, aggrSqlCalls, sortColumns, sortOrders, limitPrecedesAggr);
+            sqlDigest = new SQLDigest(firstTableScan.getTableName(), allColumns, joins, // model
+                    groupByColumns, subqueryJoinParticipants, // group by
+                    metricsColumns, aggregations, aggrSqlCalls, // aggregation
+                    filterColumns, filter, havingFilter, // filter
+                    sortColumns, sortOrders, limitPrecedesAggr // sort & limit
+            );
         return sqlDigest;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
index 882c959..0833a92 100755
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
@@ -355,7 +355,12 @@ public class OLAPFilterRel extends Filter implements OLAPRel {
         if (!context.afterAggregate) {
             translateFilter(context);
         } else {
-            context.afterHavingClauseFilter = true;//having clause is skipped
+            context.afterHavingClauseFilter = true;
+            
+            TupleFilterVisitor visitor = new TupleFilterVisitor(this.columnRowType);
+            TupleFilter havingFilter = this.condition.accept(visitor);
+            if (context.havingFilter == null)
+                context.havingFilter = havingFilter;
         }
     }
 
@@ -372,8 +377,10 @@ public class OLAPFilterRel extends Filter implements OLAPRel {
 
         TupleFilterVisitor visitor = new TupleFilterVisitor(this.columnRowType);
         TupleFilter filter = this.condition.accept(visitor);
+        
         // optimize the filter, the optimization has to be segment-irrelevant
         new FilterOptimizeTransformer().transform(filter);
+        
         Set<TblColRef> filterColumns = Sets.newHashSet();
         TupleFilter.collectColumns(filter, filterColumns);
         for (TblColRef tblColRef : filterColumns) {

Reply via email to