http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMinAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMinAggregator.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMinAggregator.java
deleted file mode 100644
index d0aff5a..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMinAggregator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import java.math.BigDecimal;
-
-/**
- * @author yangli9
- * 
- */
-public class BigDecimalMinAggregator extends MeasureAggregator<BigDecimal> {
-
-    BigDecimal max = null;
-
-    @Override
-    public void reset() {
-        max = null;
-    }
-
-    @Override
-    public void aggregate(BigDecimal value) {
-        if (max == null)
-            max = value;
-        else if (max.compareTo(value) > 0)
-            max = value;
-    }
-
-    @Override
-    public BigDecimal getState() {
-        return max;
-    }
-
-    @Override
-    public int getMemBytes() {
-        return guessBigDecimalMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSerializer.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSerializer.java
deleted file mode 100644
index 3c858b2..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSerializer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-
-/**
- * @author yangli9
- * 
- */
-public class BigDecimalSerializer extends MeasureSerializer<BigDecimal> {
-
-    @Override
-    public void serialize(BigDecimal value, ByteBuffer out) {
-        byte[] bytes = value.unscaledValue().toByteArray();
-
-        BytesUtil.writeVInt(value.scale(), out);
-        BytesUtil.writeVInt(bytes.length, out);
-        out.put(bytes);
-    }
-
-    @Override
-    public BigDecimal deserialize(ByteBuffer in) {
-        int scale = BytesUtil.readVInt(in);
-        int n = BytesUtil.readVInt(in);
-
-        byte[] bytes = new byte[n];
-        in.get(bytes);
-
-        return new BigDecimal(new BigInteger(bytes), scale);
-    }
-
-    @Override
-    public BigDecimal valueOf(byte[] value) {
-        if (value == null)
-            return new BigDecimal(0);
-        else
-            return new BigDecimal(Bytes.toString(value));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSumAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSumAggregator.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSumAggregator.java
deleted file mode 100644
index ca9d9d9..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSumAggregator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import java.math.BigDecimal;
-
-/**
- * @author yangli9
- * 
- */
-public class BigDecimalSumAggregator extends MeasureAggregator<BigDecimal> {
-
-    BigDecimal sum = new BigDecimal(0);
-
-    @Override
-    public void reset() {
-        sum = new BigDecimal(0);
-    }
-
-    @Override
-    public void aggregate(BigDecimal value) {
-        sum = sum.add(value);
-    }
-
-    @Override
-    public BigDecimal getState() {
-        return sum;
-    }
-
-    @Override
-    public int getMemBytes() {
-        return guessBigDecimalMemBytes();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java
deleted file mode 100644
index b73bf57..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import org.apache.hadoop.io.DoubleWritable;
-
-/**
- * @author yangli9
- * 
- */
-public class DoubleMaxAggregator extends MeasureAggregator<DoubleWritable> {
-
-    DoubleWritable max = null;
-
-    @Override
-    public void reset() {
-        max = null;
-    }
-
-    @Override
-    public void aggregate(DoubleWritable value) {
-        if (max == null)
-            max = new DoubleWritable(value.get());
-        else if (max.get() < value.get())
-            max.set(value.get());
-    }
-
-    @Override
-    public DoubleWritable getState() {
-        return max;
-    }
-
-    @Override
-    public int getMemBytes() {
-        return guessDoubleMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java
deleted file mode 100644
index b7b5005..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import org.apache.hadoop.io.DoubleWritable;
-
-/**
- * @author yangli9
- * 
- */
-public class DoubleMinAggregator extends MeasureAggregator<DoubleWritable> {
-
-    DoubleWritable min = null;
-
-    @Override
-    public void reset() {
-        min = null;
-    }
-
-    @Override
-    public void aggregate(DoubleWritable value) {
-        if (min == null)
-            min = new DoubleWritable(value.get());
-        else if (min.get() > value.get())
-            min.set(value.get());
-    }
-
-    @Override
-    public DoubleWritable getState() {
-        return min;
-    }
-
-    @Override
-    public int getMemBytes() {
-        return guessDoubleMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSerializer.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSerializer.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSerializer.java
deleted file mode 100644
index 9b2a076..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSerializer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.kylin.common.util.Bytes;
-
-/**
- * @author yangli9
- * 
- */
-public class DoubleSerializer extends MeasureSerializer<DoubleWritable> {
-
-    // avoid mass object creation
-    DoubleWritable current = new DoubleWritable();
-
-    @Override
-    public void serialize(DoubleWritable value, ByteBuffer out) {
-        out.putDouble(value.get());
-    }
-
-    @Override
-    public DoubleWritable deserialize(ByteBuffer in) {
-        current.set(in.getDouble());
-        return current;
-    }
-
-    @Override
-    public DoubleWritable valueOf(byte[] value) {
-        if (value == null)
-            current.set(0d);
-        else
-            current.set(Double.parseDouble(Bytes.toString(value)));
-        return current;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java
deleted file mode 100644
index 3e3623d..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import org.apache.hadoop.io.DoubleWritable;
-
-/**
- * @author yangli9
- * 
- */
-public class DoubleSumAggregator extends MeasureAggregator<DoubleWritable> {
-
-    DoubleWritable sum = new DoubleWritable();
-
-    @Override
-    public void reset() {
-        sum.set(0.0);
-    }
-
-    @Override
-    public void aggregate(DoubleWritable value) {
-        sum.set(sum.get() + value.get());
-    }
-
-    @Override
-    public DoubleWritable getState() {
-        return sum;
-    }
-
-    @Override
-    public int getMemBytes() {
-        return guessDoubleMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
deleted file mode 100644
index 1ed00f3..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-
-/**
- * @author yangli9
- * 
- */
-public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> {
-
-    HyperLogLogPlusCounter sum = null;
-
-    @Override
-    public void reset() {
-        sum = null;
-    }
-
-    @Override
-    public void aggregate(HyperLogLogPlusCounter value) {
-        if (sum == null)
-            sum = new HyperLogLogPlusCounter(value);
-        else
-            sum.merge(value);
-    }
-
-    @Override
-    public HyperLogLogPlusCounter getState() {
-        return sum;
-    }
-
-    @Override
-    public int getMemBytes() {
-        if (sum == null)
-            return Integer.MIN_VALUE;
-        else
-            return 4 + sum.getMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
deleted file mode 100644
index 10b687e..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-
-/**
- * @author yangli9
- * 
- */
-public class HLLCSerializer extends MeasureSerializer<HyperLogLogPlusCounter> {
-
-    HyperLogLogPlusCounter current;
-
-    public HLLCSerializer(int p) {
-        current = new HyperLogLogPlusCounter(p);
-    }
-
-    @Override
-    public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) {
-        try {
-            value.writeRegisters(out);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
-        try {
-            current.readRegisters(in);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        return current;
-    }
-
-    @Override
-    public HyperLogLogPlusCounter valueOf(byte[] value) {
-        current.clear();
-        if (value == null)
-            current.add("__nUlL__");
-        else
-            current.add(value);
-        return current;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java
deleted file mode 100644
index bddcaf4..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import org.apache.hadoop.io.LongWritable;
-
-/**
- * Long Distinct Count
- * 
- * @author xjiang
- * 
- */
-public class LDCAggregator extends MeasureAggregator<LongWritable> {
-
-    private static LongWritable ZERO = new LongWritable(0);
-
-    private HLLCAggregator hllAgg = null;
-    private LongWritable state = new LongWritable(0);
-
-    @SuppressWarnings("rawtypes")
-    public void setDependentAggregator(MeasureAggregator agg) {
-        this.hllAgg = (HLLCAggregator) agg;
-    }
-
-    @Override
-    public void reset() {
-    }
-
-    @Override
-    public void aggregate(LongWritable value) {
-    }
-
-    @Override
-    public LongWritable getState() {
-        if (hllAgg == null) {
-            return ZERO;
-        } else {
-            state.set(hllAgg.getState().getCountEstimate());
-            return state;
-        }
-    }
-
-    @Override
-    public int getMemBytes() {
-        return guessLongMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java
deleted file mode 100644
index 2397f45..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import org.apache.hadoop.io.LongWritable;
-
-/**
- * @author yangli9
- * 
- */
-public class LongMaxAggregator extends MeasureAggregator<LongWritable> {
-
-    LongWritable max = null;
-
-    @Override
-    public void reset() {
-        max = null;
-    }
-
-    @Override
-    public void aggregate(LongWritable value) {
-        if (max == null)
-            max = new LongWritable(value.get());
-        else if (max.get() < value.get())
-            max.set(value.get());
-    }
-
-    @Override
-    public LongWritable getState() {
-        return max;
-    }
-
-    @Override
-    public int getMemBytes() {
-        return guessLongMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java
deleted file mode 100644
index 8f6b66d..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import org.apache.hadoop.io.LongWritable;
-
-/**
- * @author yangli9
- * 
- */
-public class LongMinAggregator extends MeasureAggregator<LongWritable> {
-
-    LongWritable min = null;
-
-    @Override
-    public void reset() {
-        min = null;
-    }
-
-    @Override
-    public void aggregate(LongWritable value) {
-        if (min == null)
-            min = new LongWritable(value.get());
-        else if (min.get() > value.get())
-            min.set(value.get());
-    }
-
-    @Override
-    public LongWritable getState() {
-        return min;
-    }
-
-    @Override
-    public int getMemBytes() {
-        return guessLongMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/LongSerializer.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/LongSerializer.java 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/LongSerializer.java
deleted file mode 100644
index 5e73de2..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/LongSerializer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-
-/**
- * @author yangli9
- * 
- */
-public class LongSerializer extends MeasureSerializer<LongWritable> {
-
-    // avoid mass object creation
-    LongWritable current = new LongWritable();
-
-    @Override
-    public void serialize(LongWritable value, ByteBuffer out) {
-        BytesUtil.writeVLong(value.get(), out);
-    }
-
-    @Override
-    public LongWritable deserialize(ByteBuffer in) {
-        current.set(BytesUtil.readVLong(in));
-        return current;
-    }
-
-    @Override
-    public LongWritable valueOf(byte[] value) {
-        if (value == null)
-            current.set(0L);
-        else
-            current.set(Long.parseLong(Bytes.toString(value)));
-        return current;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java
deleted file mode 100644
index 865f895..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import org.apache.hadoop.io.LongWritable;
-
-/**
- * @author yangli9
- * 
- */
-public class LongSumAggregator extends MeasureAggregator<LongWritable> {
-
-    LongWritable sum = new LongWritable();
-
-    @Override
-    public void reset() {
-        sum.set(0);
-    }
-
-    @Override
-    public void aggregate(LongWritable value) {
-        sum.set(sum.get() + value.get());
-    }
-
-    @Override
-    public LongWritable getState() {
-        return sum;
-    }
-
-    @Override
-    public int getMemBytes() {
-        return guessLongMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
deleted file mode 100644
index 6abf4af..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.FunctionDesc;
-
-/**
- * @author yangli9
- * 
- */
-abstract public class MeasureAggregator<V> {
-
-    public static MeasureAggregator<?> create(String funcName, String 
returnType) {
-        if (FunctionDesc.FUNC_SUM.equalsIgnoreCase(funcName) || 
FunctionDesc.FUNC_COUNT.equalsIgnoreCase(funcName)) {
-            if (isInteger(returnType))
-                return new LongSumAggregator();
-            else if (isBigDecimal(returnType))
-                return new BigDecimalSumAggregator();
-            else if (isDouble(returnType))
-                return new DoubleSumAggregator();
-        } else if 
(FunctionDesc.FUNC_COUNT_DISTINCT.equalsIgnoreCase(funcName)) {
-            if (DataType.getInstance(returnType).isHLLC())
-                return new HLLCAggregator();
-            else
-                return new LDCAggregator();
-        } else if (FunctionDesc.FUNC_MAX.equalsIgnoreCase(funcName)) {
-            if (isInteger(returnType))
-                return new LongMaxAggregator();
-            else if (isBigDecimal(returnType))
-                return new BigDecimalMaxAggregator();
-            else if (isDouble(returnType))
-                return new DoubleMaxAggregator();
-        } else if (FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName)) {
-            if (isInteger(returnType))
-                return new LongMinAggregator();
-            else if (isBigDecimal(returnType))
-                return new BigDecimalMinAggregator();
-            else if (isDouble(returnType))
-                return new DoubleMinAggregator();
-        }
-        throw new IllegalArgumentException("No aggregator for func '" + 
funcName + "' and return type '" + returnType + "'");
-    }
-
-    public static boolean isBigDecimal(String type) {
-        return type.startsWith("decimal");
-    }
-
-    public static boolean isDouble(String type) {
-        return "double".equalsIgnoreCase(type) || 
"float".equalsIgnoreCase(type) || "real".equalsIgnoreCase(type);
-    }
-
-    public static boolean isInteger(String type) {
-        return "long".equalsIgnoreCase(type) || 
"bigint".equalsIgnoreCase(type) || "int".equalsIgnoreCase(type) || 
"integer".equalsIgnoreCase(type);
-    }
-
-    public static int guessBigDecimalMemBytes() {
-        return 4 // ref
-        + 20; // guess of BigDecimal
-    }
-
-    public static int guessDoubleMemBytes() {
-        return 4 // ref
-        + 8;
-    }
-
-    public static int guessLongMemBytes() {
-        return 4 // ref
-        + 8;
-    }
-
-    // 
============================================================================
-
-    @SuppressWarnings("rawtypes")
-    public void setDependentAggregator(MeasureAggregator agg) {
-    }
-
-    abstract public void reset();
-
-    abstract public void aggregate(V value);
-
-    abstract public V getState();
-
-    // get an estimate of memory consumption
-    abstract public int getMemBytes();
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java
deleted file mode 100644
index b8225f1..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-/**
- * @author yangli9
- * 
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class MeasureAggregators {
-
-    private MeasureDesc[] descs;
-    private MeasureAggregator[] aggs;
-
-    public MeasureAggregators(Collection<MeasureDesc> measureDescs) {
-        this((MeasureDesc[]) measureDescs.toArray(new 
MeasureDesc[measureDescs.size()]));
-    }
-
-    public MeasureAggregators(MeasureDesc... measureDescs) {
-        descs = measureDescs;
-        aggs = new MeasureAggregator[descs.length];
-
-        Map<String, Integer> measureIndexMap = new HashMap<String, Integer>();
-        for (int i = 0; i < descs.length; i++) {
-            FunctionDesc func = descs[i].getFunction();
-            aggs[i] = MeasureAggregator.create(func.getExpression(), 
func.getReturnType());
-            measureIndexMap.put(descs[i].getName(), i);
-        }
-        // fill back dependent aggregator
-        for (int i = 0; i < descs.length; i++) {
-            String depMsrRef = descs[i].getDependentMeasureRef();
-            if (depMsrRef != null) {
-                int index = measureIndexMap.get(depMsrRef);
-                aggs[i].setDependentAggregator(aggs[index]);
-            }
-        }
-    }
-
-    public void reset() {
-        for (int i = 0; i < aggs.length; i++) {
-            aggs[i].reset();
-        }
-    }
-
-    public void aggregate(Object[] values) {
-        assert values.length == descs.length;
-
-        for (int i = 0; i < descs.length; i++) {
-            aggs[i].aggregate(values[i]);
-        }
-    }
-
-    public void collectStates(Object[] states) {
-        for (int i = 0; i < descs.length; i++) {
-            states[i] = aggs[i].getState();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
deleted file mode 100644
index 70be77c..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-/**
- * @author yangli9
- * 
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class MeasureCodec {
-
-    int nMeasures;
-    MeasureSerializer[] serializers;
-
-    public MeasureCodec(Collection<MeasureDesc> measureDescs) {
-        this((MeasureDesc[]) measureDescs.toArray(new 
MeasureDesc[measureDescs.size()]));
-    }
-
-    public MeasureCodec(MeasureDesc... measureDescs) {
-        String[] dataTypes = new String[measureDescs.length];
-        for (int i = 0; i < dataTypes.length; i++) {
-            dataTypes[i] = measureDescs[i].getFunction().getReturnType();
-        }
-        init(dataTypes);
-    }
-
-    public MeasureCodec(String... dataTypes) {
-        init(dataTypes);
-    }
-
-    private void init(String[] dataTypes) {
-        nMeasures = dataTypes.length;
-        serializers = new MeasureSerializer[nMeasures];
-
-        for (int i = 0; i < nMeasures; i++) {
-            serializers[i] = MeasureSerializer.create(dataTypes[i]);
-        }
-    }
-
-    public MeasureSerializer getSerializer(int idx) {
-        return serializers[idx];
-    }
-
-    public void decode(Text bytes, Object[] result) {
-        decode(ByteBuffer.wrap(bytes.getBytes(), 0, bytes.getLength()), 
result);
-    }
-
-    public void decode(ByteBuffer buf, Object[] result) {
-        assert result.length == nMeasures;
-        for (int i = 0; i < nMeasures; i++) {
-            result[i] = serializers[i].deserialize(buf);
-        }
-    }
-
-    public void encode(Object[] values, ByteBuffer out) {
-        assert values.length == nMeasures;
-        for (int i = 0; i < nMeasures; i++) {
-            serializers[i].serialize(values[i], out);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureSerializer.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureSerializer.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureSerializer.java
deleted file mode 100644
index 2f2e39c..0000000
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureSerializer.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.measure;
-
-import java.util.HashMap;
-
-import org.apache.kylin.common.util.BytesSerializer;
-import org.apache.kylin.metadata.model.DataType;
-
-/**
- * @author yangli9
- * 
- */
-abstract public class MeasureSerializer<T> implements BytesSerializer<T> {
-
-    final static HashMap<String, Class<?>> implementations = new 
HashMap<String, Class<?>>();
-    static {
-        implementations.put("decimal", BigDecimalSerializer.class);
-        implementations.put("double", DoubleSerializer.class);
-        implementations.put("float", DoubleSerializer.class);
-        implementations.put("bigint", LongSerializer.class);
-        implementations.put("long", LongSerializer.class);
-        implementations.put("integer", LongSerializer.class);
-        implementations.put("int", LongSerializer.class);
-    }
-
-    public static MeasureSerializer<?> create(String dataType) {
-        DataType type = DataType.getInstance(dataType);
-        if (type.isHLLC()) {
-            return new HLLCSerializer(type.getPrecision());
-        }
-
-        Class<?> clz = implementations.get(type.getName());
-        if (clz == null)
-            throw new RuntimeException("No MeasureSerializer for type " + 
dataType);
-
-        try {
-            return (MeasureSerializer<?>) clz.newInstance();
-        } catch (Exception e) {
-            throw new RuntimeException(e); // never happen
-        }
-    }
-
-    abstract public T valueOf(byte[] value);
-
-    public String toString(T value) {
-        if (value == null)
-            return "NULL";
-        else
-            return value.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
index 3f94373..92580fc 100644
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
+++ 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
@@ -20,7 +20,7 @@ package org.apache.kylin.metadata.measure.fixedlen;
 import java.nio.ByteBuffer;
 
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
 
 /**
  * Created by Hongbin Ma(Binmahone) on 2/10/15.

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
index 76aea34..6fc2b8d 100644
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
+++ 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
@@ -18,12 +18,13 @@
 
 package org.apache.kylin.metadata.measure.fixedlen;
 
-import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.measure.hllc.HLLCMeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
 
 abstract public class FixedLenMeasureCodec<T> {
 
     public static FixedLenMeasureCodec<?> get(DataType type) {
-        if (type.isHLLC()) {
+        if (HLLCMeasureType.DATATYPE_HLLC.equals(type.getName())) {
             return new FixedHLLCodec(type);
         } else {
             return new FixedPointLongCodec(type);

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
index dca027c..94bc354 100644
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
+++ 
b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
@@ -22,7 +22,7 @@ import java.math.BigDecimal;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
 
 public class FixedPointLongCodec extends FixedLenMeasureCodec<LongWritable> {
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java 
b/metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
index ff7b198..51b13cf 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang.StringUtils;
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.metadata.datatype.DataType;
 
 /**
  * Column Metadata from Source. All name should be uppercase.

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java 
b/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
deleted file mode 100644
index d813d6f..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.model;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * @author yangli9
- * 
- */
-public class DataType {
-
-    public static final String VALID_TYPES_STRING = 
"any|char|varchar|boolean|binary" //
-            + 
"|integer|tinyint|smallint|bigint|decimal|numeric|float|real|double" //
-            + "|date|time|datetime|timestamp|byte|int|short|long|string|hllc" 
//
-            + "|" + TblColRef.InnerDataTypeEnum.LITERAL.getDataType() //
-            + "|" + TblColRef.InnerDataTypeEnum.DERIVED.getDataType();
-
-    private static final Pattern TYPE_PATTERN = Pattern.compile(
-    // standard sql types, ref:
-    // http://www.w3schools.com/sql/sql_datatypes_general.asp
-            "(" + VALID_TYPES_STRING + ")" + "\\s*" //
-                    + "(?:" + "[(]" + "([\\d\\s,]+)" + "[)]" + ")?", 
Pattern.CASE_INSENSITIVE);
-
-    public static final Set<String> INTEGER_FAMILY = new HashSet<String>();
-    public static final Set<String> NUMBER_FAMILY = new HashSet<String>();
-    public static final Set<String> DATETIME_FAMILY = new HashSet<String>();
-    public static final Set<String> STRING_FAMILY = new HashSet<String>();
-    private static final Set<Integer> HLLC_PRECISIONS = new HashSet<Integer>();
-    private static final Map<String, String> LEGACY_TYPE_MAP = new 
HashMap<String, String>();
-    static {
-        INTEGER_FAMILY.add("tinyint");
-        INTEGER_FAMILY.add("smallint");
-        INTEGER_FAMILY.add("integer");
-        INTEGER_FAMILY.add("bigint");
-
-        NUMBER_FAMILY.addAll(INTEGER_FAMILY);
-        NUMBER_FAMILY.add("float");
-        NUMBER_FAMILY.add("double");
-        NUMBER_FAMILY.add("decimal");
-        NUMBER_FAMILY.add("real");
-        NUMBER_FAMILY.add("numeric");
-
-        DATETIME_FAMILY.add("date");
-        DATETIME_FAMILY.add("time");
-        DATETIME_FAMILY.add("datetime");
-        DATETIME_FAMILY.add("timestamp");
-
-        STRING_FAMILY.add("varchar");
-        STRING_FAMILY.add("char");
-
-        LEGACY_TYPE_MAP.put("byte", "tinyint");
-        LEGACY_TYPE_MAP.put("int", "integer");
-        LEGACY_TYPE_MAP.put("short", "smallint");
-        LEGACY_TYPE_MAP.put("long", "bigint");
-        LEGACY_TYPE_MAP.put("string", "varchar");
-        LEGACY_TYPE_MAP.put("hllc10", "hllc(10)");
-        LEGACY_TYPE_MAP.put("hllc12", "hllc(12)");
-        LEGACY_TYPE_MAP.put("hllc14", "hllc(14)");
-        LEGACY_TYPE_MAP.put("hllc15", "hllc(15)");
-        LEGACY_TYPE_MAP.put("hllc16", "hllc(16)");
-
-        for (int i = 10; i <= 16; i++)
-            HLLC_PRECISIONS.add(i);
-    }
-
-    private static final ConcurrentMap<DataType, DataType> CACHE = new 
ConcurrentHashMap<DataType, DataType>();
-
-    public static final DataType ANY = DataType.getInstance("any");
-
-    public static DataType getInstance(String type) {
-        if (type == null)
-            return null;
-
-        DataType dataType = new DataType(type);
-        DataType cached = CACHE.get(dataType);
-        if (cached == null) {
-            CACHE.put(dataType, dataType);
-            cached = dataType;
-        }
-        return cached;
-    }
-
-    // 
============================================================================
-
-    private String name;
-    private int precision;
-    private int scale;
-
-    DataType(String datatype) {
-        parseDataType(datatype);
-    }
-
-    private void parseDataType(String datatype) {
-        datatype = datatype.trim().toLowerCase();
-        datatype = replaceLegacy(datatype);
-
-        Matcher m = TYPE_PATTERN.matcher(datatype);
-        if (m.matches() == false)
-            throw new IllegalArgumentException("bad data type -- " + datatype 
+ ", does not match " + TYPE_PATTERN);
-
-        name = replaceLegacy(m.group(1));
-        precision = -1;
-        scale = -1;
-
-        String leftover = m.group(2);
-        if (leftover != null) {
-            String[] parts = leftover.split("\\s*,\\s*");
-            for (int i = 0; i < parts.length; i++) {
-                int n;
-                try {
-                    n = Integer.parseInt(parts[i]);
-                } catch (NumberFormatException e) {
-                    throw new IllegalArgumentException("bad data type -- " + 
datatype + ", precision/scale not numeric");
-                }
-                if (i == 0)
-                    precision = n;
-                else if (i == 1)
-                    scale = n;
-                else
-                    throw new IllegalArgumentException("bad data type -- " + 
datatype + ", too many precision/scale parts");
-            }
-        }
-
-        // FIXME 256 for unknown string precision
-        if ((name.equals("char") || name.equals("varchar")) && precision == 
-1) {
-            precision = 256; // to save memory at frontend, e.g. tableau will
-                             // allocate memory according to this
-        }
-
-        // FIXME (19,4) for unknown decimal precision
-        if ((name.equals("decimal") || name.equals("numeric")) && precision == 
-1) {
-            precision = 39;
-            scale = 16;
-        }
-
-        if (isHLLC() && HLLC_PRECISIONS.contains(precision) == false)
-            throw new IllegalArgumentException("HLLC precision must be one of 
" + HLLC_PRECISIONS);
-    }
-
-    private String replaceLegacy(String str) {
-        String replace = LEGACY_TYPE_MAP.get(str);
-        return replace == null ? str : replace;
-    }
-
-    public int getSpaceEstimate() {
-        if (isTinyInt()) {
-            return 1;
-        } else if (isSmallInt()) {
-            return 2;
-        } else if (isInt()) {
-            return 4;
-        } else if (isBigInt()) {
-            return 8;
-        } else if (isFloat()) {
-            return 4;
-        } else if (isDouble()) {
-            return 8;
-        } else if (isDecimal()) {
-            return 8;
-        } else if (isHLLC()) {
-            return 1 << precision;
-        }
-        throw new IllegalStateException("The return type : " + name + " is not 
recognized;");
-    }
-
-    public boolean isStringFamily() {
-        return STRING_FAMILY.contains(name);
-    }
-
-    public boolean isIntegerFamily() {
-        return INTEGER_FAMILY.contains(name);
-    }
-
-    public boolean isNumberFamily() {
-        return NUMBER_FAMILY.contains(name);
-    }
-
-    public boolean isDateTimeFamily() {
-        return DATETIME_FAMILY.contains(name);
-    }
-
-    public boolean isTinyInt() {
-        return name.equals("tinyint");
-    }
-
-    public boolean isSmallInt() {
-        return name.equals("smallint");
-    }
-
-    public boolean isInt() {
-        return name.equals("integer");
-    }
-
-    public boolean isBigInt() {
-        return name.equals("bigint");
-    }
-
-    public boolean isFloat() {
-        return name.equals("float");
-    }
-
-    public boolean isDouble() {
-        return name.equals("double");
-    }
-
-    public boolean isDecimal() {
-        return name.equals("decimal");
-    }
-
-    public boolean isHLLC() {
-        return name.equals("hllc");
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public int getPrecision() {
-        return precision;
-    }
-
-    public int getScale() {
-        return scale;
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((name == null) ? 0 : name.hashCode());
-        result = prime * result + precision;
-        result = prime * result + scale;
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        DataType other = (DataType) obj;
-        if (name == null) {
-            if (other.name != null)
-                return false;
-        } else if (!name.equals(other.name))
-            return false;
-        if (precision != other.precision)
-            return false;
-        if (scale != other.scale)
-            return false;
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        if (precision < 0 && scale < 0)
-            return name;
-        else if (scale < 0)
-            return name + "(" + precision + ")";
-        else
-            return name + "(" + precision + "," + scale + ")";
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java 
b/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index 18be936..972efe2 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -20,6 +20,11 @@ package org.apache.kylin.metadata.model;
 
 import java.util.Collection;
 
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.measure.basic.BasicMeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -48,6 +53,7 @@ public class FunctionDesc {
     private String returnType;
 
     private DataType returnDataType;
+    private MeasureType<?> measureType;
     private boolean isDimensionAsMetric = false;
 
     public String getRewriteFieldName() {
@@ -61,8 +67,21 @@ public class FunctionDesc {
         }
     }
 
+    public MeasureType<?> getMeasureType() {
+        if (isDimensionAsMetric)
+            return null;
+
+        if (measureType == null) {
+            measureType = MeasureTypeFactory.create(getExpression(), 
getReturnDataType());
+        }
+        return measureType;
+    }
+
     public boolean needRewrite() {
-        return !isSum() && !isHolisticCountDistinct() && 
!isDimensionAsMetric();
+        if (isDimensionAsMetric)
+            return false;
+
+        return getMeasureType().needRewrite();
     }
 
     public boolean isMin() {
@@ -81,18 +100,6 @@ public class FunctionDesc {
         return FUNC_COUNT.equalsIgnoreCase(expression);
     }
 
-    public boolean isCountDistinct() {
-        return FUNC_COUNT_DISTINCT.equalsIgnoreCase(expression);
-    }
-
-    public boolean isHolisticCountDistinct() {
-        if (isCountDistinct() && returnDataType != null && 
returnDataType.isBigInt()) {
-            return true;
-        } else {
-            return false;
-        }
-    }
-
     /**
      * Get Full Expression such as sum(amount), count(1), count(*)...
      */
@@ -130,16 +137,17 @@ public class FunctionDesc {
         this.parameter = parameter;
     }
 
-    public DataType getSQLType() {
-        if (isCountDistinct())
-            return DataType.ANY;
-        else if (isSum() || isMax() || isMin())
+    public DataType getRewriteFieldType() {
+        if (isSum() || isMax() || isMin())
             return parameter.getColRefs().get(0).getType();
-        else
+        else if (getMeasureType() instanceof BasicMeasureType)
             return returnDataType;
+        else
+            return DataType.ANY;
     }
 
     public String getReturnType() {
+
         return returnType;
     }
 
@@ -147,6 +155,15 @@ public class FunctionDesc {
         return returnDataType;
     }
 
+
+    public int getParameterCount() {
+        int count = 0;
+        for (ParameterDesc p = parameter; p != null; p = p.getNextParameter()) 
{
+            count++;
+        }
+        return count;
+    }
+
     public void setReturnType(String returnType) {
         this.returnType = returnType;
         this.initReturnDataType();

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java 
b/metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
index 70c28ba..c2f3a9c 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
@@ -71,10 +71,6 @@ public class MeasureDesc {
         this.dependentMeasureRef = dependentMeasureRef;
     }
 
-    public boolean isHolisticCountDistinct() {
-        return function.isHolisticCountDistinct();
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o)

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java 
b/metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index 18c297b..733576f 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -41,6 +41,8 @@ public class ParameterDesc {
     private String type;
     @JsonProperty("value")
     private String value;
+    @JsonProperty("next_parameter")
+    private ParameterDesc nextParameter;
 
     private List<TblColRef> colRefs;
 
@@ -72,54 +74,49 @@ public class ParameterDesc {
         this.colRefs = colRefs;
     }
 
-    public boolean isColumnType() {
-        return COLUMN_TYPE.equals(type);
+    public ParameterDesc getNextParameter() {
+        return nextParameter;
     }
 
-    public void normalizeColumnValue() {
-        if (isColumnType()) {
-            String values[] = value.split("\\s*,\\s*");
-            for (int i = 0; i < values.length; i++)
-                values[i] = values[i].toUpperCase();
-            Arrays.sort(values);
-            value = StringUtils.join(",", values);
-        }
+    public void setNextParameter(ParameterDesc nextParameter) {
+        this.nextParameter = nextParameter;
     }
 
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((type == null) ? 0 : type.hashCode());
-        result = prime * result + ((value == null) ? 0 : value.hashCode());
-        return result;
+    public boolean isColumnType() {
+        return COLUMN_TYPE.equals(type);
     }
 
+
     @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
+    public boolean equals(Object o) {
+        if (this == o)
             return true;
-        if (obj == null)
+        if (o == null || getClass() != o.getClass())
             return false;
-        if (getClass() != obj.getClass())
+
+        ParameterDesc that = (ParameterDesc) o;
+
+        if (nextParameter != null ? !nextParameter.equals(that.nextParameter) 
: that.nextParameter != null)
             return false;
-        ParameterDesc other = (ParameterDesc) obj;
-        if (type == null) {
-            if (other.type != null)
-                return false;
-        } else if (!type.equals(other.type))
+        if (type != null ? !type.equals(that.type) : that.type != null)
             return false;
-        if (value == null) {
-            if (other.value != null)
-                return false;
-        } else if (!value.equals(other.value))
+        if (value != null ? !value.equals(that.value) : that.value != null)
             return false;
+
         return true;
     }
 
     @Override
+    public int hashCode() {
+        int result = type != null ? type.hashCode() : 0;
+        result = 31 * result + (value != null ? value.hashCode() : 0);
+        result = 31 * result + (nextParameter != null ? 
nextParameter.hashCode() : 0);
+        return result;
+    }
+
+    @Override
     public String toString() {
-        return "ParameterDesc [type=" + type + ", value=" + value + "]";
+        return "ParameterDesc [type=" + type + ", value=" + value + ", 
nextParam=" + nextParameter + "]";
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java 
b/metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index fa945de..3573ae1 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.metadata.model;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.metadata.datatype.DataType;
 
 /**
  * Created with IntelliJ IDEA. User: lukhan Date: 9/26/13 Time: 1:30 PM To

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java
new file mode 100644
index 0000000..9d95b1e
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.realization;
+
+import java.util.List;
+
+import org.apache.kylin.metadata.model.FunctionDesc;
+
+import com.google.common.collect.Lists;
+
+public class CapabilityResult {
+
+    /** Is capable or not */
+    public boolean capable;
+
+    /** The smaller the cost, the more capable the realization */
+    public int cost;
+
+    /**
+     * For info purpose, marker objects to indicate all special features
+     * (dimension-as-measure, topN etc.) that have decided the capability.
+     */
+    public List<CapabilityInfluence> influences = 
Lists.newArrayListWithCapacity(1);
+
+    public static interface CapabilityInfluence {
+        /** Suggest a multiplier to influence query cost */
+        double suggestCostMultiplier();
+    }
+
+    public static class DimensionAsMeasure implements CapabilityInfluence {
+
+        final FunctionDesc function;
+
+        public DimensionAsMeasure(FunctionDesc function) {
+            this.function = function;
+        }
+
+        @Override
+        public double suggestCostMultiplier() {
+            return 1.0;
+        }
+
+        public FunctionDesc getMeasureFunction() {
+            return function;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
index b00bc10..5f3a66b 100644
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
+++ 
b/metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
@@ -24,21 +24,10 @@ import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
 public interface IRealization {
-
-    public boolean isCapable(SQLDigest digest);
-
     /**
-     * Given the features of a query, return an integer indicating how capable 
the realization
-     * is to answer the query.
-     *
-     * @return -1 if the realization cannot fulfill the query;
-     *         or a number between 0-100 if the realization can answer the 
query, the smaller
-     *         the number, the more efficient the realization.
-     *         Especially,
-     *           0 - means the realization has the exact result 
pre-calculated, no less no more;
-     *         100 - means the realization will scan the full table with 
little or no indexing.
+     * Given the features of a query, check how capable the realization is to 
answer the query.
      */
-    public int getCost(SQLDigest digest);
+    public CapabilityResult isCapable(SQLDigest digest);
 
     /**
      * Get whether this specific realization is a cube or InvertedIndex

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
 
b/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
index eb9c4bb..4bd895b 100644
--- 
a/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
+++ 
b/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
@@ -23,7 +23,7 @@ import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.base.Function;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java 
b/metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
new file mode 100644
index 0000000..321a768
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.tuple;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * @author xjiang
+ */
+public class Tuple implements ITuple {
+
+    private final TupleInfo info;
+    private final Object[] values;
+
+    public Tuple(TupleInfo info) {
+        this.info = info;
+        this.values = new Object[info.size()];
+    }
+
+    public List<String> getAllFields() {
+        return info.getAllFields();
+    }
+
+    public List<TblColRef> getAllColumns() {
+        return info.getAllColumns();
+    }
+
+    public Object[] getAllValues() {
+        return values;
+    }
+
+    public TupleInfo getInfo() {
+        return info;
+    }
+
+    public String getFieldName(TblColRef col) {
+        return info.getFieldName(col);
+    }
+
+    public TblColRef getFieldColumn(String fieldName) {
+        return info.getColumn(fieldName);
+    }
+
+    public Object getValue(String fieldName) {
+        int index = info.getFieldIndex(fieldName);
+        return values[index];
+    }
+
+    public Object getValue(TblColRef col) {
+        int index = info.getColumnIndex(col);
+        return values[index];
+    }
+
+    public String getDataType(String fieldName) {
+        return info.getDataType(fieldName);
+    }
+
+    public void setFieldObjectValue(int index, Object fieldValue) {
+        values[index] = fieldValue;
+    }
+
+    public void setFieldObjectValue(String fieldName, Object fieldValue) {
+        int index = info.getFieldIndex(fieldName);
+        values[index] = fieldValue;
+    }
+
+    public void setDimensionValue(String fieldName, String fieldValue) {
+        Object objectValue = convertOptiqCellValue(fieldValue, 
getDataType(fieldName));
+        setFieldObjectValue(fieldName, objectValue);
+    }
+
+    public void setMeasureValue(int idx, Object fieldValue) {
+        String dataType = info.getDataType(idx);
+        // special handling for BigDecimal, allow double be aggregated as
+        // BigDecimal during cube build for best precision
+        if ("double".equals(dataType) && fieldValue instanceof BigDecimal) {
+            fieldValue = ((BigDecimal) fieldValue).doubleValue();
+        } else if ("integer".equals(dataType) && !(fieldValue instanceof 
Integer)) {
+            fieldValue = ((Number) fieldValue).intValue();
+        } else if ("float".equals(dataType) && fieldValue instanceof 
BigDecimal) {
+            fieldValue = ((BigDecimal) fieldValue).floatValue();
+        }
+
+        setFieldObjectValue(idx, fieldValue);
+    }
+
+    public void setMeasureValue(String fieldName, Object fieldValue) {
+        setMeasureValue(info.getFieldIndex(fieldName), fieldValue);
+    }
+
+    public boolean hasColumn(TblColRef column) {
+        return info.hasColumn(column);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        for (String field : info.getAllFields()) {
+            sb.append(field);
+            sb.append("=");
+            sb.append(getValue(field));
+            sb.append(",");
+        }
+        return sb.toString();
+    }
+
+    public static Object convertOptiqCellValue(String strValue, String 
dataType) {
+        if (strValue == null)
+            return null;
+
+        if ((strValue.equals("") || strValue.equals("\\N")) && 
!dataType.equals("string"))
+            return null;
+
+        // TODO use data type enum instead of string comparison
+        if ("date".equals(dataType)) {
+            // convert epoch time
+            Date dateValue = DateFormat.stringToDate(strValue); // NOTE: 
forces GMT timezone
+            long millis = dateValue.getTime();
+            long days = millis / (1000 * 3600 * 24);
+            return Integer.valueOf((int) days); // Optiq expects Integer 
instead of Long. by honma
+        } else if ("tinyint".equals(dataType)) {
+            return Byte.valueOf(strValue);
+        } else if ("short".equals(dataType) || "smallint".equals(dataType)) {
+            return Short.valueOf(strValue);
+        } else if ("integer".equals(dataType)) {
+            return Integer.valueOf(strValue);
+        } else if ("long".equals(dataType) || "bigint".equals(dataType)) {
+            return Long.valueOf(strValue);
+        } else if ("double".equals(dataType)) {
+            return Double.valueOf(strValue);
+        } else if ("decimal".equals(dataType)) {
+            return new BigDecimal(strValue);
+        } else if ("timestamp".equals(dataType)) {
+            return Long.valueOf(DateFormat.stringToMillis(strValue));
+        } else if ("float".equals(dataType)) {
+            return Float.valueOf(strValue);
+        } else if ("boolean".equals(dataType)) {
+            return Boolean.valueOf(strValue);
+        } else {
+            return strValue;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/tuple/TupleInfo.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/metadata/tuple/TupleInfo.java 
b/metadata/src/main/java/org/apache/kylin/metadata/tuple/TupleInfo.java
new file mode 100644
index 0000000..3176492
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/TupleInfo.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.tuple;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * 
+ * @author xjiang
+ * 
+ */
+public class TupleInfo {
+
+    private final Map<String, Integer> fieldMap;
+    private final Map<TblColRef, Integer> columnMap;
+    private final List<String> fields;
+    private final List<TblColRef> columns;
+    private final List<String> dataTypes;
+
+    public TupleInfo() {
+        fieldMap = new HashMap<String, Integer>();
+        columnMap = new HashMap<TblColRef, Integer>();
+        fields = new ArrayList<String>();
+        columns = new ArrayList<TblColRef>();
+        dataTypes = new ArrayList<String>();
+    }
+
+    public TblColRef getColumn(String fieldName) {
+        int idx = getFieldIndex(fieldName);
+        return columns.get(idx);
+    }
+
+    public int getColumnIndex(TblColRef col) {
+        return columnMap.get(col);
+    }
+
+    public String getDataType(String fieldName) {
+        int idx = getFieldIndex(fieldName);
+        return dataTypes.get(idx);
+    }
+
+    public String getDataType(int idx) {
+        return dataTypes.get(idx);
+    }
+
+    public int getFieldIndex(String fieldName) {
+        return fieldMap.get(fieldName);
+    }
+
+    public String getFieldName(TblColRef col) {
+        int idx = columnMap.get(col);
+        return fields.get(idx);
+    }
+
+    public boolean hasColumn(TblColRef col) {
+        return columnMap.containsKey(col);
+    }
+
+    public void setField(String fieldName, TblColRef col, String dataType, int 
index) {
+        fieldMap.put(fieldName, index);
+        if (col != null)
+            columnMap.put(col, index);
+
+        if (fields.size() > index)
+            fields.set(index, fieldName);
+        else
+            fields.add(index, fieldName);
+
+        if (columns.size() > index)
+            columns.set(index, col);
+        else
+            columns.add(index, col);
+
+        if (dataTypes.size() > index)
+            dataTypes.set(index, dataType);
+        else
+            dataTypes.add(index, dataType);
+    }
+
+    public int size() {
+        return fields.size();
+    }
+
+    public List<String> getAllFields() {
+        return fields;
+    }
+
+    public List<TblColRef> getAllColumns() {
+        return columns;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java
 
b/metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java
new file mode 100644
index 0000000..584fb72
--- /dev/null
+++ 
b/metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.datatype;
+
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ */
+public class BigDecimalSerializerTest {
+
+    private static BigDecimalSerializer bigDecimalSerializer;
+
+    @BeforeClass
+    public static void beforeClass() {
+        bigDecimalSerializer = new 
BigDecimalSerializer(DataType.getInstance("decimal"));
+    }
+
+    @Test
+    public void testNormal() {
+        BigDecimal input = new BigDecimal("1234.1234");
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        buffer.mark();
+        bigDecimalSerializer.serialize(input, buffer);
+        buffer.reset();
+        BigDecimal output = bigDecimalSerializer.deserialize(buffer);
+        assertEquals(input, output);
+    }
+
+    @Test
+    public void testScaleOutOfRange() {
+        BigDecimal input = new BigDecimal("1234.1234567890");
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        buffer.mark();
+        bigDecimalSerializer.serialize(input, buffer);
+        buffer.reset();
+        BigDecimal output = bigDecimalSerializer.deserialize(buffer);
+        assertEquals(input.setScale(bigDecimalSerializer.type.getScale(), 
BigDecimal.ROUND_HALF_EVEN), output);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testOutOfPrecision() {
+        BigDecimal input = new BigDecimal("66855344214907231736.4924");
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        bigDecimalSerializer.serialize(input, buffer);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
----------------------------------------------------------------------
diff --git 
a/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
 
b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
index 04ce098..223a9c8 100644
--- 
a/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
+++ 
b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
@@ -33,7 +33,7 @@ import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.schema.OLAPTable;
 import org.apache.kylin.storage.hybrid.HybridInstance;
-import org.apache.kylin.storage.tuple.Tuple;
+import org.apache.kylin.metadata.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

Reply via email to