http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/measure/MeasureType.java 
b/metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
new file mode 100644
index 0000000..66ca209
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import 
org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+
+/**
+ * MeasureType captures how a kind of aggregation is defined, how it is 
calculated 
+ * during cube build, and how it is involved in query and storage scan.
+ *
+ * @param <T> the Java type of aggregation data object, e.g. 
HyperLogLogPlusCounter
+ */
+public abstract class MeasureType<T> {
+
+    /* ============================================================================
+     * Define
+     * ---------------------------------------------------------------------------- */
+
+    /**
+     * Validates that a user defined FunctionDesc has the expected parameters etc.
+     * This base implementation performs no check at all.
+     *
+     * @throws IllegalArgumentException if anything is wrong with the function definition
+     */
+    public void validate(FunctionDesc functionDesc) throws IllegalArgumentException {
+        // everything is accepted by default
+    }
+
+    /**
+     * Tells whether a single aggregated object is large. Most aggregated objects take only
+     * 8 bytes, like long or double, but some advanced aggregations such as HyperLogLog or TopN
+     * can consume more than 10 KB per object, which requires special care on memory allocation.
+     * The default answer is false.
+     */
+    public boolean isMemoryHungry() {
+        return false;
+    }
+
+    /* ============================================================================
+     * Build
+     * ---------------------------------------------------------------------------- */
+
+    /** Returns a MeasureIngester which knows how to initialize the aggregation object from raw records. */
+    public abstract MeasureIngester<T> newIngester();
+
+    /** Returns a MeasureAggregator which performs the aggregation. */
+    public abstract MeasureAggregator<T> newAggregator();
+
+    /**
+     * Lists the columns that must be dictionary encoded for optimal storage; TopN is an example
+     * of a measure that needs this. The default is an empty list.
+     */
+    public List<TblColRef> getColumnsNeedDictionary(FunctionDesc functionDesc) {
+        List<TblColRef> none = Collections.emptyList();
+        return none;
+    }
+
+    /* ============================================================================
+     * Cube Selection
+     * ---------------------------------------------------------------------------- */
+
+    /**
+     * Hook for special measures that hold columns which are usually treated as dimensions
+     * (or vice-versa); they override this method to influence the cube capability check.
+     *
+     * A SQLDigest contains the dimensions and measures extracted from a query. After comparing
+     * to the cube definition, the matched dimensions and measures are crossed out, and what is
+     * left is the <code>unmatchedDimensions</code> and <code>unmatchedAggregations</code>.
+     *
+     * Each measure type on the cube is then called on this method to check whether any of the
+     * unmatched can be fulfilled. A measure type that cannot fulfill any of them simply returns
+     * null. Otherwise <code>unmatchedDimensions</code> and <code>unmatchedAggregations</code>
+     * must be modified to drop the satisfied dimension or measure, and a CapabilityInfluence
+     * object must be returned to mark the contribution of this measure type.
+     *
+     * The default implementation fulfills nothing and returns null.
+     */
+    public CapabilityInfluence influenceCapabilityCheck(Collection<TblColRef> unmatchedDimensions, Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, MeasureDesc measureDesc) {
+        return null;
+    }
+
+    /* ============================================================================
+     * Query Rewrite
+     * ---------------------------------------------------------------------------- */
+
+    // TODO support user defined Calcite aggr function
+
+    /** Whether or not the Calcite rel-tree needs a rewrite to do the last round of aggregation. */
+    public abstract boolean needRewrite();
+
+    /** Returns the Calcite aggregation function implementation class. */
+    public abstract Class<?> getRewriteCalciteAggrFunctionClass();
+
+    /* ============================================================================
+     * Storage
+     * ---------------------------------------------------------------------------- */
+
+    /**
+     * Hook for special measures that hold columns which are usually treated as dimensions
+     * (or vice-versa). They need to adjust the dimensions and measures in <code>sqlDigest</code>
+     * before scanning, such that the correct cuboid and measures can be selected by storage.
+     * The default implementation leaves the digest untouched.
+     */
+    public void adjustSqlDigest(MeasureDesc measureDesc, SQLDigest sqlDigest) {
+        // no adjustment by default
+    }
+
+    /** Returns true if one storage record maps to multiple tuples, or false (the default) otherwise. */
+    public boolean needAdvancedTupleFilling() {
+        return false;
+    }
+
+    /** The simple filling mode, one tuple per storage record: stores the value at the given index of the tuple. */
+    public void fillTupleSimply(Tuple tuple, int indexInTuple, Object measureValue) {
+        tuple.setMeasureValue(indexInTuple, measureValue);
+    }
+
+    /**
+     * The advanced filling mode, multiple tuples per storage record.
+     *
+     * @throws UnsupportedOperationException always, unless overridden
+     */
+    public IAdvMeasureFiller getAdvancedTupleFiller(FunctionDesc function, TupleInfo returnTupleInfo, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Fills several tuples out of one stored measure value. */
+    public interface IAdvMeasureFiller {
+
+        /** Reloads a value from storage and gets ready to fill multiple tuples with it. */
+        void reload(Object measureValue);
+
+        /** Returns how many rows are contained in the last loaded value. */
+        int getNumOfRows();
+
+        /** Fills the specified row into the tuple. (The method name keeps its historical spelling.) */
+        void fillTuplle(Tuple tuple, int row);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java 
b/metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
new file mode 100644
index 0000000..0784f91
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.kylin.measure.basic.BasicMeasureType;
+import org.apache.kylin.measure.hllc.HLLCMeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+abstract public class MeasureTypeFactory<T> {
+
+    /** Creates the MeasureType for the given function name and data type. */
+    abstract public MeasureType<T> createMeasureType(String funcName, DataType dataType);
+
+    /** Name of the aggregation function this factory serves; its upper-cased form is the registry key. */
+    abstract public String getAggrFunctionName();
+
+    /** Name of the aggregation data type; its lower-cased form is registered with DataType and DataTypeSerializer. */
+    abstract public String getAggrDataTypeName();
+
+    /** Serializer class that gets registered for the aggregation data type. */
+    abstract public Class<? extends DataTypeSerializer<T>> getAggrDataTypeSerializer();
+
+    // ============================================================================
+
+    // registry of advanced measure types, keyed by upper-cased function name
+    private static final Map<String, MeasureTypeFactory<?>> factories = Maps.newHashMap();
+    // used by create() for every function name that has no registered factory
+    private static final MeasureTypeFactory<?> defaultFactory = new BasicMeasureType.Factory();
+
+    static {
+        init();
+    }
+
+    /**
+     * Registers the built-in measure type factories together with their data types and
+     * serializers. Does nothing when the registry is already populated.
+     */
+    public static synchronized void init() {
+        if (factories.isEmpty() == false)
+            return;
+
+        List<MeasureTypeFactory<?>> factoryInsts = Lists.newArrayList();
+
+        // built-in advanced measure types
+        factoryInsts.add(new HLLCMeasureType.Factory());
+
+        /*
+         * Maybe do classpath search for more custom measure types?
+         * More MeasureType cannot be configured via kylin.properties alone,
+         * because used in coprocessor, the new classes must be on classpath
+         * and be packaged into coprocessor jar.
+         */
+
+        // register factories & data type serializers
+        for (MeasureTypeFactory<?> factory : factoryInsts) {
+            // Locale.ROOT keeps the case mapping independent of the JVM default locale
+            // (e.g. a Turkish locale would map 'i' to a dotted capital I)
+            String funcName = factory.getAggrFunctionName().toUpperCase(Locale.ROOT);
+            String dataTypeName = factory.getAggrDataTypeName().toLowerCase(Locale.ROOT);
+            Class<? extends DataTypeSerializer<?>> serializer = factory.getAggrDataTypeSerializer();
+
+            DataType.register(dataTypeName);
+            DataTypeSerializer.register(dataTypeName, serializer);
+            factories.put(funcName, factory);
+        }
+    }
+
+    /** Same as {@link #create(String, DataType)}, resolving the data type from its name first. */
+    public static MeasureType<?> create(String funcName, String dataType) {
+        return create(funcName, DataType.getInstance(dataType));
+    }
+
+    /**
+     * Creates the measure type for a function. The function name is matched case-insensitively
+     * against the registered factories; unregistered names are served by the default
+     * (basic) factory. The factory receives the upper-cased function name.
+     */
+    public static MeasureType<?> create(String funcName, DataType dataType) {
+        // same locale-independent upper-casing as used at registration time
+        funcName = funcName.toUpperCase(Locale.ROOT);
+
+        MeasureTypeFactory<?> factory = factories.get(funcName);
+        if (factory == null)
+            factory = defaultFactory;
+
+        return factory.createMeasureType(funcName, dataType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java 
b/metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
new file mode 100644
index 0000000..4ab4584
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.FunctionDesc;
+
+@SuppressWarnings("rawtypes")
+public class BasicMeasureType extends MeasureType {
+
+    /**
+     * Factory producing BasicMeasureType instances. It reports no aggregation function name,
+     * data type name or serializer of its own (all three are null).
+     */
+    public static class Factory extends MeasureTypeFactory {
+
+        @Override
+        public MeasureType createMeasureType(String funcName, DataType dataType) {
+            return new BasicMeasureType(funcName, dataType);
+        }
+
+        @Override
+        public String getAggrFunctionName() {
+            return null;
+        }
+
+        @Override
+        public String getAggrDataTypeName() {
+            return null;
+        }
+
+        @Override
+        public Class getAggrDataTypeSerializer() {
+            return null;
+        }
+    }
+
+    private final String funcName;
+    private final DataType dataType;
+
+    /**
+     * @param funcName name of the aggregation function
+     * @param dataType return type of the function; may be null at query parsing phase,
+     *                 because only function and parameters are known at that point
+     */
+    public BasicMeasureType(String funcName, DataType dataType) {
+        this.funcName = funcName;
+        this.dataType = dataType;
+    }
+
+    /**
+     * Checks the return type against the function: SUM, MAX and MIN need a type of the number
+     * family, COUNT needs a type of the integer family. Any other function is rejected unless
+     * the configuration says unknown functions are to be ignored.
+     *
+     * @throws IllegalArgumentException on a return type mismatch or an unrecognized function
+     */
+    @Override
+    public void validate(FunctionDesc functionDesc) throws IllegalArgumentException {
+        if (funcName.equals(FunctionDesc.FUNC_SUM)) {
+            requireReturnType(dataType.isNumberFamily(), DataType.NUMBER_FAMILY);
+            return;
+        }
+        if (funcName.equals(FunctionDesc.FUNC_COUNT)) {
+            requireReturnType(dataType.isIntegerFamily(), DataType.INTEGER_FAMILY);
+            return;
+        }
+        if (funcName.equals(FunctionDesc.FUNC_MAX) || funcName.equals(FunctionDesc.FUNC_MIN)) {
+            requireReturnType(dataType.isNumberFamily(), DataType.NUMBER_FAMILY);
+            return;
+        }
+
+        // none of the basic functions: tolerated only when the config says so
+        boolean ignoreUnknown = KylinConfig.getInstanceFromEnv().isQueryIgnoreUnknownFunction();
+        if (!ignoreUnknown)
+            throw new IllegalArgumentException("Unrecognized function: [" + funcName + "]");
+    }
+
+    /** Throws the return-type complaint for the current function unless <code>acceptable</code> holds. */
+    private void requireReturnType(boolean acceptable, Object allowedFamily) {
+        if (!acceptable)
+            throw new IllegalArgumentException("Return type for function " + funcName + " must be one of " + allowedFamily);
+    }
+
+    /**
+     * Picks the ingester by data type: integer family first, then decimal, then the remaining
+     * number family.
+     *
+     * @throws IllegalArgumentException if the data type is none of these
+     */
+    @Override
+    public MeasureIngester<?> newIngester() {
+        if (dataType.isIntegerFamily())
+            return new LongIngester();
+        if (dataType.isDecimal())
+            return new BigDecimalIngester();
+        if (dataType.isNumberFamily())
+            return new DoubleIngester();
+
+        throw new IllegalArgumentException("No ingester for aggregation type " + dataType);
+    }
+
+    /**
+     * Picks the aggregator by function and data type. SUM and COUNT share the sum aggregators.
+     *
+     * @throws IllegalArgumentException if no aggregator exists for the function / type combination
+     */
+    @Override
+    public MeasureAggregator<?> newAggregator() {
+        MeasureAggregator<?> chosen = null;
+        if (isSum() || isCount())
+            chosen = sumAggregatorOrNull();
+        else if (isMax())
+            chosen = maxAggregatorOrNull();
+        else if (isMin())
+            chosen = minAggregatorOrNull();
+
+        if (chosen == null)
+            throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + dataType + "'");
+        return chosen;
+    }
+
+    /** Sum aggregator matching the data type, or null if the type is not numeric. */
+    private MeasureAggregator<?> sumAggregatorOrNull() {
+        if (dataType.isDecimal())
+            return new BigDecimalSumAggregator();
+        if (dataType.isIntegerFamily())
+            return new LongSumAggregator();
+        if (dataType.isNumberFamily())
+            return new DoubleSumAggregator();
+        return null;
+    }
+
+    /** Max aggregator matching the data type, or null if the type is not numeric. */
+    private MeasureAggregator<?> maxAggregatorOrNull() {
+        if (dataType.isDecimal())
+            return new BigDecimalMaxAggregator();
+        if (dataType.isIntegerFamily())
+            return new LongMaxAggregator();
+        if (dataType.isNumberFamily())
+            return new DoubleMaxAggregator();
+        return null;
+    }
+
+    /** Min aggregator matching the data type, or null if the type is not numeric. */
+    private MeasureAggregator<?> minAggregatorOrNull() {
+        if (dataType.isDecimal())
+            return new BigDecimalMinAggregator();
+        if (dataType.isIntegerFamily())
+            return new LongMinAggregator();
+        if (dataType.isNumberFamily())
+            return new DoubleMinAggregator();
+        return null;
+    }
+
+    private boolean isSum() {
+        return FunctionDesc.FUNC_SUM.equals(funcName);
+    }
+
+    private boolean isCount() {
+        return FunctionDesc.FUNC_COUNT.equals(funcName);
+    }
+
+    private boolean isMax() {
+        return FunctionDesc.FUNC_MAX.equals(funcName);
+    }
+
+    private boolean isMin() {
+        return FunctionDesc.FUNC_MIN.equals(funcName);
+    }
+
+    /** Every function handled here except SUM is reported as needing a rewrite. */
+    @Override
+    public boolean needRewrite() {
+        return !isSum();
+    }
+
+    /** Basic measures supply no Calcite aggregation function class. */
+    @Override
+    public Class<?> getRewriteCalciteAggrFunctionClass() {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java 
b/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
new file mode 100644
index 0000000..b51917c
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.math.BigDecimal;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+public class BigDecimalIngester extends MeasureIngester<BigDecimal> {
+
+    /**
+     * Turns the single raw value of a record into a BigDecimal.
+     *
+     * @param values exactly one raw value; a null element is read as zero
+     * @return the parsed value, or zero for a null element
+     * @throws IllegalArgumentException if <code>values</code> does not hold exactly one element
+     * @throws NumberFormatException if the element is not a valid decimal representation
+     */
+    @Override
+    public BigDecimal valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        // reject an empty array as well, instead of failing later with an index error
+        if (values.length != 1)
+            throw new IllegalArgumentException("Expect exactly one value for a decimal measure, but got " + values.length);
+
+        if (values[0] == null)
+            return BigDecimal.ZERO;
+        else
+            return new BigDecimal(values[0]);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
 
b/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
new file mode 100644
index 0000000..44c8765
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.basic;
+
+
+import org.apache.kylin.measure.MeasureAggregator;
+
+import java.math.BigDecimal;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class BigDecimalMaxAggregator extends MeasureAggregator<BigDecimal> {
+
+    // largest value seen since the last reset; null until the first value arrives
+    BigDecimal max = null;
+
+    /** Forgets everything aggregated so far. */
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    /** Keeps <code>value</code> if it is the first one or compares greater than the current maximum. */
+    @Override
+    public void aggregate(BigDecimal value) {
+        boolean takeIt = (max == null) || (max.compareTo(value) < 0);
+        if (takeIt) {
+            max = value;
+        }
+    }
+
+    /** Returns the current maximum, or null if nothing has been aggregated since the last reset. */
+    @Override
+    public BigDecimal getState() {
+        return max;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
 
b/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
new file mode 100644
index 0000000..87f9acf
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.basic;
+
+
+import org.apache.kylin.measure.MeasureAggregator;
+
+import java.math.BigDecimal;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class BigDecimalMinAggregator extends MeasureAggregator<BigDecimal> {
+
+    // smallest value seen since the last reset; null until the first value arrives
+    BigDecimal min = null;
+
+    /** Forgets everything aggregated so far. */
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    /** Keeps <code>value</code> if it is the first one or compares smaller than the current minimum. */
+    @Override
+    public void aggregate(BigDecimal value) {
+        if (min == null)
+            min = value;
+        else if (min.compareTo(value) > 0)
+            min = value;
+    }
+
+    /** Returns the current minimum, or null if nothing has been aggregated since the last reset. */
+    @Override
+    public BigDecimal getState() {
+        return min;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
 
b/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
new file mode 100644
index 0000000..de19828
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.basic;
+
+
+import org.apache.kylin.measure.MeasureAggregator;
+
+import java.math.BigDecimal;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class BigDecimalSumAggregator extends MeasureAggregator<BigDecimal> {
+
+    // running total; starts at zero and returns to zero on reset
+    BigDecimal sum = BigDecimal.ZERO;
+
+    /** Sets the running total back to zero. */
+    @Override
+    public void reset() {
+        sum = BigDecimal.ZERO;
+    }
+
+    /** Adds <code>value</code> to the running total. */
+    @Override
+    public void aggregate(BigDecimal value) {
+        BigDecimal updated = sum.add(value);
+        sum = updated;
+    }
+
+    /** Returns the running total. */
+    @Override
+    public BigDecimal getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java 
b/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
new file mode 100644
index 0000000..9118c28
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+public class DoubleIngester extends MeasureIngester<DoubleWritable> {
+
+    // avoid repeated object creation: the same instance is handed out by every valueOf() call
+    private DoubleWritable current = new DoubleWritable();
+
+    /**
+     * Turns the single raw value of a record into a double.
+     *
+     * @param values exactly one raw value; a null element is read as zero
+     * @return the ingester's reused DoubleWritable holding the parsed value; the next call
+     *         overwrites it, so callers must copy the value if they need to keep it
+     * @throws IllegalArgumentException if <code>values</code> does not hold exactly one element
+     * @throws NumberFormatException if the element is not a parsable double
+     */
+    @Override
+    public DoubleWritable valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        // reject an empty array as well, instead of failing later with an index error
+        if (values.length != 1)
+            throw new IllegalArgumentException("Expect exactly one value for a double measure, but got " + values.length);
+
+        DoubleWritable l = current;
+        if (values[0] == null)
+            l.set(0d);
+        else
+            l.set(Double.parseDouble(values[0]));
+        return l;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
 
b/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
new file mode 100644
index 0000000..dd1c727
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.kylin.measure.MeasureAggregator;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class DoubleMaxAggregator extends MeasureAggregator<DoubleWritable> {
+
+    // largest value seen since the last reset; null until the first value arrives
+    DoubleWritable max = null;
+
+    /** Forgets everything aggregated so far. */
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    /** Records <code>value</code> if it is the first one or strictly greater than the current maximum. */
+    @Override
+    public void aggregate(DoubleWritable value) {
+        final double incoming = value.get();
+        if (max == null) {
+            // copy into an own instance rather than keeping a reference to the argument
+            max = new DoubleWritable(incoming);
+            return;
+        }
+        if (incoming > max.get()) {
+            max.set(incoming);
+        }
+    }
+
+    /** Returns the current maximum, or null if nothing has been aggregated since the last reset. */
+    @Override
+    public DoubleWritable getState() {
+        return max;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
 
b/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
new file mode 100644
index 0000000..5c19d7a
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.kylin.measure.MeasureAggregator;
+
+/**
+ * Aggregator that tracks the smallest double value it has been given.
+ * The state is {@code null} until the first value is aggregated after
+ * construction or after {@link #reset()}.
+ *
+ * @author yangli9
+ */
+public class DoubleMinAggregator extends MeasureAggregator<DoubleWritable> {
+
+    DoubleWritable min = null;
+
+    /** Drops the running minimum so that the state is {@code null} again. */
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    /**
+     * Folds {@code value} into the running minimum. Only the primitive content
+     * is copied; the passed-in writable itself is never retained.
+     */
+    @Override
+    public void aggregate(DoubleWritable value) {
+        final double candidate = value.get();
+        if (min == null) {
+            min = new DoubleWritable(candidate);
+            return;
+        }
+        if (min.get() > candidate) {
+            min.set(candidate);
+        }
+    }
+
+    /** Returns the running minimum, or {@code null} if nothing was aggregated since the last reset. */
+    @Override
+    public DoubleWritable getState() {
+        return min;
+    }
+
+    /** Delegates to {@code guessDoubleMemBytes()} for the memory estimate. */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
 
b/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
new file mode 100644
index 0000000..a5ec9ff
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.kylin.measure.MeasureAggregator;
+
+/**
+ * Aggregator that accumulates the sum of the double values it has been given.
+ * The accumulator object is created once and updated in place.
+ *
+ * @author yangli9
+ */
+public class DoubleSumAggregator extends MeasureAggregator<DoubleWritable> {
+
+    DoubleWritable sum = new DoubleWritable();
+
+    /** Sets the running total back to zero, keeping the same accumulator object. */
+    @Override
+    public void reset() {
+        sum.set(0.0);
+    }
+
+    /** Adds the content of {@code value} to the running total. */
+    @Override
+    public void aggregate(DoubleWritable value) {
+        final double total = sum.get() + value.get();
+        sum.set(total);
+    }
+
+    /** Returns the accumulator holding the running total. */
+    @Override
+    public DoubleWritable getState() {
+        return sum;
+    }
+
+    /** Delegates to {@code guessDoubleMemBytes()} for the memory estimate. */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java 
b/metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
new file mode 100644
index 0000000..6c951d1
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Ingester that turns the single string input of a long-typed measure into a
+ * {@link LongWritable}.
+ */
+public class LongIngester extends MeasureIngester<LongWritable> {
+
+    // avoid repeated object creation
+    private final LongWritable current = new LongWritable();
+
+    /**
+     * Parses the one and only input value as a long.
+     *
+     * @param values        raw inputs of the measure; must contain exactly one element
+     * @param measureDesc   not used by this ingester
+     * @param dictionaryMap not used by this ingester
+     * @return the ingester's single reusable writable, set to the parsed value,
+     *         or to 0 when the input element is {@code null}; the same object
+     *         is returned (and overwritten) on every call
+     * @throws IllegalArgumentException if {@code values} does not hold exactly one element
+     * @throws NumberFormatException    if the element is not a parsable long
+     */
+    @Override
+    public LongWritable valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        // Reject an empty array up front too, instead of failing later with an
+        // ArrayIndexOutOfBoundsException on values[0].
+        if (values.length != 1)
+            throw new IllegalArgumentException("LongIngester expects exactly one input value but got " + values.length);
+
+        LongWritable l = current;
+        String raw = values[0];
+        l.set(raw == null ? 0L : Long.parseLong(raw));
+        return l;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java 
b/metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
new file mode 100644
index 0000000..38db6f1
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.measure.MeasureAggregator;
+
+/**
+ * Aggregator that tracks the largest long value it has been given.
+ * The state is {@code null} until the first value is aggregated after
+ * construction or after {@link #reset()}.
+ *
+ * @author yangli9
+ */
+public class LongMaxAggregator extends MeasureAggregator<LongWritable> {
+
+    LongWritable max = null;
+
+    /** Drops the running maximum so that the state is {@code null} again. */
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    /**
+     * Folds {@code value} into the running maximum. Only the primitive content
+     * is copied; the passed-in writable itself is never retained.
+     */
+    @Override
+    public void aggregate(LongWritable value) {
+        final long candidate = value.get();
+        if (max == null) {
+            max = new LongWritable(candidate);
+            return;
+        }
+        if (max.get() < candidate) {
+            max.set(candidate);
+        }
+    }
+
+    /** Returns the running maximum, or {@code null} if nothing was aggregated since the last reset. */
+    @Override
+    public LongWritable getState() {
+        return max;
+    }
+
+    /** Delegates to {@code guessLongMemBytes()} for the memory estimate. */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java 
b/metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
new file mode 100644
index 0000000..2774169
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.measure.MeasureAggregator;
+
+/**
+ * Aggregator that tracks the smallest long value it has been given.
+ * The state is {@code null} until the first value is aggregated after
+ * construction or after {@link #reset()}.
+ *
+ * @author yangli9
+ */
+public class LongMinAggregator extends MeasureAggregator<LongWritable> {
+
+    LongWritable min = null;
+
+    /** Drops the running minimum so that the state is {@code null} again. */
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    /**
+     * Folds {@code value} into the running minimum. Only the primitive content
+     * is copied; the passed-in writable itself is never retained.
+     */
+    @Override
+    public void aggregate(LongWritable value) {
+        final long candidate = value.get();
+        if (min == null) {
+            min = new LongWritable(candidate);
+            return;
+        }
+        if (min.get() > candidate) {
+            min.set(candidate);
+        }
+    }
+
+    /** Returns the running minimum, or {@code null} if nothing was aggregated since the last reset. */
+    @Override
+    public LongWritable getState() {
+        return min;
+    }
+
+    /** Delegates to {@code guessLongMemBytes()} for the memory estimate. */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java 
b/metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
new file mode 100644
index 0000000..7b4e0b3
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.measure.MeasureAggregator;
+
+/**
+ * Aggregator that accumulates the sum of the long values it has been given.
+ * The accumulator object is created once and updated in place.
+ *
+ * @author yangli9
+ */
+public class LongSumAggregator extends MeasureAggregator<LongWritable> {
+
+    LongWritable sum = new LongWritable();
+
+    /** Sets the running total back to zero, keeping the same accumulator object. */
+    @Override
+    public void reset() {
+        sum.set(0L);
+    }
+
+    /** Adds the content of {@code value} to the running total. */
+    @Override
+    public void aggregate(LongWritable value) {
+        final long total = sum.get() + value.get();
+        sum.set(total);
+    }
+
+    /** Returns the accumulator holding the running total. */
+    @Override
+    public LongWritable getState() {
+        return sum;
+    }
+
+    /** Delegates to {@code guessLongMemBytes()} for the memory estimate. */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java 
b/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
new file mode 100644
index 0000000..90be370
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.MeasureAggregator;
+
+/**
+ * Aggregator that merges HyperLogLog counters into one running counter.
+ * The state is {@code null} until the first counter is aggregated after
+ * construction or after {@link #reset()}.
+ */
+@SuppressWarnings("serial")
+public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> {
+
+    final int precision;
+    HyperLogLogPlusCounter sum = null;
+
+    /**
+     * @param precision HLL precision; in this class it is only used by
+     *                  {@link #getMemBytesEstimate()}
+     */
+    public HLLCAggregator(int precision) {
+        this.precision = precision;
+    }
+
+    /** Drops the merged counter so that the state is {@code null} again. */
+    @Override
+    public void reset() {
+        sum = null;
+    }
+
+    /**
+     * Merges {@code value} into the running counter. The first counter seen is
+     * copied via the copy constructor rather than retained.
+     */
+    @Override
+    public void aggregate(HyperLogLogPlusCounter value) {
+        if (sum != null) {
+            sum.merge(value);
+            return;
+        }
+        sum = new HyperLogLogPlusCounter(value);
+    }
+
+    /** Returns the merged counter, or {@code null} if nothing was aggregated since the last reset. */
+    @Override
+    public HyperLogLogPlusCounter getState() {
+        return sum;
+    }
+
+    /** Estimates the bytes held by this aggregator plus one counter of the configured precision. */
+    @Override
+    public int getMemBytesEstimate() {
+        // 1024 + 60 returned by AggregationCacheMemSizeTest
+        final int aggregatorBytes = 8 // aggregator obj shell
+                + 4 // precision
+                + 8; // ref to HLLC
+        final int counterBytes = 8 // HLLC obj shell
+                + 32 + (1 << precision); // HLLC internal
+        return aggregatorBytes + counterBytes;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java 
b/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
new file mode 100644
index 0000000..fffb0d1
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import java.util.Map;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounter> {
+
+    public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
+    public static final String DATATYPE_HLLC = "hllc";
+
+    public static class Factory extends 
MeasureTypeFactory<HyperLogLogPlusCounter> {
+
+        @Override
+        public MeasureType<HyperLogLogPlusCounter> createMeasureType(String 
funcName, DataType dataType) {
+            return new HLLCMeasureType(funcName, dataType);
+        }
+
+        @Override
+        public String getAggrFunctionName() {
+            return FUNC_COUNT_DISTINCT;
+        }
+
+        @Override
+        public String getAggrDataTypeName() {
+            return DATATYPE_HLLC;
+        }
+
+        @Override
+        public Class<? extends DataTypeSerializer<HyperLogLogPlusCounter>> 
getAggrDataTypeSerializer() {
+            return HLLCSerializer.class;
+        }
+    }
+
+    // 
============================================================================
+
+    private final DataType dataType;
+
+    public HLLCMeasureType(String funcName, DataType dataType) {
+        // note at query parsing phase, the data type may be null, because 
only function and parameters are known
+        this.dataType = dataType;
+    }
+
+    public void validate(FunctionDesc functionDesc) throws 
IllegalArgumentException {
+        validate(functionDesc.getExpression(), 
functionDesc.getReturnDataType(), true);
+    }
+
+    private void validate(String funcName, DataType dataType, boolean 
checkDataType) {
+        if (FUNC_COUNT_DISTINCT.equals(funcName) == false)
+            throw new IllegalArgumentException();
+
+        if (DATATYPE_HLLC.equals(dataType.getName()) == false)
+            throw new IllegalArgumentException();
+
+        if (dataType.getPrecision() < 1 || dataType.getPrecision() > 5000)
+            throw new IllegalArgumentException();
+    }
+
+    @Override
+    public boolean isMemoryHungry() {
+        return true;
+    }
+
+    @Override
+    public MeasureIngester<HyperLogLogPlusCounter> newIngester() {
+        return new MeasureIngester<HyperLogLogPlusCounter>() {
+            HyperLogLogPlusCounter current = new 
HyperLogLogPlusCounter(dataType.getPrecision());
+
+            @Override
+            public HyperLogLogPlusCounter valueOf(String[] values, MeasureDesc 
measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+                HyperLogLogPlusCounter hllc = current;
+                hllc.clear();
+                for (String v : values)
+                    hllc.add(v == null ? "__nUlL__" : v);
+                return hllc;
+            }
+        };
+    }
+
+    @Override
+    public MeasureAggregator<HyperLogLogPlusCounter> newAggregator() {
+        return new HLLCAggregator(dataType.getPrecision());
+    }
+
+    @Override
+    public boolean needRewrite() {
+        return true;
+    }
+
+    @Override
+    public Class<?> getRewriteCalciteAggrFunctionClass() {
+        return HLLDistinctCountAggFunc.class;
+    }
+
+    public static boolean isCountDistinct(FunctionDesc func) {
+        return FUNC_COUNT_DISTINCT.equalsIgnoreCase(func.getExpression());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java 
b/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
new file mode 100644
index 0000000..e5a379a
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+
+/**
+ * Serializer that reads and writes the registers of a
+ * {@link HyperLogLogPlusCounter} from and to a {@link ByteBuffer}.
+ *
+ * @author yangli9
+ */
+public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
+
+    // be thread-safe and avoid repeated obj creation
+    private ThreadLocal<HyperLogLogPlusCounter> current = new ThreadLocal<HyperLogLogPlusCounter>();
+
+    private int precision;
+
+    /** @param type data type whose precision is used for the counters created by this serializer */
+    public HLLCSerializer(DataType type) {
+        this.precision = type.getPrecision();
+    }
+
+    /** Writes the registers of {@code value} to {@code out}; an IOException is rethrown unchecked. */
+    @Override
+    public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) {
+        try {
+            value.writeRegisters(out);
+        } catch (IOException ioFailure) {
+            throw new RuntimeException(ioFailure);
+        }
+    }
+
+    /** Returns the calling thread's counter, creating it on first use. */
+    private HyperLogLogPlusCounter current() {
+        HyperLogLogPlusCounter cached = current.get();
+        if (cached != null) {
+            return cached;
+        }
+        HyperLogLogPlusCounter fresh = new HyperLogLogPlusCounter(precision);
+        current.set(fresh);
+        return fresh;
+    }
+
+    /**
+     * Reads registers from {@code in} into the calling thread's counter and
+     * returns that counter; the same per-thread instance is handed back on
+     * every call. An IOException is rethrown unchecked.
+     */
+    @Override
+    public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
+        final HyperLogLogPlusCounter target = current();
+        try {
+            target.readRegisters(in);
+        } catch (IOException ioFailure) {
+            throw new RuntimeException(ioFailure);
+        }
+        return target;
+    }
+
+    /** Delegates to the per-thread counter's {@code peekLength}. */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        HyperLogLogPlusCounter counter = current();
+        return counter.peekLength(in);
+    }
+
+    /** Delegates to the per-thread counter's {@code maxLength}. */
+    @Override
+    public int maxLength() {
+        HyperLogLogPlusCounter counter = current();
+        return counter.maxLength();
+    }
+
+    /** Estimates storage as 75% of the counter's maximum length. */
+    @Override
+    public int getStorageBytesEstimate() {
+        // for HLL, it will be compressed when export to bytes
+        final int upperBound = current().maxLength();
+        return (int) (upperBound * 0.75);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
 
b/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
new file mode 100644
index 0000000..471bc8a
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.hllc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author xjiang
+ */
+public class HLLDistinctCountAggFunc {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(HLLDistinctCountAggFunc.class);
+
+    public static HyperLogLogPlusCounter init() {
+        return null;
+    }
+
+    public static HyperLogLogPlusCounter initAdd(Object v) {
+        if (v instanceof Long) { // holistic case
+            long l = (Long) v;
+            return new FixedValueHLLCMockup(l);
+        } else {
+            HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
+            return new HyperLogLogPlusCounter(c);
+        }
+    }
+
+    public static HyperLogLogPlusCounter add(HyperLogLogPlusCounter counter, 
Object v) {
+        if (v instanceof Long) { // holistic case
+            long l = (Long) v;
+            if (counter == null) {
+                return new FixedValueHLLCMockup(l);
+            } else {
+                if (!(counter instanceof FixedValueHLLCMockup))
+                    throw new IllegalStateException("counter is not 
FixedValueHLLCMockup");
+
+                ((FixedValueHLLCMockup) counter).set(l);
+                return counter;
+            }
+        } else {
+            HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
+            if (counter == null) {
+                return new HyperLogLogPlusCounter(c);
+            } else {
+                counter.merge(c);
+                return counter;
+            }
+        }
+    }
+
+    public static HyperLogLogPlusCounter merge(HyperLogLogPlusCounter 
counter0, Object counter1) {
+        return add(counter0, counter1);
+    }
+
+    public static long result(HyperLogLogPlusCounter counter) {
+        return counter == null ? 0L : counter.getCountEstimate();
+    }
+
+    private static class FixedValueHLLCMockup extends HyperLogLogPlusCounter {
+
+        private Long value = null;
+
+        FixedValueHLLCMockup(long value) {
+            this.value = value;
+        }
+
+        public void set(long value) {
+            if (this.value == null) {
+                this.value = value;
+            } else {
+                long oldValue = Math.abs(this.value.longValue());
+                long take = Math.max(oldValue, value);
+                logger.warn("Error to aggregate holistic count distinct, old 
value " + oldValue + ", new value " + value + ", taking " + take);
+                this.value = -take; // make it obvious that this value is wrong
+            }
+        }
+
+        @Override
+        public void clear() {
+            this.value = null;
+        }
+
+        @Override
+        protected void add(long hash) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void merge(HyperLogLogPlusCounter another) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long getCountEstimate() {
+            return value;
+        }
+
+        @Override
+        public void writeRegisters(ByteBuffer out) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void readRegisters(ByteBuffer in) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = super.hashCode();
+            result = prime * result + (int) (value ^ (value >>> 32));
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (!super.equals(obj))
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            FixedValueHLLCMockup other = (FixedValueHLLCMockup) obj;
+            if (!value.equals(other.value))
+                return false;
+            return true;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
new file mode 100644
index 0000000..468e4b0
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serializer for {@link BigDecimal}. The encoding is a variable-length int
+ * scale, a variable-length int byte count, then the two's-complement bytes of
+ * the unscaled value.
+ *
+ * @author yangli9
+ */
+public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
+
+    private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class);
+
+    final DataType type;
+    final int maxLength;
+
+    // number of values whose scale had to be cut; used to throttle the warning
+    int avoidVerbose = 0;
+
+    /** @param type data type supplying the precision and scale limits */
+    public BigDecimalSerializer(DataType type) {
+        this.type = type;
+        // see serialize(): 1 byte scale, 1 byte length, assume every 2 digits takes 1 byte
+        this.maxLength = 1 + 1 + (type.getPrecision() + 1) / 2;
+    }
+
+    /**
+     * Writes {@code value} to {@code out}. A value whose scale exceeds the
+     * type's scale is rounded half-even down to the type's scale first.
+     *
+     * @throws IllegalArgumentException if the encoded value would exceed {@link #maxLength()}
+     */
+    @Override
+    public void serialize(BigDecimal value, ByteBuffer out) {
+        if (value.scale() > type.getScale()) {
+            // Count every truncation and warn on the 1st, 10001st, ... occurrence.
+            // The increment must sit outside the guarded log statement, otherwise
+            // the counter stops at 1 and the warning is emitted only once.
+            if (avoidVerbose++ % 10000 == 0) {
+                logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + avoidVerbose);
+            }
+            value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN);
+        }
+        byte[] bytes = value.unscaledValue().toByteArray();
+        if (bytes.length + 2 > maxLength) {
+            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
+        }
+
+        BytesUtil.writeVInt(value.scale(), out);
+        BytesUtil.writeVInt(bytes.length, out);
+        out.put(bytes);
+    }
+
+    /** Reads one value in the format written by {@link #serialize}, advancing {@code in}. */
+    @Override
+    public BigDecimal deserialize(ByteBuffer in) {
+        int scale = BytesUtil.readVInt(in);
+        int n = BytesUtil.readVInt(in);
+
+        byte[] bytes = new byte[n];
+        in.get(bytes);
+
+        return new BigDecimal(new BigInteger(bytes), scale);
+    }
+
+    /** Returns the encoded length of the next value; the position of {@code in} is restored before returning. */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        int mark = in.position();
+
+        // the scale must be read to reach the length field; its value is not needed
+        @SuppressWarnings("unused")
+        int scale = BytesUtil.readVInt(in);
+        int n = BytesUtil.readVInt(in);
+        int len = in.position() - mark + n;
+
+        in.position(mark);
+        return len;
+    }
+
+    @Override
+    public int maxLength() {
+        return maxLength;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+    /** Parses {@code str} with the {@link BigDecimal} string constructor. */
+    @Override
+    public BigDecimal valueOf(String str) {
+        return new BigDecimal(str);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java 
b/metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
new file mode 100644
index 0000000..0ac07e1
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.datatype;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * A data type descriptor: lower-case name plus optional precision and scale, parsed from text and cached by getInstance().
+ * @author yangli9
+ */
+public class DataType {
+    private static final LinkedHashSet<String> VALID_TYPES = new 
LinkedHashSet<String>(); // every registered type name, in registration order
+
+    private static Pattern TYPE_PATTERN = null; // recompiled by register() each time names are added
+    private static final String TYPE_PATTEN_TAIL = "\\s*" //
+            + "(?:" + "[(]" + "([\\d\\s,]+)" + "[)]" + ")?"; // optional parenthesised run of digits/spaces/commas, captured as group 2
+
+    public static synchronized void register(String... typeNames) { // adds the names, then rebuilds TYPE_PATTERN over all names registered so far
+        for (String typeName : typeNames) {
+            VALID_TYPES.add(typeName);
+        }
+
+        TYPE_PATTERN = Pattern.compile( //
+                "(" + StringUtils.join(VALID_TYPES, "|") + ")" //
+                        + TYPE_PATTEN_TAIL, Pattern.CASE_INSENSITIVE); // NOTE(review): names are joined into the regex unquoted, so regex metacharacters in a name would change the pattern
+    }
+
+    // standard sql types, ref: 
http://www.w3schools.com/sql/sql_datatypes_general.asp
+    static { // must run before the ANY constant below is built, because the constructor matches against TYPE_PATTERN
+        register("any", "char", "varchar", "string", //
+                "boolean", "byte", "binary", //
+                "int", "short", "long", "integer", "tinyint", "smallint", 
"bigint", //
+                "float", "real", "double", "decimal", "numeric", //
+                "date", "time", "datetime", "timestamp", //
+                TblColRef.InnerDataTypeEnum.LITERAL.getDataType(), 
TblColRef.InnerDataTypeEnum.DERIVED.getDataType());
+    }
+
+    public static final Set<String> INTEGER_FAMILY = new HashSet<String>();
+    public static final Set<String> NUMBER_FAMILY = new HashSet<String>(); // superset of INTEGER_FAMILY, see addAll below
+    public static final Set<String> DATETIME_FAMILY = new HashSet<String>();
+    public static final Set<String> STRING_FAMILY = new HashSet<String>();
+    private static final Map<String, String> LEGACY_TYPE_MAP = new 
HashMap<String, String>(); // older spelling -> current spelling, applied by replaceLegacy()
+    static { // the family sets list current names only; legacy spellings reach them through LEGACY_TYPE_MAP
+        INTEGER_FAMILY.add("tinyint");
+        INTEGER_FAMILY.add("smallint");
+        INTEGER_FAMILY.add("integer");
+        INTEGER_FAMILY.add("bigint");
+
+        NUMBER_FAMILY.addAll(INTEGER_FAMILY);
+        NUMBER_FAMILY.add("float");
+        NUMBER_FAMILY.add("double");
+        NUMBER_FAMILY.add("decimal");
+        NUMBER_FAMILY.add("real");
+        NUMBER_FAMILY.add("numeric");
+
+        DATETIME_FAMILY.add("date");
+        DATETIME_FAMILY.add("time");
+        DATETIME_FAMILY.add("datetime");
+        DATETIME_FAMILY.add("timestamp");
+
+        STRING_FAMILY.add("varchar");
+        STRING_FAMILY.add("char");
+
+        LEGACY_TYPE_MAP.put("byte", "tinyint");
+        LEGACY_TYPE_MAP.put("int", "integer");
+        LEGACY_TYPE_MAP.put("short", "smallint");
+        LEGACY_TYPE_MAP.put("long", "bigint");
+        LEGACY_TYPE_MAP.put("string", "varchar");
+        LEGACY_TYPE_MAP.put("hllc10", "hllc(10)"); // the hllcNN entries expand a bare name into a parameterised form
+        LEGACY_TYPE_MAP.put("hllc12", "hllc(12)");
+        LEGACY_TYPE_MAP.put("hllc14", "hllc(14)");
+        LEGACY_TYPE_MAP.put("hllc15", "hllc(15)");
+        LEGACY_TYPE_MAP.put("hllc16", "hllc(16)");
+
+    }
+
+    private static final ConcurrentMap<DataType, DataType> CACHE = new 
ConcurrentHashMap<DataType, DataType>(); // maps each distinct (name, precision, scale) to one shared instance
+
+    public static final DataType ANY = DataType.getInstance("any");
+
+
+    static {
+        MeasureTypeFactory.init(); // NOTE(review): presumably lets measure types register extra names such as "hllc" -- confirm in MeasureTypeFactory
+    }
+
+    public static DataType getInstance(String type) { // parses the text and returns the cached equal instance; a null argument yields null
+        if (type == null)
+            return null;
+
+        DataType dataType = new DataType(type); // throws IllegalArgumentException for unrecognised text
+        DataType cached = CACHE.get(dataType);
+        if (cached == null) {
+            CACHE.put(dataType, dataType); // NOTE(review): get-then-put is not atomic; two racing threads can each keep a different instance, putIfAbsent would close the gap
+            cached = dataType;
+        }
+        return cached;
+    }
+
+    // 
============================================================================
+
+    private String name;
+    private int precision; // -1 when not specified
+    private int scale; // -1 when not specified
+
+
+    private DataType(String datatype) { // parses text such as "decimal(19,4)"; throws IllegalArgumentException when it is not a registered type or the numbers are malformed
+        datatype = datatype.trim().toLowerCase(); // NOTE(review): lower-cases with the default locale; Locale.ROOT would be locale-independent
+        datatype = replaceLegacy(datatype); // whole-string pass: catches legacy names that expand to a parameterised form, e.g. "hllc10"
+
+        Pattern pattern = TYPE_PATTERN; // NOTE(review): unsynchronized read of a non-volatile field that the synchronized register() rewrites
+        Matcher m = pattern.matcher(datatype);
+        if (m.matches() == false)
+            throw new IllegalArgumentException("bad data type -- " + datatype 
+ ", does not match " + pattern);
+
+        name = replaceLegacy(m.group(1)); // second pass: catches a legacy name followed by parameters, e.g. "int(4)"
+        precision = -1;
+        scale = -1;
+
+        String leftover = m.group(2); // text between the parentheses, or null when there were none
+        if (leftover != null) {
+            String[] parts = leftover.split("\\s*,\\s*");
+            for (int i = 0; i < parts.length; i++) {
+                int n;
+                try {
+                    n = Integer.parseInt(parts[i]);
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("bad data type -- " + 
datatype + ", precision/scale not numeric");
+                }
+                if (i == 0)
+                    precision = n;
+                else if (i == 1)
+                    scale = n;
+                else
+                    throw new IllegalArgumentException("bad data type -- " + 
datatype + ", too many precision/scale parts");
+            }
+        }
+
+        // FIXME 256 for unknown string precision
+        if ((name.equals("char") || name.equals("varchar")) && precision == 
-1) {
+            precision = 256; // to save memory at frontend, e.g. tableau will
+                             // allocate memory according to this
+        }
+
+        // FIXME (19,4) for unknown decimal precision
+        if ((name.equals("decimal") || name.equals("numeric")) && precision == 
-1) {
+            precision = 19;
+            scale = 4;
+        }
+    }
+
+    private String replaceLegacy(String str) { // returns the current spelling for a legacy name, or the input unchanged when there is no mapping
+        String replace = LEGACY_TYPE_MAP.get(str);
+        return replace == null ? str : replace;
+    }
+
+    public int getStorageBytesEstimate() { // delegates to the serializer mapped to this type; DataTypeSerializer.create() throws RuntimeException when none is mapped
+        return DataTypeSerializer.create(this).getStorageBytesEstimate();
+    }
+
+
+    public boolean isStringFamily() { // "varchar" or "char"
+        return STRING_FAMILY.contains(name);
+    }
+
+    public boolean isIntegerFamily() {
+        return INTEGER_FAMILY.contains(name);
+    }
+
+    public boolean isNumberFamily() { // integer family plus float, double, decimal, real, numeric
+        return NUMBER_FAMILY.contains(name);
+    }
+
+    public boolean isDateTimeFamily() {
+        return DATETIME_FAMILY.contains(name);
+    }
+
+    public boolean isTinyInt() {
+        return name.equals("tinyint");
+    }
+
+    public boolean isSmallInt() {
+        return name.equals("smallint");
+    }
+
+    public boolean isInt() { // the stored name is "integer"; a legacy "int" has already been rewritten by the constructor
+        return name.equals("integer");
+    }
+
+    public boolean isBigInt() {
+        return name.equals("bigint");
+    }
+
+    public boolean isFloat() {
+        return name.equals("float");
+    }
+
+    public boolean isDouble() {
+        return name.equals("double");
+    }
+
+    public boolean isDecimal() { // true only for "decimal"; "numeric" does not count here
+        return name.equals("decimal");
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public int getPrecision() { // -1 when unspecified, except the defaults the constructor applies to char/varchar and decimal/numeric
+        return precision;
+    }
+
+    public int getScale() { // -1 when unspecified
+        return scale;
+    }
+
+    @Override
+    public int hashCode() { // combines the same three fields that equals() compares
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((name == null) ? 0 : name.hashCode());
+        result = prime * result + precision;
+        result = prime * result + scale;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // equal when name, precision and scale all match
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        DataType other = (DataType) obj;
+        if (name == null) {
+            if (other.name != null)
+                return false;
+        } else if (!name.equals(other.name))
+            return false;
+        if (precision != other.precision)
+            return false;
+        if (scale != other.scale)
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() { // "name", "name(precision)" or "name(precision,scale)" depending on which values are non-negative
+        if (precision < 0 && scale < 0)
+            return name;
+        else if (scale < 0)
+            return name + "(" + precision + ")";
+        else
+            return name + "(" + precision + "," + scale + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
new file mode 100644
index 0000000..601925b
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.kylin.common.util.BytesSerializer;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Base class of the per-type serializers and static registry from type name to implementation. Note: the implementations MUST be thread-safe.
+ */
+abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
+
+    final static Map<String, Class<?>> implementations = Maps.newHashMap(); // keyed by DataType.getName(); NOTE(review): plain HashMap, so register() is not safe to run concurrently with create()
+    static { // NOTE(review): no entry for "char" or "tinyint" here, so create() throws for them unless they are registered elsewhere
+        implementations.put("varchar", StringSerializer.class);
+        implementations.put("decimal", BigDecimalSerializer.class);
+        implementations.put("double", DoubleSerializer.class);
+        implementations.put("float", DoubleSerializer.class);
+        implementations.put("bigint", LongSerializer.class);
+        implementations.put("long", LongSerializer.class); // NOTE(review): DataType rewrites "long" to "bigint" before lookup, so this key looks unreachable through create()
+        implementations.put("integer", LongSerializer.class);
+        implementations.put("int", LongSerializer.class); // NOTE(review): likewise, DataType rewrites "int" to "integer"
+        implementations.put("smallint", LongSerializer.class);
+        implementations.put("date", DateTimeSerializer.class);
+        implementations.put("datetime", DateTimeSerializer.class);
+        implementations.put("timestamp", DateTimeSerializer.class);
+    }
+
+    public static void register(String dataTypeName, Class<? extends 
DataTypeSerializer<?>> impl) {
+        implementations.put(dataTypeName, impl); // replaces any implementation already mapped to the name
+    }
+
+    public static DataTypeSerializer<?> create(String dataType) { // convenience overload: parses the type text, then delegates
+        return create(DataType.getInstance(dataType));
+    }
+
+    public static DataTypeSerializer<?> create(DataType type) { // builds a new serializer for the type; throws RuntimeException when no implementation is mapped to its name
+        Class<?> clz = implementations.get(type.getName());
+        if (clz == null)
+            throw new RuntimeException("No DataTypeSerializer for type " + 
type);
+
+        try {
+            return (DataTypeSerializer<?>) 
clz.getConstructor(DataType.class).newInstance(type); // every implementation needs a public constructor taking a DataType
+        } catch (Exception e) {
+            throw new RuntimeException(e); // only reached if the class lacks that public constructor or the constructor itself throws
+        }
+    }
+
+    /** peek into buffer and return the length of serialization */
+    abstract public int peekLength(ByteBuffer in);
+
+    /** return the max number of bytes to the longest serialization */
+    abstract public int maxLength();
+
+    /** get an estimate of size in bytes of the serialized data */
+    abstract public int getStorageBytesEstimate();
+
+    /** an optional convenient method that converts a string to this data type 
(for dimensions) */
+    public T valueOf(String str) { // default for serializers that do not support parsing from text
+        throw new UnsupportedOperationException();
+    }
+
+    /** convert from obj to string */
+    public String toString(T value) { // a null value is rendered as the literal text "NULL"
+        if (value == null)
+            return "NULL";
+        else
+            return value.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
new file mode 100644
index 0000000..f14481a
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
@@ -0,0 +1,57 @@
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.DateFormat;
+
+public class DateTimeSerializer extends DataTypeSerializer<LongWritable> { // stores a LongWritable as a fixed 8-byte long
+
+    // be thread-safe and avoid repeated obj creation
+    private ThreadLocal<LongWritable> current = new 
ThreadLocal<LongWritable>();
+
+    public DateTimeSerializer(DataType type) { // the type argument is not used; the signature matches what DataTypeSerializer.create() invokes reflectively
+    }
+
+    @Override
+    public void serialize(LongWritable value, ByteBuffer out) {
+        out.putLong(value.get());
+    }
+
+    private LongWritable current() { // this thread's reusable holder, created on first use
+        LongWritable l = current.get();
+        if (l == null) {
+            l = new LongWritable();
+            current.set(l);
+        }
+        return l;
+    }
+
+    @Override
+    public LongWritable deserialize(ByteBuffer in) { // returns the per-thread holder: the next deserialize() on this thread overwrites it, so copy the value if it must be kept
+        LongWritable l = current();
+        l.set(in.getLong());
+        return l;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) { // fixed width, so the buffer is not read
+        return 8;
+    }
+
+    @Override
+    public int maxLength() {
+        return 8;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+    @Override
+    public LongWritable valueOf(String str) { // returns a fresh object (not the per-thread holder) from DateFormat.stringToMillis(str)
+        return new LongWritable(DateFormat.stringToMillis(str));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
new file mode 100644
index 0000000..04b43cf
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+/** Stores a DoubleWritable as a fixed 8-byte double.
+ */
+public class DoubleSerializer extends DataTypeSerializer<DoubleWritable> {
+
+    // be thread-safe and avoid repeated obj creation
+    private ThreadLocal<DoubleWritable> current = new 
ThreadLocal<DoubleWritable>();
+
+    public DoubleSerializer(DataType type) { // the type argument is not used; the signature matches what DataTypeSerializer.create() invokes reflectively
+    }
+
+    @Override
+    public void serialize(DoubleWritable value, ByteBuffer out) {
+        out.putDouble(value.get());
+    }
+
+    private DoubleWritable current() { // this thread's reusable holder, created on first use
+        DoubleWritable d = current.get();
+        if (d == null) {
+            d = new DoubleWritable();
+            current.set(d);
+        }
+        return d;
+    }
+
+    @Override
+    public DoubleWritable deserialize(ByteBuffer in) { // returns the per-thread holder: the next deserialize() on this thread overwrites it, so copy the value if it must be kept
+        DoubleWritable d = current();
+        d.set(in.getDouble());
+        return d;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) { // fixed width, so the buffer is not read
+        return 8;
+    }
+
+    @Override
+    public int maxLength() {
+        return 8;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+    @Override
+    public DoubleWritable valueOf(String str) { // returns a fresh object (not the per-thread holder); malformed text surfaces as NumberFormatException
+        return new DoubleWritable(Double.parseDouble(str));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java 
b/metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
new file mode 100644
index 0000000..d893b29
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.BytesUtil;
+
+/** Stores a LongWritable in the variable-length form written by BytesUtil.writeVLong.
+ */
+public class LongSerializer extends DataTypeSerializer<LongWritable> {
+
+    // be thread-safe and avoid repeated obj creation
+    private ThreadLocal<LongWritable> current = new 
ThreadLocal<LongWritable>();
+
+    public LongSerializer(DataType type) { // the type argument is not used; the signature matches what DataTypeSerializer.create() invokes reflectively
+    }
+
+    @Override
+    public void serialize(LongWritable value, ByteBuffer out) {
+        BytesUtil.writeVLong(value.get(), out);
+    }
+
+    private LongWritable current() { // this thread's reusable holder, created on first use
+        LongWritable l = current.get();
+        if (l == null) {
+            l = new LongWritable();
+            current.set(l);
+        }
+        return l;
+    }
+
+    @Override
+    public LongWritable deserialize(ByteBuffer in) { // returns the per-thread holder: the next deserialize() on this thread overwrites it, so copy the value if it must be kept
+        LongWritable l = current();
+        l.set(BytesUtil.readVLong(in));
+        return l;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) { // byte length of the next encoded value; the buffer position is restored before returning
+        int mark = in.position();
+
+        BytesUtil.readVLong(in); // value is discarded; the read only serves to advance the position
+        int len = in.position() - mark;
+
+        in.position(mark); // rewind
+        return len;
+    }
+
+    @Override
+    public int maxLength() {
+        return 9; // vlong: 1 + 8
+    }
+
+    @Override
+    public int getStorageBytesEstimate() { // constant estimate, below the 9-byte maximum
+        return 5;
+    }
+
+    @Override
+    public LongWritable valueOf(String str) { // returns a fresh object (not the per-thread holder); malformed text surfaces as NumberFormatException
+        return new LongWritable(Long.parseLong(str));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/datatype/StringSerializer.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/datatype/StringSerializer.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/datatype/StringSerializer.java
new file mode 100644
index 0000000..eef2868
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/metadata/datatype/StringSerializer.java
@@ -0,0 +1,52 @@
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.BytesUtil;
+
+public class StringSerializer extends DataTypeSerializer<String> { // writes strings through BytesUtil.writeUTFString, bounded by the type's precision
+
+    final DataType type;
+    final int maxLength;
+
+    public StringSerializer(DataType type) {
+        this.type = type;
+        // see serialize(): 2 byte length, rest is String.toBytes()
+        this.maxLength = 2 + type.getPrecision();
+    }
+
+    @Override
+    public void serialize(String value, ByteBuffer out) { // throws IllegalArgumentException when the encoded form is longer than maxLength bytes
+        int start = out.position();
+
+        BytesUtil.writeUTFString(value, out);
+
+        if (out.position() - start > maxLength) // the limit counts encoded bytes, not characters; NOTE(review): checked after the write, so on failure the oversized bytes are already in the buffer
+            throw new IllegalArgumentException("'" + value + "' exceeds the 
expected length for type " + type);
+    }
+
+    @Override
+    public String deserialize(ByteBuffer in) {
+        return BytesUtil.readUTFString(in);
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) { // NOTE(review): relies on peekByteArrayLength understanding the layout writeUTFString produces -- confirm in BytesUtil
+        return BytesUtil.peekByteArrayLength(in);
+    }
+
+    @Override
+    public int maxLength() {
+        return maxLength;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() { // uses the worst case as the estimate
+        return maxLength;
+    }
+
+    @Override
+    public String valueOf(String str) { // identity: the value already is a string
+        return str;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java
deleted file mode 100644
index 56222f1..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import java.math.BigDecimal;
-
-/**
- * Keeps the largest BigDecimal passed to aggregate() since the last reset().
- * @author yangli9
- */
-public class BigDecimalMaxAggregator extends MeasureAggregator<BigDecimal> {
-
-    BigDecimal max = null; // null until the first value arrives
-
-    @Override
-    public void reset() {
-        max = null;
-    }
-
-    @Override
-    public void aggregate(BigDecimal value) { // first value is taken as-is; later ones replace max only when strictly greater by compareTo
-        if (max == null)
-            max = value;
-        else if (max.compareTo(value) < 0)
-            max = value;
-    }
-
-    @Override
-    public BigDecimal getState() { // null when nothing has been aggregated since the last reset
-        return max;
-    }
-
-    @Override
-    public int getMemBytes() { // helper is not defined in this class; presumably inherited from MeasureAggregator
-        return guessBigDecimalMemBytes();
-    }
-}

Reply via email to