KYLIN-976 Clean up, remove all explicit checks on TopN & HLLC

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7f50c17a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7f50c17a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7f50c17a

Branch: refs/heads/2.x-staging
Commit: 7f50c17a5202be9508c21d9cc301d231c8b5843e
Parents: 636aeec
Author: Li, Yang <yang...@ebay.com>
Authored: Fri Dec 4 16:38:04 2015 +0800
Committer: Yang Li <liy...@apache.org>
Committed: Sat Dec 5 06:55:39 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    |  16 ++-
 .../apache/kylin/common/topn/TopNCounter.java   |   2 +-
 .../kylin/common/util/CliCommandExecutor.java   |   4 +-
 .../java/org/apache/kylin/common/util/Pair.java |   3 +-
 .../kylin/common/topn/TopNCounterTest.java      |   4 +-
 .../kylin/cube/CubeCapabilityChecker.java       |   5 -
 .../org/apache/kylin/cube/CubeDescManager.java  |  15 ++-
 .../org/apache/kylin/cube/CubeInstance.java     |   3 -
 .../java/org/apache/kylin/cube/CubeManager.java |   4 +-
 .../kylin/cube/gridtable/CubeCodeSystem.java    |   2 +-
 .../gridtable/CuboidToGridTableMapping.java     |  30 +----
 .../cube/gridtable/TrimmedCubeCodeSystem.java   |   2 +-
 .../InMemCubeBuilderInputConverter.java         |   3 +-
 .../cube/inmemcubing/InMemCubeBuilderUtils.java |  30 +----
 .../org/apache/kylin/cube/model/CubeDesc.java   |  11 +-
 .../model/validation/rule/FunctionRule.java     |   4 -
 .../kylin/gridtable/GTSampleCodeSystem.java     |   2 +-
 .../cube/inmemcubing/InMemCubeBuilderTest.java  |  25 ++--
 .../apache/kylin/dict/lookup/LookupTable.java   |   2 +-
 .../apache/kylin/measure/IMeasureFactory.java   |  24 ----
 .../apache/kylin/measure/MeasureAggregator.java |   6 +-
 .../kylin/measure/MeasureAggregators.java       |   2 +-
 .../org/apache/kylin/measure/MeasureType.java   |  50 ++------
 .../kylin/measure/MeasureTypeFactory.java       |  89 ++++++++++++++
 .../measure/basic/BasicMeasureFactory.java      |  32 -----
 .../kylin/measure/basic/BasicMeasureType.java   |  74 +++++++-----
 .../measure/hllc/HLLCAggregationFactory.java    |  36 ------
 .../kylin/measure/hllc/HLLCMeasureType.java     |  76 ++++++++----
 .../kylin/measure/hllc/HLLCSerializer.java      |   3 +-
 .../kylin/measure/hllc/LDCAggregator.java       |  63 ----------
 .../kylin/measure/topn/TopNMeasureFactory.java  |  36 ------
 .../kylin/measure/topn/TopNMeasureType.java     | 116 +++++++++++++------
 .../kylin/metadata/datatype/DataType.java       |  33 ++++--
 .../metadata/datatype/DataTypeSerializer.java   |  38 +++---
 .../kylin/metadata/model/FunctionDesc.java      |  65 ++---------
 .../kylin/metadata/tuple/TeeTupleIterator.java  |   5 -
 .../org/apache/kylin/metadata/tuple/Tuple.java  |  12 +-
 .../kylin/storage/translate/HBaseKeyRange.java  |   2 +-
 .../apache/kylin/engine/mr/DFSFileTable.java    |   2 +-
 .../engine/mr/steps/BaseCuboidMapperBase.java   |   3 +-
 .../apache/kylin/engine/mr/steps/CuboidJob.java |   5 +-
 .../mr/steps/MergeCuboidFromStorageMapper.java  |   2 +-
 .../engine/mr/steps/MergeCuboidMapper.java      |   2 +-
 .../apache/kylin/engine/spark/SparkCubing.java  |  11 +-
 .../measure/FixedLenMeasureCodec.java           |   3 +-
 .../cube/v1/CubeSegmentTopNTupleIterator.java   |  86 --------------
 .../hbase/cube/v1/CubeSegmentTupleIterator.java |   2 +-
 .../storage/hbase/cube/v1/CubeStorageQuery.java |  46 ++------
 .../hbase/cube/v1/CubeTupleConverter.java       |  60 +---------
 .../cube/v1/SerializedHBaseTupleIterator.java   |  14 +--
 .../observer/ObserverAggregators.java           |  20 +++-
 .../hbase/cube/v1/filter/FuzzyRowFilterV2.java  |   6 +-
 .../storage/hbase/cube/v2/CubeHBaseRPC.java     |   4 +-
 .../storage/hbase/cube/v2/CubeStorageQuery.java |  67 +++--------
 .../hbase/cube/v2/CubeTupleConverter.java       |  65 +----------
 .../hbase/cube/v2/HBaseReadonlyStore.java       |   1 -
 .../v2/SequentialCubeTopNTupleIterator.java     |  68 -----------
 .../cube/v2/SequentialCubeTupleIterator.java    |   2 +-
 .../endpoint/EndpointAggregators.java           |   7 +-
 .../endpoint/EndpointTupleIterator.java         |   3 +-
 .../storage/hbase/steps/CreateHTableJob.java    |   7 +-
 .../storage/hbase/steps/HBaseMROutput2.java     |   2 +-
 .../hbase/steps/HBaseMROutput2Transition.java   |   2 +-
 .../storage/hbase/steps/RowValueDecoder.java    |  21 +---
 .../hbase/util/GridTableHBaseBenchmark.java     |   2 +-
 .../v1/filter/TestFuzzyRowFilterV2EndToEnd.java |   8 +-
 66 files changed, 473 insertions(+), 977 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index d2a94c5..a4b2b1a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -279,14 +279,20 @@ public class KylinConfig implements Serializable {
         return getOptional(KYLIN_STORAGE_URL);
     }
 
+    /** was for route to hive, not used any more */
+    @Deprecated
     public String getHiveUrl() {
         return getOptional(HIVE_URL, "");
     }
 
+    /** was for route to hive, not used any more */
+    @Deprecated
     public String getHiveUser() {
         return getOptional(HIVE_USER, "");
     }
 
+    /** was for route to hive, not used any more */
+    @Deprecated
     public String getHivePassword() {
         return getOptional(HIVE_PASSWORD, "");
     }
@@ -302,6 +308,10 @@ public class KylinConfig implements Serializable {
     public String getHBaseClusterFs() {
         return getOptional(KYLIN_HBASE_CLUSTER_FS, "");
     }
+    
+    public String[] getMeasureTypeFactories() {
+        return getOptionalStringArray("kylin.measure.type.factories", new 
String[0]);
+    }
 
     public String getKylinJobLogDir() {
         return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs");
@@ -445,7 +455,7 @@ public class KylinConfig implements Serializable {
     }
 
     public String[] getRestServers() {
-        return getOptionalStringArray(KYLIN_REST_SERVERS);
+        return getOptionalStringArray(KYLIN_REST_SERVERS, new String[0]);
     }
 
     public String getAdminDls() {
@@ -532,12 +542,12 @@ public class KylinConfig implements Serializable {
         return property != null ? property : properties.getProperty(prop);
     }
 
-    private String[] getOptionalStringArray(String prop) {
+    private String[] getOptionalStringArray(String prop, String[] dft) {
         final String property = getOptional(prop);
         if (!StringUtils.isBlank(property)) {
             return property.split("\\s*,\\s*");
         } else {
-            return new String[] {};
+            return dft;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java 
b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 4a95c66..065d969 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -108,7 +108,7 @@ public class TopNCounter<T> implements Iterable<Counter<T>> 
{
 
         incrementCounter(counterNode, incrementCount);
 
-        return new Pair<Boolean, T>(isNewItem, droppedItem);
+        return Pair.newPair(isNewItem, droppedItem);
     }
 
     protected void incrementCounter(ListNode2<Counter<T>> counterNode, double 
incrementCount) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
 
b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
index cc5346c..aa25c22 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
@@ -104,7 +104,7 @@ public class CliCommandExecutor {
             sshOutput = ssh.execCommand(command, remoteTimeoutSeconds, 
logAppender);
             int exitCode = sshOutput.getExitCode();
             String output = sshOutput.getText();
-            return new Pair<Integer, String>(exitCode, output);
+            return Pair.newPair(exitCode, output);
         } catch (IOException e) {
             throw e;
         } catch (Exception e) {
@@ -140,7 +140,7 @@ public class CliCommandExecutor {
 
         try {
             int exitCode = proc.waitFor();
-            return new Pair<Integer, String>(exitCode, result.toString());
+            return Pair.newPair(exitCode, result.toString());
         } catch (InterruptedException e) {
             throw new IOException(e);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java 
b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
index 9efdf3b..d28b05f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
@@ -28,6 +28,7 @@ import java.io.Serializable;
  */
 public class Pair<T1, T2> implements Serializable {
     private static final long serialVersionUID = -3986244606585552569L;
+    
     protected T1 first = null;
     protected T2 second = null;
 
@@ -96,7 +97,7 @@ public class Pair<T1, T2> implements Serializable {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings("rawtypes")
     public boolean equals(Object other) {
         return other instanceof Pair && equals(first, ((Pair) other).first) && 
equals(second, ((Pair) other).second);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java 
b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
index a6aa610..821a0fc 100644
--- 
a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
+++ 
b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
@@ -248,7 +248,7 @@ public class TopNCounterTest {
             List<Pair<String, Double>> allRecords = Lists.newArrayList();
 
             for (Counter<String> counter : tops)
-                allRecords.add(new Pair(counter.getItem(), 
counter.getCount()));
+                allRecords.add(Pair.newPair(counter.getItem(), 
counter.getCount()));
             timeSpent += (System.currentTimeMillis() - startTime);
             return allRecords;
         }
@@ -284,7 +284,7 @@ public class TopNCounterTest {
             List<Pair<String, Double>> allRecords = Lists.newArrayList();
 
             for (Map.Entry<String, Double> entry : hashMap.entrySet()) {
-                allRecords.add(new Pair(entry.getKey(), entry.getValue()));
+                allRecords.add(Pair.newPair(entry.getKey(), entry.getValue()));
             }
 
             Collections.sort(allRecords, new Comparator<Pair<String, 
Double>>() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 624bb0b..28252a3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -169,11 +169,6 @@ public class CubeCapabilityChecker {
                 continue;
             }
 
-            // calcite can not handle distinct count
-            if (functionDesc.isCountDistinct()) {
-                continue;
-            }
-
             // calcite can do aggregation from columns on-the-fly
             List<TblColRef> neededCols = 
functionDesc.getParameter().getColRefs();
             if (neededCols.size() > 0 && 
cubeDesc.listDimensionColumnsIncludingDerived().containsAll(neededCols)) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index 53cd00f..93bde26 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -34,6 +34,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.validation.CubeMetadataValidator;
 import org.apache.kylin.cube.model.validation.ValidateContext;
+import org.apache.kylin.measure.MeasureTypeFactory;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.MetadataManager;
 import org.slf4j.Logger;
@@ -53,17 +54,13 @@ public class CubeDescManager {
     // static cached instances
     private static final ConcurrentHashMap<KylinConfig, CubeDescManager> CACHE 
= new ConcurrentHashMap<KylinConfig, CubeDescManager>();
 
-    // 
============================================================================
-
-    private KylinConfig config;
-    // name ==> CubeDesc
-    private CaseInsensitiveStringCache<CubeDesc> cubeDescMap;
-
     public static CubeDescManager getInstance(KylinConfig config) {
         CubeDescManager r = CACHE.get(config);
         if (r != null) {
             return r;
         }
+        
+        MeasureTypeFactory.init(config);
 
         synchronized (CubeDescManager.class) {
             r = CACHE.get(config);
@@ -87,6 +84,12 @@ public class CubeDescManager {
         CACHE.clear();
     }
 
+    // 
============================================================================
+
+    private KylinConfig config;
+    // name ==> CubeDesc
+    private CaseInsensitiveStringCache<CubeDesc> cubeDescMap;
+
     private CubeDescManager(KylinConfig config) throws IOException {
         logger.info("Initializing CubeDescManager with config " + config);
         this.config = config;

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 81c7909..dccc3f1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -421,9 +421,6 @@ public class CubeInstance extends RootPersistentEntity 
implements IRealization,
         if 
(!this.getDescriptor().getModel().getPartitionDesc().isPartitioned())
             return false;
 
-        if (this.getDescriptor().hasHolisticCountDistinctMeasures())
-            return false;
-
         return this.getDescriptor().getAutoMergeTimeRanges() != null && 
this.getDescriptor().getAutoMergeTimeRanges().length > 0;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 24286cc..885dafa 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -399,7 +399,7 @@ public class CubeManager implements IRealizationProvider {
         CubeUpdate cubeBuilder = new 
CubeUpdate(cube).setToAddSegs(appendSegment, mergeSegment);
         updateCube(cubeBuilder);
 
-        return new Pair<CubeSegment, CubeSegment>(appendSegment, mergeSegment);
+        return Pair.newPair(appendSegment, mergeSegment);
     }
 
     public CubeSegment appendSegments(CubeInstance cube, long endDate) throws 
IOException {
@@ -491,7 +491,7 @@ public class CubeManager implements IRealizationProvider {
                 }
             }
         }
-        return new Pair<Long, Long>(start, end);
+        return Pair.newPair(start, end);
     }
 
     private boolean hasOverlap(long startDate, long endDate, long 
anotherStartDate, long anotherEndDate) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java 
b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
index 7f38c26..d766776 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -125,7 +125,7 @@ public class CubeCodeSystem implements IGTCodeSystem {
         MeasureAggregator<?>[] result = new 
MeasureAggregator[aggrFunctions.length];
         for (int i = 0; i < result.length; i++) {
             int col = columns.trueBitAt(i);
-            result[i] = MeasureAggregator.create(aggrFunctions[i], 
info.getColumnType(col).toString());
+            result[i] = MeasureAggregator.create(aggrFunctions[i], 
info.getColumnType(col));
         }
 
         // deal with holistic distinct count

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
index 361d96a..d0fca02 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
@@ -15,8 +15,6 @@ import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -32,7 +30,7 @@ public class CuboidToGridTableMapping {
     private ImmutableBitSet gtPrimaryKey;
 
     private int nMetrics;
-    private ListMultimap<FunctionDesc, Integer> metrics2gt; // because count 
distinct may have a holistic version
+    private Map<FunctionDesc, Integer> metrics2gt; // because count distinct 
may have a holistic version
 
     public CuboidToGridTableMapping(Cuboid cuboid) {
         this.cuboid = cuboid;
@@ -68,19 +66,12 @@ public class CuboidToGridTableMapping {
         }
         
         // metrics
-        metrics2gt = LinkedListMultimap.create();
+        metrics2gt = Maps.newHashMap();
         for (MeasureDesc measure :cuboid.getCubeDesc().getMeasures()) {
             // Count distinct & holistic count distinct are equals() but 
different.
             // Ensure the holistic version if exists is always the first.
             FunctionDesc func = measure.getFunction();
-            if (func.isHolisticCountDistinct()) {
-                List<Integer> existing = metrics2gt.removeAll(func);
-                metrics2gt.put(func, gtColIdx);
-                metrics2gt.putAll(func, existing);
-            } else {
-                metrics2gt.put(func, gtColIdx);
-            }
-            
+            metrics2gt.put(func, gtColIdx);
             gtDataTypes.add(func.getReturnDataType());
             
             // map to column block
@@ -135,19 +126,8 @@ public class CuboidToGridTableMapping {
     }
 
     public int getIndexOf(FunctionDesc metric) {
-        List<Integer> list = metrics2gt.get(metric);
-        // normal case
-        if (list.size() == 1) {
-            return list.get(0);
-        }
-        // count distinct & its holistic version
-        else if (list.size() == 2) {
-            assert metric.isCountDistinct();
-            return metric.isHolisticCountDistinct() ? list.get(0) : 
list.get(1);
-        }
-        // unexpected
-        else
-            return -1;
+        Integer r = metrics2gt.get(metric);
+        return r == null ? -1 : r.intValue();
     }
 
     public List<TblColRef> getCuboidDimensionsInGTOrder() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
index c4d0a7e..19ebaef 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
@@ -94,7 +94,7 @@ public class TrimmedCubeCodeSystem implements IGTCodeSystem {
         MeasureAggregator<?>[] result = new 
MeasureAggregator[aggrFunctions.length];
         for (int i = 0; i < result.length; i++) {
             int col = columns.trueBitAt(i);
-            result[i] = MeasureAggregator.create(aggrFunctions[i], 
info.getColumnType(col).toString());
+            result[i] = MeasureAggregator.create(aggrFunctions[i], 
info.getColumnType(col));
         }
 
         // deal with holistic distinct count

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
index fed9479..3e9a67d 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
@@ -99,8 +99,7 @@ public class InMemCubeBuilderInputConverter {
         int paramColIdx = 0; // index among parameters of column type
         for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) 
{
             String value;
-            if (function.isCount() || function.isHolisticCountDistinct()) {
-                // note for holistic count distinct, this value will be ignored
+            if (function.isCount()) {
                 value = "1";
             } else if (param.isColumnType()) {
                 value = row.get(colIdxOnFlatTable[paramColIdx++]);

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
index f8a887d..8fa99cd 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
@@ -17,43 +17,15 @@
 */
 package org.apache.kylin.cube.inmemcubing;
 
-import com.google.common.collect.Maps;
+import java.util.BitSet;
 
-import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  */
 public final class InMemCubeBuilderUtils {
     
-    public static final HashMap<Integer, Dictionary<String>> 
createTopNLiteralColDictionaryMap(CubeDesc cubeDesc, CubeJoinedFlatTableDesc 
intermediateTableDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
-        HashMap<Integer, Dictionary<String>> result = Maps.newHashMap();
-        for (int measureIdx = 0; measureIdx < cubeDesc.getMeasures().size(); 
measureIdx++) {
-            MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx);
-            FunctionDesc func = measureDesc.getFunction();
-            if (func.isTopN()) {
-                int[] flatTableIdx = 
intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
-                int literalColIdx = flatTableIdx[flatTableIdx.length - 1];
-                TblColRef literalCol = func.getTopNLiteralColumn();
-                @SuppressWarnings("unchecked")
-                Dictionary<String> dictionary = (Dictionary<String>) 
dictionaryMap.get(literalCol);
-                //Preconditions.checkNotNull(dictionary);//FIXME disable check 
since dictionary is null when building empty segment
-                result.put(literalColIdx, dictionary);
-            }
-        }
-        return result;
-    }
-
     public static final Pair<ImmutableBitSet, ImmutableBitSet> 
getDimensionAndMetricColumnBitSet(final long cuboidId, final int measureCount) {
         int cardinality = Long.bitCount(cuboidId);
         BitSet dimension = new BitSet();

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java 
b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 8c7d891..7e79f64 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -642,11 +642,6 @@ public class CubeDesc extends RootPersistentEntity {
             FunctionDesc func = m.getFunction();
             func.init(factTable);
             allColumns.addAll(func.getParameter().getColRefs());
-
-            // verify holistic count distinct as a dependent measure
-            if (func.isHolisticCountDistinct() && 
StringUtils.isBlank(m.getDependentMeasureRef())) {
-                throw new IllegalStateException(m + " is a holistic count 
distinct but it has no DependentMeasureRef defined!");
-            }
         }
     }
 
@@ -677,9 +672,9 @@ public class CubeDesc extends RootPersistentEntity {
         }
     }
 
-    public boolean hasHolisticCountDistinctMeasures() {
+    public boolean hasMemoryHungryMeasures() {
         for (MeasureDesc measure : measures) {
-            if (measure.getFunction().isHolisticCountDistinct()) {
+            if (measure.getFunction().getMeasureType().isMemoryHungry()) {
                 return true;
             }
         }
@@ -766,7 +761,7 @@ public class CubeDesc extends RootPersistentEntity {
         }
 
         for (MeasureDesc measure : measures) {
-            MeasureType aggrType = measure.getFunction().getMeasureType();
+            MeasureType<?> aggrType = measure.getFunction().getMeasureType();
             
result.addAll(aggrType.getColumnsNeedDictionary(measure.getFunction()));
         }
         return result;

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
index c918798..530ce07 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
@@ -114,10 +114,6 @@ public class FunctionRule implements 
IValidatorRule<CubeDesc> {
             if (rtype.isIntegerFamily() == false) {
                 context.addResult(ResultLevel.ERROR, "Return type for function 
" + func + " must be one of " + DataType.INTEGER_FAMILY);
             }
-        } else if (funcDesc.isCountDistinct()) {
-            if (rtype.isHLLC() == false && funcDesc.isHolisticCountDistinct() 
== false) {
-                context.addResult(ResultLevel.ERROR, "Return type for function 
" + func + " must be hllc(10), hllc(12) etc.");
-            }
         } else if (funcDesc.isMax() || funcDesc.isMin() || funcDesc.isSum()) {
             if (rtype.isNumberFamily() == false) {
                 context.addResult(ResultLevel.ERROR, "Return type for function 
" + func + " must be one of " + DataType.NUMBER_FAMILY);

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java 
b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
index b3133be..77eb430 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
@@ -60,7 +60,7 @@ public class GTSampleCodeSystem implements IGTCodeSystem {
         MeasureAggregator<?>[] result = new 
MeasureAggregator[aggrFunctions.length];
         for (int i = 0; i < result.length; i++) {
             int col = columns.trueBitAt(i);
-            result[i] = MeasureAggregator.create(aggrFunctions[i], 
info.getColumnType(col).toString());
+            result[i] = MeasureAggregator.create(aggrFunctions[i], 
info.getColumnType(col));
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
index ab87c2b..b593e48 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
@@ -186,15 +186,22 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
         for (int measureIdx = 0; measureIdx < cube.getDescriptor().getMeasures().size(); measureIdx++) {
             MeasureDesc measureDesc = cube.getDescriptor().getMeasures().get(measureIdx);
             FunctionDesc func = measureDesc.getFunction();
-            if (func.isTopN()) {
-                int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx];
-                int literalColIdx = flatTableIdx[flatTableIdx.length - 1];
-                TblColRef literalCol = func.getTopNLiteralColumn();
-                logger.info("Building dictionary for " + literalCol);
-                List<byte[]> valueList = readValueList(flatTable, nColumns, literalColIdx);
-                Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(literalCol.getType(), new IterableDictionaryValueEnumerator(valueList));
-
-                result.put(literalCol, dict);
+            List<TblColRef> dictCols = func.getMeasureType().getColumnsNeedDictionary(func);
+            if (dictCols.isEmpty())
+                continue;
+
+            int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx];
+            List<TblColRef> paramCols = func.getParameter().getColRefs();
+            for (int i = 0; i < paramCols.size(); i++) {
+                TblColRef col = paramCols.get(i);
+                if (dictCols.contains(col)) {
+                    int colIdxOnFlat = flatTableIdx[i];
+                    logger.info("Building dictionary for " + col);
+                    List<byte[]> valueList = readValueList(flatTable, nColumns, colIdxOnFlat);
+                    Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
+
+                    result.put(col, dict);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
index eb2b963..9abd28c 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
@@ -125,7 +125,7 @@ abstract public class LookupTable<T extends Comparable<T>> {
         if (returnBegin == null && returnEnd == null)
             return null;
         else
-            return new Pair<T, T>(returnBegin, returnEnd);
+            return Pair.newPair(returnBegin, returnEnd);
     }
 
     public Set<T> mapValues(String col, Set<T> values, String returnCol) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/IMeasureFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/IMeasureFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/IMeasureFactory.java
deleted file mode 100644
index 0ab547a..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/measure/IMeasureFactory.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.measure;
-
-public interface IMeasureFactory {
-
-    public MeasureType createMeasureType(String funcName, String dataType);
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java
index 32e5128..4c031c7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java
@@ -20,13 +20,15 @@ package org.apache.kylin.measure;
 
 import java.io.Serializable;
 
+import org.apache.kylin.metadata.datatype.DataType;
+
 /**
  */
 @SuppressWarnings("serial")
 abstract public class MeasureAggregator<V> implements Serializable {
 
-    public static MeasureAggregator<?> create(String funcName, String dataType) {
-        return MeasureType.create(funcName, dataType).newAggregator();
+    public static MeasureAggregator<?> create(String funcName, DataType dataType) {
+        return MeasureTypeFactory.create(funcName, dataType).newAggregator();
     }
 
     public static int guessBigDecimalMemBytes() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
index c6b456e..b3edbc3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
@@ -45,7 +45,7 @@ public class MeasureAggregators implements Serializable {
         Map<String, Integer> measureIndexMap = new HashMap<String, Integer>();
         for (int i = 0; i < descLength; i++) {
             FunctionDesc func = measureDescs[i].getFunction();
-            aggs[i] = MeasureAggregator.create(func.getExpression(), func.getReturnType());
+            aggs[i] = func.getMeasureType().newAggregator();
             measureIndexMap.put(measureDescs[i].getName(), i);
         }
         // fill back dependent aggregator

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index d4b2700..cf04c60 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -24,11 +24,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.measure.basic.BasicMeasureFactory;
-import org.apache.kylin.measure.hllc.HLLCAggregationFactory;
-import org.apache.kylin.measure.topn.TopNMeasureFactory;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -37,54 +32,27 @@ import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
 
-import com.google.common.collect.Maps;
-
-abstract public class MeasureType {
-    
-    private static final Map<String, IMeasureFactory> factoryRegistry = Maps.newConcurrentMap();
-    private static final IMeasureFactory defaultFactory = new BasicMeasureFactory();
-    
-    static {
-        factoryRegistry.put(FunctionDesc.FUNC_COUNT_DISTINCT, new HLLCAggregationFactory());
-        factoryRegistry.put(FunctionDesc.FUNC_TOP_N, new TopNMeasureFactory());
-    }
-    
-    public static MeasureType create(String funcName, String dataType) {
-        funcName = funcName.toUpperCase();
-        dataType = dataType.toLowerCase();
-        
-        IMeasureFactory factory = factoryRegistry.get(funcName);
-        if (factory == null)
-            factory = defaultFactory;
-        
-        MeasureType result = factory.createMeasureType(funcName, dataType);
-        
-        // register serializer for aggr data type
-        DataType aggregationDataType = result.getAggregationDataType();
-        if (DataTypeSerializer.hasRegistered(aggregationDataType.getName()) == false) {
-            DataTypeSerializer.register(aggregationDataType.getName(), result.getAggregationDataSeralizer());
-        }
-        
-        return result;
-    }
+abstract public class MeasureType<T> {
     
     /* ============================================================================
      * Define
      * ---------------------------------------------------------------------------- */
     
-    abstract public DataType getAggregationDataType();
-    
-    abstract public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer();
+    public void validate(FunctionDesc functionDesc) throws IllegalArgumentException {
+        return;
+    }
     
-    abstract public void validate(MeasureDesc measureDesc) throws IllegalArgumentException;
+    public boolean isMemoryHungry() {
+        return false;
+    }
     
     /* ============================================================================
      * Build
      * ---------------------------------------------------------------------------- */
     
-    abstract public MeasureIngester<?> newIngester();
+    abstract public MeasureIngester<T> newIngester();
     
-    abstract public MeasureAggregator<?> newAggregator();
+    abstract public MeasureAggregator<T> newAggregator();
  
     public List<TblColRef> getColumnsNeedDictionary(FunctionDesc functionDesc) {
         return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
new file mode 100644
index 0000000..48eddb2
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.measure.basic.BasicMeasureType;
+import org.apache.kylin.measure.hllc.HLLCMeasureType;
+import org.apache.kylin.measure.topn.TopNMeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+abstract public class MeasureTypeFactory<T> {
+
+    abstract public MeasureType<T> createMeasureType(String funcName, DataType dataType);
+    
+    abstract public String getAggrFunctionName();
+    abstract public String getAggrDataTypeName();
+    abstract public Class<? extends DataTypeSerializer<T>> getAggrDataTypeSerializer();
+    
+    // ============================================================================
+    
+    private static Map<Pair<String, String>, MeasureTypeFactory<?>> factories = Maps.newHashMap();
+    private static MeasureTypeFactory<?> defaultFactory = new BasicMeasureType.Factory();
+    
+    public static synchronized void init(KylinConfig config) {
+        if (factories.isEmpty() == false)
+            return;
+        
+        List<MeasureTypeFactory<?>> factoryInsts = Lists.newArrayList();
+        
+        // two built-in advanced measure types
+        factoryInsts.add(new HLLCMeasureType.Factory());
+        factoryInsts.add(new TopNMeasureType.Factory());
+        
+        // more custom measure types
+        for (String factoryClz : config.getMeasureTypeFactories()) {
+            factoryInsts.add((MeasureTypeFactory<?>) ClassUtil.newInstance(factoryClz));
+        }
+        
+        // register factories & data type serializers
+        for (MeasureTypeFactory<?> factory : factoryInsts) {
+            String funcName = factory.getAggrFunctionName().toUpperCase();
+            String dataTypeName = factory.getAggrDataTypeName().toLowerCase();
+            Class<? extends DataTypeSerializer<?>> serializer = factory.getAggrDataTypeSerializer();
+            
+            DataType.register(dataTypeName);
+            DataTypeSerializer.register(dataTypeName, serializer);
+            factories.put(Pair.newPair(funcName, dataTypeName), factory);
+        }
+    }
+    
+    public static MeasureType<?> create(String funcName, String dataType) {
+        return create(funcName, DataType.getType(dataType));
+    }
+    
+    public static MeasureType<?> create(String funcName, DataType dataType) {
+        funcName = funcName.toUpperCase();
+        
+        MeasureTypeFactory<?> factory = factories.get(Pair.newPair(funcName, dataType.getName()));
+        if (factory == null)
+            factory = defaultFactory;
+        
+        return factory.createMeasureType(funcName, dataType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureFactory.java
deleted file mode 100644
index 7bfee49..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.measure.basic;
-
-import org.apache.kylin.measure.IMeasureFactory;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.metadata.datatype.DataType;
-
-public class BasicMeasureFactory implements IMeasureFactory {
-
-    @Override
-    public MeasureType createMeasureType(String funcName, String dataType) {
-        return new BasicMeasureType(funcName, DataType.getType(dataType));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
index fea6ba5..bc9e6b1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
@@ -21,47 +21,59 @@ package org.apache.kylin.measure.basic;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.metadata.datatype.BigDecimalSerializer;
+import org.apache.kylin.measure.MeasureTypeFactory;
 import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.datatype.DataTypeSerializer;
-import org.apache.kylin.metadata.datatype.DoubleSerializer;
-import org.apache.kylin.metadata.datatype.LongSerializer;
 import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
 
+@SuppressWarnings("rawtypes")
 public class BasicMeasureType extends MeasureType {
     
+    public static class Factory extends MeasureTypeFactory {
+
+        @Override
+        public MeasureType createMeasureType(String funcName, DataType dataType) {
+            return new BasicMeasureType(funcName, dataType);
+        }
+
+        @Override
+        public String getAggrFunctionName() {
+            return null;
+        }
+
+        @Override
+        public String getAggrDataTypeName() {
+            return null;
+        }
+
+        @Override
+        public Class getAggrDataTypeSerializer() {
+            return null;
+        }
+    }
+
     private final String funcName;
     private final DataType dataType;
 
     public BasicMeasureType(String funcName, DataType dataType) {
+        validate(funcName, dataType);
         this.funcName = funcName;
         this.dataType = dataType;
     }
 
     @Override
-    public DataType getAggregationDataType() {
-        return dataType;
+    public void validate(FunctionDesc functionDesc) throws IllegalArgumentException {
+        validate(functionDesc.getExpression(), functionDesc.getReturnDataType());
     }
 
-    public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
-        if (dataType.isIntegerFamily())
-            return LongSerializer.class;
-        else if (dataType.isDecimal())
-            return BigDecimalSerializer.class;
-        else if (dataType.isNumberFamily())
-            return DoubleSerializer.class;
-        else
-            throw new IllegalArgumentException("No serializer for aggregation type " + dataType);
+    private void validate(String funcName, DataType dataType) throws IllegalArgumentException {
+        if ((funcName.equals(FunctionDesc.FUNC_SUM) //
+                || funcName.equals(FunctionDesc.FUNC_COUNT) //
+                || funcName.equals(FunctionDesc.FUNC_MAX) //
+                || funcName.equals(FunctionDesc.FUNC_MIN)) == false)
+            throw new IllegalArgumentException();
     }
     
     @Override
-    public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
-        // TODO Auto-generated method stub
-        
-    }
-
-    @Override
     public MeasureIngester<?> newIngester() {
         if (dataType.isIntegerFamily())
             return new LongIngester();
@@ -72,7 +84,7 @@ public class BasicMeasureType extends MeasureType {
         else
             throw new IllegalArgumentException("No ingester for aggregation type " + dataType);
     }
-    
+
     @Override
     public MeasureAggregator<?> newAggregator() {
         if (isSum() || isCount()) {
@@ -99,21 +111,21 @@ public class BasicMeasureType extends MeasureType {
         }
         throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + dataType + "'");
     }
-    
+
     private boolean isSum() {
-        return FunctionDesc.FUNC_SUM.equalsIgnoreCase(funcName);
+        return FunctionDesc.FUNC_SUM.equals(funcName);
     }
 
     private boolean isCount() {
-        return FunctionDesc.FUNC_COUNT.equalsIgnoreCase(funcName);
+        return FunctionDesc.FUNC_COUNT.equals(funcName);
     }
-    
+
     private boolean isMax() {
-        return FunctionDesc.FUNC_MAX.equalsIgnoreCase(funcName);
+        return FunctionDesc.FUNC_MAX.equals(funcName);
     }
-    
+
     private boolean isMin() {
-        return FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName);
+        return FunctionDesc.FUNC_MIN.equals(funcName);
     }
 
     @Override
@@ -126,5 +138,5 @@ public class BasicMeasureType extends MeasureType {
         // TODO Auto-generated method stub
         return null;
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregationFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregationFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregationFactory.java
deleted file mode 100644
index 13e5520..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregationFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.measure.hllc;
-
-import org.apache.kylin.measure.IMeasureFactory;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.FunctionDesc;
-
-public class HLLCAggregationFactory implements IMeasureFactory {
-
-    @Override
-    public MeasureType createMeasureType(String funcName, String dataType) {
-        if (FunctionDesc.FUNC_COUNT_DISTINCT.equalsIgnoreCase(funcName) == false)
-            throw new IllegalArgumentException();
-        
-        return new HLLCMeasureType(DataType.getType(dataType));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
index 83fcf3b..073de6d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
@@ -25,43 +25,72 @@ import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
-public class HLLCMeasureType extends MeasureType {
+public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounter> {
+
+    public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
+    public static final String DATATYPE_HLLC = "hllc";
+    
+    public static class Factory extends MeasureTypeFactory<HyperLogLogPlusCounter> {
+
+        @Override
+        public MeasureType<HyperLogLogPlusCounter> createMeasureType(String funcName, DataType dataType) {
+            return new HLLCMeasureType(funcName, dataType);
+        }
+
+        @Override
+        public String getAggrFunctionName() {
+            return FUNC_COUNT_DISTINCT;
+        }
+
+        @Override
+        public String getAggrDataTypeName() {
+            return DATATYPE_HLLC;
+        }
+
+        @Override
+        public Class<? extends DataTypeSerializer<HyperLogLogPlusCounter>> getAggrDataTypeSerializer() {
+            return HLLCSerializer.class;
+        }
+    }
+    
+    // ============================================================================
 
     private final DataType dataType;
 
-    public HLLCMeasureType(DataType dataType) {
-        if ("hllc".equals(dataType.getName()) == false)
-            throw new IllegalArgumentException();
-        
+    public HLLCMeasureType(String funcName, DataType dataType) {
+        validate(funcName, dataType);
         this.dataType = dataType;
-
-        if (this.dataType.getPrecision() < 10 || this.dataType.getPrecision() > 16)
-            throw new IllegalArgumentException("HLLC precision must be between 10 and 16");
     }
 
-    @Override
-    public DataType getAggregationDataType() {
-        return dataType;
+    public void validate(FunctionDesc functionDesc) throws IllegalArgumentException {
+        validate(functionDesc.getExpression(), functionDesc.getReturnDataType());
     }
 
-    @Override
-    public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
-        return HLLCSerializer.class;
+    private void validate(String funcName, DataType dataType) {
+        if (FUNC_COUNT_DISTINCT.equals(funcName) == false)
+            throw new IllegalArgumentException();
+
+        if (DATATYPE_HLLC.equals(dataType.getName()) == false)
+            throw new IllegalArgumentException();
+
+        if (dataType.getPrecision() < 1 || dataType.getPrecision() > 5000)
+            throw new IllegalArgumentException();
     }
 
     @Override
-    public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
-        // TODO Auto-generated method stub
-
+    public boolean isMemoryHungry() {
+        return true;
     }
 
     @Override
-    public MeasureIngester<?> newIngester() {
+    public MeasureIngester<HyperLogLogPlusCounter> newIngester() {
         return new MeasureIngester<HyperLogLogPlusCounter>() {
             HyperLogLogPlusCounter current = new HyperLogLogPlusCounter(dataType.getPrecision());
 
@@ -77,11 +106,8 @@ public class HLLCMeasureType extends MeasureType {
     }
 
     @Override
-    public MeasureAggregator<?> newAggregator() {
-        if (dataType.isHLLC())
-            return new HLLCAggregator(dataType.getPrecision());
-        else
-            return new LDCAggregator();
+    public MeasureAggregator<HyperLogLogPlusCounter> newAggregator() {
+        return new HLLCAggregator(dataType.getPrecision());
     }
 
     @Override
@@ -95,4 +121,8 @@ public class HLLCMeasureType extends MeasureType {
         return null;
     }
 
+    public static boolean isCountDistinct(FunctionDesc func) {
+        return FUNC_COUNT_DISTINCT.equalsIgnoreCase(func.getExpression());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
index 7131201..7424493 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
@@ -81,7 +81,8 @@ public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
 
     @Override
     public int getStorageBytesEstimate() {
-        return current().maxLength();
+        // for HLL, it will be compressed when export to bytes
+        return (int) (current().maxLength() * 0.75);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/LDCAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/LDCAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/LDCAggregator.java
deleted file mode 100644
index 5d96450..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/LDCAggregator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.measure.hllc;
-
-import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.metadata.datatype.LongMutable;
-
-/**
- * Long Distinct Count
- */
-@SuppressWarnings("serial")
-public class LDCAggregator extends MeasureAggregator<LongMutable> {
-
-    private static LongMutable ZERO = new LongMutable(0);
-
-    private HLLCAggregator hllAgg = null;
-    private LongMutable state = new LongMutable(0);
-
-    @SuppressWarnings("rawtypes")
-    public void setDependentAggregator(MeasureAggregator agg) {
-        this.hllAgg = (HLLCAggregator) agg;
-    }
-
-    @Override
-    public void reset() {
-    }
-
-    @Override
-    public void aggregate(LongMutable value) {
-    }
-
-    @Override
-    public LongMutable getState() {
-        if (hllAgg == null) {
-            return ZERO;
-        } else {
-            state.set(hllAgg.getState().getCountEstimate());
-            return state;
-        }
-    }
-
-    @Override
-    public int getMemBytesEstimate() {
-        return guessLongMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureFactory.java
deleted file mode 100644
index 1ceb607..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.measure.topn;
-
-import org.apache.kylin.measure.IMeasureFactory;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.FunctionDesc;
-
-public class TopNMeasureFactory implements IMeasureFactory {
-
-    @Override
-    public MeasureType createMeasureType(String funcName, String dataType) {
-        if (FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName) == false)
-            throw new IllegalArgumentException();
-        
-        return new TopNMeasureType(DataType.getType(dataType));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index 01582d0..c3d36ca 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -32,7 +32,7 @@ import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.measure.hllc.HLLCSerializer;
+import org.apache.kylin.measure.MeasureTypeFactory;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -47,44 +47,70 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
-public class TopNMeasureType extends MeasureType {
+public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
 
     private static final Logger logger = 
LoggerFactory.getLogger(TopNMeasureType.class);
 
-    private final DataType dataType;
+    public static final String FUNC_TOP_N = "TOP_N";
+    public static final String DATATYPE_TOPN = "topn";
+    
+    public static class Factory extends 
MeasureTypeFactory<TopNCounter<ByteArray>> {
 
-    public TopNMeasureType(DataType dataType) {
-        if ("topn".equals(dataType.getName()) == false)
-            throw new IllegalArgumentException();
+        @Override
+        public MeasureType<TopNCounter<ByteArray>> createMeasureType(String 
funcName, DataType dataType) {
+            return new TopNMeasureType(funcName, dataType);
+        }
 
-        this.dataType = dataType;
+        @Override
+        public String getAggrFunctionName() {
+            return FUNC_TOP_N;
+        }
+
+        @Override
+        public String getAggrDataTypeName() {
+            return DATATYPE_TOPN;
+        }
 
-        if (this.dataType.getPrecision() < 1 || this.dataType.getPrecision() > 
1000)
-            throw new IllegalArgumentException("TopN precision must be between 
1 and 1000");
+        @Override
+        public Class<? extends DataTypeSerializer<TopNCounter<ByteArray>>> 
getAggrDataTypeSerializer() {
+            return TopNCounterSerializer.class;
+        }
     }
+    
+    // 
============================================================================
 
-    @Override
-    public DataType getAggregationDataType() {
-        return dataType;
+    private final DataType dataType;
+
+    public TopNMeasureType(String funcName, DataType dataType) {
+        validate(funcName, dataType);
+        this.dataType = dataType;
     }
 
-    @Override
-    public Class<? extends DataTypeSerializer<?>> 
getAggregationDataSeralizer() {
-        return HLLCSerializer.class;
+    public void validate(FunctionDesc functionDesc) throws 
IllegalArgumentException {
+        validate(functionDesc.getExpression(), 
functionDesc.getReturnDataType());
     }
 
-    @Override
-    public void validate(MeasureDesc measureDesc) throws 
IllegalArgumentException {
-        // TODO Auto-generated method stub
+    private void validate(String funcName, DataType dataType) {
+        if (FUNC_TOP_N.equals(funcName) == false)
+            throw new IllegalArgumentException();
+        
+        if (DATATYPE_TOPN.equals(dataType.getName()) == false)
+            throw new IllegalArgumentException();
 
+        if (dataType.getPrecision() < 1 || dataType.getPrecision() > 5000)
+            throw new IllegalArgumentException();
     }
 
-    @SuppressWarnings("rawtypes")
     @Override
-    public MeasureIngester<?> newIngester() {
-        return new MeasureIngester<TopNCounter>() {
+    public boolean isMemoryHungry() {
+        return true;
+    }
+
+    @Override
+    public MeasureIngester<TopNCounter<ByteArray>> newIngester() {
+        return new MeasureIngester<TopNCounter<ByteArray>>() {
             @Override
-            public TopNCounter valueOf(String[] values, MeasureDesc 
measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+            public TopNCounter<ByteArray> valueOf(String[] values, MeasureDesc 
measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
                 if (values.length != 2)
                     throw new IllegalArgumentException();
 
@@ -92,7 +118,7 @@ public class TopNMeasureType extends MeasureType {
                 String literal = values[1];
 
                 // encode literal using dictionary
-                TblColRef literalCol = 
measureDesc.getFunction().getTopNLiteralColumn();
+                TblColRef literalCol = 
getTopNLiteralColumn(measureDesc.getFunction());
                 Dictionary<String> dictionary = dictionaryMap.get(literalCol);
                 int keyEncodedValue = dictionary.getIdFromValue(literal);
 
@@ -104,12 +130,11 @@ public class TopNMeasureType extends MeasureType {
                 return topNCounter;
             }
 
-            @SuppressWarnings("unchecked")
             @Override
-            public TopNCounter reEncodeDictionary(TopNCounter value, 
MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, 
Map<TblColRef, Dictionary<String>> newDicts) {
-                TopNCounter<ByteArray> topNCounter = (TopNCounter<ByteArray>) 
value;
+            public TopNCounter<ByteArray> 
reEncodeDictionary(TopNCounter<ByteArray> value, MeasureDesc measureDesc, 
Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> 
newDicts) {
+                TopNCounter<ByteArray> topNCounter = value;
 
-                TblColRef colRef = 
measureDesc.getFunction().getTopNLiteralColumn();
+                TblColRef colRef = 
getTopNLiteralColumn(measureDesc.getFunction());
                 Dictionary<String> sourceDict = oldDicts.get(colRef);
                 Dictionary<String> mergedDict = newDicts.get(colRef);
 
@@ -138,7 +163,7 @@ public class TopNMeasureType extends MeasureType {
     }
 
     @Override
-    public MeasureAggregator<?> newAggregator() {
+    public MeasureAggregator<TopNCounter<ByteArray>> newAggregator() {
         return new TopNAggregator();
     }
 
@@ -159,11 +184,11 @@ public class TopNMeasureType extends MeasureType {
 
         // the measure function must be SUM
         FunctionDesc onlyFunction = digest.aggregations.iterator().next();
-        if (onlyFunction.isSum() == false)
+        if (isTopNCompatibleSum(topN.getFunction(), onlyFunction) == false)
             return null;
 
-        TblColRef literalCol = topN.getFunction().getTopNLiteralColumn();
-        if (unmatchedDimensions.contains(literalCol) && 
topN.getFunction().isTopNCompatibleSum(onlyFunction)) {
+        TblColRef literalCol = getTopNLiteralColumn(topN.getFunction());
+        if (unmatchedDimensions.contains(literalCol)) {
             unmatchedDimensions.remove(literalCol);
             unmatchedAggregations.remove(onlyFunction);
             return new CapabilityInfluence() {
@@ -176,6 +201,21 @@ public class TopNMeasureType extends MeasureType {
             return null;
     }
 
+    private boolean isTopNCompatibleSum(FunctionDesc topN, FunctionDesc sum) {
+        if (sum == null)
+            return false;
+
+        if (!isTopN(topN) || !sum.isSum())
+            return false;
+
+        if (sum.getParameter().getColRefs().isEmpty())
+            return false;
+
+        TblColRef sumCol = sum.getParameter().getColRefs().get(0);
+        TblColRef topnNumCol = getTopNNumericColumn(topN);
+        return sumCol.equals(topnNumCol);
+    }
+
     @Override
     public boolean needRewrite() {
         return false;
@@ -189,7 +229,7 @@ public class TopNMeasureType extends MeasureType {
     @Override
     public void beforeStorageQuery(MeasureDesc measureDesc, SQLDigest 
sqlDigest) {
         FunctionDesc topnFunc = measureDesc.getFunction();
-        TblColRef topnLiteralCol = topnFunc.getTopNLiteralColumn();
+        TblColRef topnLiteralCol = getTopNLiteralColumn(topnFunc);
 
         if (sqlDigest.groupbyColumns.contains(topnLiteralCol) == false)
             return;
@@ -213,12 +253,12 @@ public class TopNMeasureType extends MeasureType {
     public boolean needAdvancedTupleFilling() {
         return true;
     }
-    
+
     @Override
     public void fillTupleSimply(Tuple tuple, int indexInTuple, Object 
measureValue) {
         throw new UnsupportedOperationException();
     }
-    
+
     @Override
     public IAdvMeasureFiller getAdvancedTupleFiller(FunctionDesc function, 
TupleInfo tupleInfo, Map<TblColRef, Dictionary<String>> dictionaryMap) {
         final TblColRef literalCol = getTopNLiteralColumn(function);
@@ -227,7 +267,7 @@ public class TopNMeasureType extends MeasureType {
         final int literalTupleIdx = tupleInfo.hasColumn(literalCol) ? 
tupleInfo.getColumnIndex(literalCol) : -1;
         // for TopN, the aggr must be SUM, so the number fill into the column 
position (without rewrite)
         final int numericTupleIdx = tupleInfo.hasColumn(numericCol) ? 
tupleInfo.getColumnIndex(numericCol) : -1;
-        
+
         return new IAdvMeasureFiller() {
             private TopNCounter<ByteArray> topNCounter;
             private Iterator<Counter<ByteArray>> topNCounterIterator;
@@ -249,7 +289,7 @@ public class TopNMeasureType extends MeasureType {
             public void fillTuplle(Tuple tuple, int row) {
                 if (expectRow++ != row)
                     throw new IllegalStateException();
-                
+
                 Counter<ByteArray> counter = topNCounterIterator.next();
                 int key = BytesUtil.readUnsigned(counter.getItem().array(), 0, 
counter.getItem().array().length);
                 String colValue = topNColDict.getValueFromId(key);
@@ -266,4 +306,8 @@ public class TopNMeasureType extends MeasureType {
     private TblColRef getTopNLiteralColumn(FunctionDesc functionDesc) {
         return functionDesc.getParameter().getColRefs().get(1);
     }
+
+    private boolean isTopN(FunctionDesc functionDesc) {
+        return FUNC_TOP_N.equalsIgnoreCase(functionDesc.getExpression());
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
index 235c99f..05e6744 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
@@ -21,6 +21,7 @@ package org.apache.kylin.metadata.datatype;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -28,21 +29,37 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang.StringUtils;
+
 /**
  */
 @SuppressWarnings("serial")
 public class DataType implements Serializable {
 
-    // standard sql types, ref: 
http://www.w3schools.com/sql/sql_datatypes_general.asp
-    public static final String VALID_TYPES_STRING = 
"any|char|varchar|boolean|binary" //
-            + 
"|integer|tinyint|smallint|bigint|decimal|numeric|float|real|double" //
-            + 
"|date|time|datetime|timestamp|byte|int|short|long|string|hllc|topn";
+    private static final LinkedHashSet<String> VALID_TYPES = new 
LinkedHashSet<String>();
 
+    private static Pattern TYPE_PATTERN = null;
     private static final String TYPE_PATTEN_TAIL = "\\s*" //
             + "(?:" + "[(]" + "([\\d\\s,]+)" + "[)]" + ")?";
 
-    private static final Pattern TYPE_PATTERN = Pattern.compile( //
-            "(" + VALID_TYPES_STRING + ")" + TYPE_PATTEN_TAIL, 
Pattern.CASE_INSENSITIVE);
+    public static synchronized void register(String... typeNames) {
+        for (String typeName : typeNames) {
+            VALID_TYPES.add(typeName);
+        }
+
+        TYPE_PATTERN = Pattern.compile( //
+                "(" + StringUtils.join(VALID_TYPES, "|") + ")" //
+                        + TYPE_PATTEN_TAIL, Pattern.CASE_INSENSITIVE);
+    }
+
+    // standard sql types, ref: 
http://www.w3schools.com/sql/sql_datatypes_general.asp
+    static {
+        register("any", "char", "varchar", "string", //
+                "boolean", "byte", "binary", //
+                "int", "short", "long", "integer", "tinyint", "smallint", 
"bigint", //
+                "float", "real", "double", "decimal", "numeric", //
+                "date", "time", "datetime", "timestamp");
+    }
 
     public static final Set<String> INTEGER_FAMILY = new HashSet<String>();
     public static final Set<String> NUMBER_FAMILY = new HashSet<String>();
@@ -219,10 +236,6 @@ public class DataType implements Serializable {
         return name.equals("decimal");
     }
 
-    public boolean isHLLC() {
-        return name.equals("hllc");
-    }
-
     public String getName() {
         return name;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
index fd3121f..01f8cc6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
@@ -19,13 +19,9 @@
 package org.apache.kylin.metadata.datatype;
 
 import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.kylin.common.util.BytesSerializer;
-import org.apache.kylin.measure.hllc.HLLCSerializer;
-import org.apache.kylin.measure.topn.TopNCounterSerializer;
 
 import com.google.common.collect.Maps;
 
@@ -36,28 +32,20 @@ import com.google.common.collect.Maps;
  */
 abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
 
-    final static Map<String, Class<?>> implementations;
+    final static Map<String, Class<?>> implementations = Maps.newHashMap();
     static {
-        HashMap<String, Class<?>> impl = Maps.newHashMap();
-        impl.put("varchar", StringSerializer.class);
-        impl.put("decimal", BigDecimalSerializer.class);
-        impl.put("double", DoubleSerializer.class);
-        impl.put("float", DoubleSerializer.class);
-        impl.put("bigint", LongSerializer.class);
-        impl.put("long", LongSerializer.class);
-        impl.put("integer", LongSerializer.class);
-        impl.put("int", LongSerializer.class);
-        impl.put("smallint", LongSerializer.class);
-        impl.put("date", DateTimeSerializer.class);
-        impl.put("datetime", DateTimeSerializer.class);
-        impl.put("timestamp", DateTimeSerializer.class);
-        impl.put("topn", TopNCounterSerializer.class);
-        impl.put("hllc", HLLCSerializer.class);
-        implementations = Collections.unmodifiableMap(impl);
-    }
-    
-    public static boolean hasRegistered(String dataTypeName) {
-        return implementations.containsKey(dataTypeName);
+        implementations.put("varchar", StringSerializer.class);
+        implementations.put("decimal", BigDecimalSerializer.class);
+        implementations.put("double", DoubleSerializer.class);
+        implementations.put("float", DoubleSerializer.class);
+        implementations.put("bigint", LongSerializer.class);
+        implementations.put("long", LongSerializer.class);
+        implementations.put("integer", LongSerializer.class);
+        implementations.put("int", LongSerializer.class);
+        implementations.put("smallint", LongSerializer.class);
+        implementations.put("date", DateTimeSerializer.class);
+        implementations.put("datetime", DateTimeSerializer.class);
+        implementations.put("timestamp", DateTimeSerializer.class);
     }
     
     public static void register(String dataTypeName, Class<? extends 
DataTypeSerializer<?>> impl) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index 104f3c7..39fe6b3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 
 import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.measure.basic.BasicMeasureType;
 import org.apache.kylin.metadata.datatype.DataType;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -38,8 +40,6 @@ public class FunctionDesc {
     public static final String FUNC_MIN = "MIN";
     public static final String FUNC_MAX = "MAX";
     public static final String FUNC_COUNT = "COUNT";
-    public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
-    public static final String FUNC_TOP_N = "TOP_N";
 
     public static final String PARAMETER_TYPE_CONSTANT = "constant";
     public static final String PARAMETER_TYPE_COLUMN = "column";
@@ -52,7 +52,7 @@ public class FunctionDesc {
     private String returnType;
 
     private DataType returnDataType;
-    private MeasureType measureType;
+    private MeasureType<?> measureType;
     private boolean isDimensionAsMetric = false;
 
     public void init(TableDesc factTable) {
@@ -80,12 +80,12 @@ public class FunctionDesc {
         }
     }
     
-    public MeasureType getMeasureType() {
+    public MeasureType<?> getMeasureType() {
         if (isDimensionAsMetric)
             return null;
         
         if (measureType == null) {
-            measureType = MeasureType.create(getExpression(), getReturnType());
+            measureType = MeasureTypeFactory.create(getExpression(), 
getReturnDataType());
         }
         return measureType;
     }
@@ -108,12 +108,12 @@ public class FunctionDesc {
     }
 
     public DataType getRewriteFieldType() {
-        if (isCountDistinct() || isTopN())
-            return DataType.ANY;
-        else if (isSum() || isMax() || isMin())
+        if (isSum() || isMax() || isMin())
             return parameter.getColRefs().get(0).getType();
-        else
+        else if (getMeasureType() instanceof BasicMeasureType)
             return returnDataType;
+        else
+            return DataType.ANY;
     }
 
     public ColumnDesc newFakeRewriteColumn(TableDesc sourceTable) {
@@ -142,22 +142,6 @@ public class FunctionDesc {
         return FUNC_COUNT.equalsIgnoreCase(expression);
     }
 
-    public boolean isCountDistinct() {
-        return FUNC_COUNT_DISTINCT.equalsIgnoreCase(expression);
-    }
-
-    public boolean isTopN() {
-        return FUNC_TOP_N.equalsIgnoreCase(expression);
-    }
-
-    public boolean isHolisticCountDistinct() {
-        if (isCountDistinct() && returnDataType != null && 
returnDataType.isBigInt()) {
-            return true;
-        } else {
-            return false;
-        }
-    }
-
     /**
      * Get Full Expression such as sum(amount), count(1), count(*)...
      */
@@ -272,35 +256,4 @@ public class FunctionDesc {
         return "FunctionDesc [expression=" + expression + ", parameter=" + 
parameter + ", returnType=" + returnType + "]";
     }
 
-    // cols[0] is numeric (e.g. GMV), cols[1] is literal (e.g. SELLER)
-    public TblColRef getTopNNumericColumn() {
-        if (isTopN() == false)
-            throw new IllegalStateException();
-
-        return parameter.getColRefs().get(0);
-    }
-
-    // cols[0] is numeric (e.g. GMV), cols[1] is literal (e.g. SELLER)
-    public TblColRef getTopNLiteralColumn() {
-        if (isTopN() == false)
-            throw new IllegalStateException();
-
-        return parameter.getColRefs().get(1);
-    }
-
-    public boolean isTopNCompatibleSum(FunctionDesc sum) {
-        if (isTopN() == false)
-            throw new IllegalStateException();
-
-        if (sum == null) {
-            return false;
-        }
-
-        if (this.isTopN() && sum.isSum()) {
-            if 
(this.getParameter().getColRefs().get(0).equals(sum.getParameter().getColRefs().get(0)))
-                return true;
-        }
-
-        return false;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
index 49328e9..c12f18a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
@@ -2,9 +2,6 @@ package org.apache.kylin.metadata.tuple;
 
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.collect.Lists;
 
 /**
@@ -13,8 +10,6 @@ import com.google.common.collect.Lists;
  */
 public class TeeTupleIterator implements ITupleIterator {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(TeeTupleIterator.class);
-
     private ITupleIterator underlying;
     private List<ITuple> duplicatedData;
     private List<TeeTupleItrListener> listeners = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
index c79e1f8..d38aafd 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
@@ -24,8 +24,6 @@ import java.util.List;
 
 import net.sf.ehcache.pool.sizeof.annotations.IgnoreSizeOf;
 
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.metadata.datatype.DoubleMutable;
 import org.apache.kylin.metadata.datatype.LongMutable;
@@ -61,15 +59,7 @@ public class Tuple implements ITuple {
     public ITuple makeCopy() {
         Tuple ret = new Tuple(this.info);
         for (int i = 0; i < this.values.length; ++i) {
-            if (this.values[i] == null) {
-                ret.values[i] = null;
-            } else if (this.values[i] instanceof HyperLogLogPlusCounter) {
-                ret.values[i] = new 
HyperLogLogPlusCounter((HyperLogLogPlusCounter) this.values[i]);
-            } else if (this.values[i] instanceof TopNCounter) {
-                ret.values[i] = null;
-            } else {
-                ret.values[i] = this.values[i];
-            }
+            ret.values[i] = this.values[i];
         }
         return ret;
     }

Reply via email to