This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a2f779  Kylin 3392 support sum(null)
1a2f779 is described below

commit 1a2f779a074c1d03703eebbb7506d171ef26ce56
Author: Yifei Wu <vafuler...@gmail.com>
AuthorDate: Wed Sep 11 22:02:52 2019 +0800

    Kylin 3392 support sum(null)
    
    * KYLIN-3392 support sum(null)
    
    * KYLIN-3392 support sum(null)
    
    * Minor, code review
---
 .../org/apache/kylin/common/util/BytesUtil.java    |  49 +++++-
 .../kylin/measure/basic/BasicMeasureType.java      |  10 +-
 .../kylin/measure/basic/BigDecimalIngester.java    |  17 +-
 .../measure/basic/BigDecimalMaxAggregator.java     |  13 +-
 .../measure/basic/BigDecimalMinAggregator.java     |  13 +-
 .../measure/basic/BigDecimalSumAggregator.java     |  10 +-
 ...LongSumAggregator.java => CountAggregator.java} |  14 +-
 .../apache/kylin/measure/basic/DoubleIngester.java |   7 +-
 .../kylin/measure/basic/DoubleMaxAggregator.java   |  15 +-
 .../kylin/measure/basic/DoubleMinAggregator.java   |  15 +-
 .../kylin/measure/basic/DoubleSumAggregator.java   |  16 +-
 .../apache/kylin/measure/basic/LongIngester.java   |   2 +-
 .../kylin/measure/basic/LongMaxAggregator.java     |  15 +-
 .../kylin/measure/basic/LongMinAggregator.java     |  15 +-
 .../kylin/measure/basic/LongSumAggregator.java     |  16 +-
 .../metadata/datatype/BigDecimalSerializer.java    |  18 +--
 .../kylin/metadata/datatype/DoubleSerializer.java  |  13 +-
 .../kylin/metadata/datatype/LongSerializer.java    |   4 +-
 .../kylin/measure/basic/BasicAggregatorTest.java   | 178 +++++++++++++++++++++
 .../kylin/metadata/datatype/SerializerTest.java    | 136 ++++++++++++++++
 .../kylin/engine/mr/steps/CubeReducerTest.java     |   8 +-
 21 files changed, 506 insertions(+), 78 deletions(-)

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java 
b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index e14f6cc..1434bf7 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -31,6 +31,10 @@ public class BytesUtil {
     }
 
     public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+    // The variable-length serialization of a Long leaves some byte sequences unused, e.g. binary {-113, 0}.
+    // By the #org.apache.kylin.common.util.BytesUtil.writeVLong algorithm, {-113, 0} would represent 0L; however, 0L
+    // is always written as {0} rather than {-113, 0} or {-114, 0, 0}, so {-113, 0} is free to represent null.
+    public static final byte[] VNULL_BYTE_ARRAY = new byte[] { -113, 0 };
 
     public static void writeByte(byte num, byte[] bytes, int offset, int size) 
{
         for (int i = offset + size - 1; i >= offset; i--) {
@@ -217,13 +221,38 @@ public class BytesUtil {
     // 
============================================================================
 
     public static void writeVInt(int i, ByteBuffer out) {
+        writeVLong((long) i, out);
+    }
 
+    public static void writeVLongObject(Long i, ByteBuffer out) {
+        if (i == null) {
+            out.put(VNULL_BYTE_ARRAY);
+            return;
+        }
         writeVLong(i, out);
+    }
 
+    public static Long readVLongObject(ByteBuffer in) {
+        byte firstByte = in.get();
+        int len = decodeVIntSize(firstByte);
+        if (len == 1) {
+            return (long) firstByte;
+        }
+        long i = 0;
+        byte b = 0;
+        for (int idx = 0; idx < len - 1; idx++) {
+            b = in.get();
+            i = i << 8;
+            i = i | (b & 0xFF);
+        }
+
+        if (len == 2 && equalsNullByteArray(firstByte, b))
+            return null;
+
+        return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
     }
 
     public static void writeVLong(long i, ByteBuffer out) {
-
         if (i >= -112 && i <= 127) {
             out.put((byte) i);
             return;
@@ -256,7 +285,7 @@ public class BytesUtil {
         byte firstByte = in.get();
         int len = decodeVIntSize(firstByte);
         if (len == 1) {
-            return firstByte;
+            return (long) firstByte;
         }
         long i = 0;
         for (int idx = 0; idx < len - 1; idx++) {
@@ -264,6 +293,7 @@ public class BytesUtil {
             i = i << 8;
             i = i | (b & 0xFF);
         }
+
         return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
     }
 
@@ -479,7 +509,6 @@ public class BytesUtil {
     }
 
     /**
-     *
      * @param hex String value of a byte array in hex, e.g, "\\x00\\x0A";
      * @return the byte array that the hex represented.
      */
@@ -493,4 +522,14 @@ public class BytesUtil {
         return b;
     }
 
+    private static boolean equalsNullByteArray(Byte... bytes) {
+        if (bytes == null || bytes.length < 2)
+            return false;
+
+        if (VNULL_BYTE_ARRAY[0] == bytes[0] && VNULL_BYTE_ARRAY[1] == bytes[1])
+            return true;
+
+        return false;
+    }
+
 }
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
index b0836df..b292f5f 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.measure.basic;
 
@@ -98,7 +98,9 @@ public class BasicMeasureType extends MeasureType {
 
     @Override
     public MeasureAggregator<?> newAggregator() {
-        if (isSum() || isCount()) {
+        if (isCount()) {
+            return new CountAggregator();
+        } else if (isSum()) {
             if (dataType.isDecimal())
                 return new BigDecimalSumAggregator();
             else if (dataType.isIntegerFamily())
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
index 5194606..1810324 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.measure.basic;
 
@@ -26,17 +26,20 @@ import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
+@SuppressWarnings("serial")
 public class BigDecimalIngester extends MeasureIngester<BigDecimal> {
 
     @Override
-    public BigDecimal valueOf(String[] values, MeasureDesc measureDesc, 
Map<TblColRef, Dictionary<String>> dictionaryMap) {
+    public BigDecimal valueOf(String[] values, MeasureDesc measureDesc,
+            Map<TblColRef, Dictionary<String>> dictionaryMap) {
         if (values.length > 1)
             throw new IllegalArgumentException();
 
-        if (values[0] == null || values[0].length() == 0)
-            return new BigDecimal(0);
-        else
+        if (values[0] == null || values[0].length() == 0) {
+            return null;
+        } else {
             return new BigDecimal(values[0]);
+        }
     }
 
     @Override
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
index 7a57965..d94d89a 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
@@ -36,10 +36,15 @@ public class BigDecimalMaxAggregator extends 
MeasureAggregator<BigDecimal> {
 
     @Override
     public void aggregate(BigDecimal value) {
-        if (max == null)
-            max = value;
-        else if (max.compareTo(value) < 0)
-            max = value;
+        if (value != null) {
+            if (max == null) {
+                max = value;
+                return;
+            }
+
+            if (max.compareTo(value) < 0)
+                max = value;
+        }
     }
 
     @Override
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
index 71ba7fb..370bb46 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
@@ -36,10 +36,15 @@ public class BigDecimalMinAggregator extends 
MeasureAggregator<BigDecimal> {
 
     @Override
     public void aggregate(BigDecimal value) {
-        if (min == null)
-            min = value;
-        else if (min.compareTo(value) > 0)
-            min = value;
+        if (value != null) {
+            if (min == null) {
+                min = value;
+                return;
+            }
+
+            if (min.compareTo(value) > 0)
+                min = value;
+        }
     }
 
     @Override
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
index fa59c54..ed29088 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
@@ -36,12 +36,12 @@ public class BigDecimalSumAggregator extends 
MeasureAggregator<BigDecimal> {
 
     @Override
     public void aggregate(BigDecimal value) {
-        if (value == null)
-            return;
-        if (sum == null) {
-            sum = new BigDecimal(0);
+        if (value != null) {
+            if (sum == null)
+                sum = new BigDecimal(0);
+
+            sum = sum.add(value);
         }
-        sum = sum.add(value);
     }
 
     @Override
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/CountAggregator.java
similarity index 90%
copy from 
core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
copy to 
core-metadata/src/main/java/org/apache/kylin/measure/basic/CountAggregator.java
index 1f9c0d7..ab868f5 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/CountAggregator.java
@@ -6,30 +6,28 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.measure.basic;
 
 import org.apache.kylin.measure.MeasureAggregator;
 
-/**
- */
 @SuppressWarnings("serial")
-public class LongSumAggregator extends MeasureAggregator<Long> {
+public class CountAggregator extends MeasureAggregator<Long> {
 
-    Long sum = new Long(0L);
+    Long sum = 0L;
 
     @Override
     public void reset() {
-        sum = new Long(0L);
+        sum = 0L;
     }
 
     @Override
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
index 90ecb0d..71bf896 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
@@ -32,10 +32,11 @@ public class DoubleIngester extends MeasureIngester<Double> 
{
         if (values.length > 1)
             throw new IllegalArgumentException();
 
-        if (values[0] == null || values[0].length() == 0)
-            return new Double(0);
-        else
+        if (values[0] == null || values[0].length() == 0) {
+            return null;
+        } else {
             return Double.parseDouble(values[0]);
+        }
     }
 
     @Override
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
index f33555e..2587577 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
@@ -34,14 +34,21 @@ public class DoubleMaxAggregator extends 
MeasureAggregator<Double> {
 
     @Override
     public void aggregate(Double value) {
-        if (max == null)
-            max = value;
-        else if (max < value)
-            max = value;
+        if (value != null) {
+            if (max == null)
+                max = -Double.MAX_VALUE;
+
+            max = Math.max(max, value);
+        }
     }
 
     @Override
     public Double aggregate(Double value1, Double value2) {
+        if (value1 == null)
+            return value2;
+        if (value2 == null)
+            return value1;
+
         return Math.max(value1, value2);
     }
 
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
index 8e69f21..a9a68bc 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
@@ -34,14 +34,21 @@ public class DoubleMinAggregator extends 
MeasureAggregator<Double> {
 
     @Override
     public void aggregate(Double value) {
-        if (min == null)
-            min = value;
-        else if (min > value)
-            min = value;
+        if (value != null) {
+            if (min == null)
+                min = Double.MAX_VALUE;
+
+            min = Math.min(value, min);
+        }
     }
 
     @Override
     public Double aggregate(Double value1, Double value2) {
+        if (value1 == null)
+            return value2;
+        if (value2 == null)
+            return value1;
+
         return Math.min(value1, value2);
     }
 
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
index df0ba52..45d8951 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
@@ -25,20 +25,30 @@ import org.apache.kylin.measure.MeasureAggregator;
 @SuppressWarnings("serial")
 public class DoubleSumAggregator extends MeasureAggregator<Double> {
 
-    Double sum = new Double(0);
+    Double sum = null;
 
     @Override
     public void reset() {
-        sum = new Double(0);
+        sum = null;
     }
 
     @Override
     public void aggregate(Double value) {
-        sum = sum + value;
+        if (value != null) {
+            if (sum == null)
+                sum = 0d;
+
+            sum = sum + value;
+        }
     }
 
     @Override
     public Double aggregate(Double value1, Double value2) {
+        if (value1 == null)
+            return value2;
+        if (value2 == null)
+            return value1;
+
         return Double.valueOf(value1 + value2);
     }
 
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
index 793acf2..b48c4a9 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
@@ -33,7 +33,7 @@ public class LongIngester extends MeasureIngester<Long> {
             throw new IllegalArgumentException();
 
         if (values[0] == null || values[0].length() == 0)
-            return new Long(0L);
+            return null;
         else
             return Long.valueOf(values[0]);
     }
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
index b9a2b94..200f648 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
@@ -34,14 +34,21 @@ public class LongMaxAggregator extends 
MeasureAggregator<Long> {
 
     @Override
     public void aggregate(Long value) {
-        if (max == null)
-            max = value;
-        else if (max < value)
-            max = value;
+        if (value != null) {
+            if (max == null)
+                max = Long.MIN_VALUE;
+
+            max = Math.max(max, value);
+        }
     }
 
     @Override
     public Long aggregate(Long value1, Long value2) {
+        if (value1 == null)
+            return value2;
+        if (value2 == null)
+            return value1;
+
         return Math.max(value1, value2);
     }
 
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
index 9185142..0a66d3c 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
@@ -34,14 +34,21 @@ public class LongMinAggregator extends 
MeasureAggregator<Long> {
 
     @Override
     public void aggregate(Long value) {
-        if (min == null)
-            min = value;
-        else if (min > value)
-            min = value;
+        if (value != null) {
+            if (min == null)
+                min = Long.MAX_VALUE;
+
+            min = Math.min(value, min);
+        }
     }
 
     @Override
     public Long aggregate(Long value1, Long value2) {
+        if (value1 == null)
+            return value2;
+        if (value2 == null)
+            return value1;
+
         return Math.min(value1, value2);
     }
 
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
index 1f9c0d7..9fd8e6e 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
@@ -25,20 +25,30 @@ import org.apache.kylin.measure.MeasureAggregator;
 @SuppressWarnings("serial")
 public class LongSumAggregator extends MeasureAggregator<Long> {
 
-    Long sum = new Long(0L);
+    Long sum = null;
 
     @Override
     public void reset() {
-        sum = new Long(0L);
+        sum = null;
     }
 
     @Override
     public void aggregate(Long value) {
-        sum += value;
+        if (value != null) {
+            if (sum == null)
+                sum = 0L;
+
+            sum += value;
+        }
     }
 
     @Override
     public Long aggregate(Long value1, Long value2) {
+        if (value1 == null)
+            return value2;
+        if (value2 == null)
+            return value1;
+
         return Long.valueOf(value1 + value2);
     }
 
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
index 967c00d..8d2a454 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
@@ -47,8 +47,7 @@ public class BigDecimalSerializer extends 
DataTypeSerializer<BigDecimal> {
     @Override
     public void serialize(BigDecimal value, ByteBuffer out) {
         if (value == null) {
-            BytesUtil.writeVInt(0, out);
-            BytesUtil.writeVInt(-1, out);
+            BytesUtil.writeVLongObject(null, out);
             return;
         }
 
@@ -63,24 +62,25 @@ public class BigDecimalSerializer extends 
DataTypeSerializer<BigDecimal> {
             throw new IllegalArgumentException("'" + value + "' exceeds the 
expected length for type " + type);
         }
 
-        BytesUtil.writeVInt(value.scale(), out);
+        BytesUtil.writeVLong(value.scale(), out);
         BytesUtil.writeVInt(bytes.length, out);
         out.put(bytes);
     }
 
     @Override
     public BigDecimal deserialize(ByteBuffer in) {
-        int scale = BytesUtil.readVInt(in);
-        int n = BytesUtil.readVInt(in);
-
-        if (n < 0) {
+        Long scale = BytesUtil.readVLongObject(in);
+        if (scale == null)
             return null;
-        }
 
+        if ((scale > Integer.MAX_VALUE) || (scale < Integer.MIN_VALUE)) {
+            throw new IllegalArgumentException("value too long to fit in 
integer");
+        }
+        int n = BytesUtil.readVInt(in);
         byte[] bytes = new byte[n];
         in.get(bytes);
 
-        return new BigDecimal(new BigInteger(bytes), scale);
+        return new BigDecimal(new BigInteger(bytes), scale.intValue());
     }
 
     @Override
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
index d3677bf..05cd77d 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
@@ -30,12 +30,23 @@ public class DoubleSerializer extends 
DataTypeSerializer<Double> {
 
     @Override
     public void serialize(Double value, ByteBuffer out) {
+        // Use the binary form of Double.NaN to represent NULL, because Double.NaN is replaced by NULL in Hive.
+        if (value == null) {
+            out.putDouble(Double.NaN);
+            return;
+        }
+
         out.putDouble(value);
     }
 
     @Override
     public Double deserialize(ByteBuffer in) {
-        return in.getDouble();
+        Double value = in.getDouble();
+        if (Double.compare(Double.NaN, value) == 0) {
+            return null;
+        }
+
+        return value;
     }
 
     @Override
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
index 9f72a82..46cb0c8 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
@@ -32,12 +32,12 @@ public class LongSerializer extends 
DataTypeSerializer<Long> {
 
     @Override
     public void serialize(Long value, ByteBuffer out) {
-        BytesUtil.writeVLong(value, out);
+        BytesUtil.writeVLongObject(value, out);
     }
 
     @Override
     public Long deserialize(ByteBuffer in) {
-        return BytesUtil.readVLong(in);
+        return BytesUtil.readVLongObject(in);
     }
 
     @Override
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/measure/basic/BasicAggregatorTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/measure/basic/BasicAggregatorTest.java
new file mode 100644
index 0000000..8673d25
--- /dev/null
+++ 
b/core-metadata/src/test/java/org/apache/kylin/measure/basic/BasicAggregatorTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.basic;
+
+import java.math.BigDecimal;
+
+import org.apache.kylin.measure.MeasureAggregator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BasicAggregatorTest {
+
+    @Test
+    public void testSumNull() throws Exception {
+        // Sum NULL
+        testNullAggregator(LongSumAggregator.class);
+        testNullAggregator(DoubleSumAggregator.class);
+        testNullAggregator(BigDecimalSumAggregator.class);
+
+        // Max NULL
+        testNullAggregator(LongMaxAggregator.class);
+        testNullAggregator(DoubleMaxAggregator.class);
+        testNullAggregator(BigDecimalMaxAggregator.class);
+
+        // Min NULL
+        testNullAggregator(LongMinAggregator.class);
+        testNullAggregator(DoubleMinAggregator.class);
+        testNullAggregator(BigDecimalMinAggregator.class);
+    }
+
+    @Test
+    public void testSumNormal() throws Exception {
+        // sum long
+        LongSumAggregator sumAggregator = new LongSumAggregator();
+        testNormalAggregator(sumAggregator, 0L, 1234L);
+        testNormalAggregator(sumAggregator, 1234L, 1234L);
+        testNormalAggregator(sumAggregator, -1234L, 2345L);
+        testNormalAggregator(sumAggregator, -1234L, -2345L);
+        testNormalAggregator(sumAggregator, Long.MAX_VALUE - 1, 1L);
+        testNormalAggregator(sumAggregator, Long.MIN_VALUE + 1, 
Long.MAX_VALUE);
+        // sum double
+        DoubleSumAggregator doubleSumAggregator = new DoubleSumAggregator();
+        testNormalAggregator(doubleSumAggregator, 0.0d, 1234.6d);
+        testNormalAggregator(doubleSumAggregator, -1234.5678d, 1234.65d);
+        testNormalAggregator(doubleSumAggregator, -1234.5678d, -1234.65d);
+        testNormalAggregator(doubleSumAggregator, Double.MAX_VALUE - 0.34d, 
0.34d);
+        testNormalAggregator(doubleSumAggregator, Double.MIN_VALUE, 
Double.MIN_VALUE);
+        testNormalAggregator(doubleSumAggregator, -Double.MAX_VALUE, 
-Double.MIN_VALUE);
+        // sum big-decimal
+        BigDecimalSumAggregator bigDecimalSumAggregator = new 
BigDecimalSumAggregator();
+        testNormalAggregator(bigDecimalSumAggregator, new BigDecimal(0), new 
BigDecimal(0));
+        testNormalAggregator(bigDecimalSumAggregator, new 
BigDecimal(12345678.098765432),
+                new BigDecimal(123456789.09876543211));
+        testNormalAggregator(bigDecimalSumAggregator, new 
BigDecimal(-1222345678.098765432),
+                new BigDecimal(-456123456789.09876543211));
+        testNormalAggregator(bigDecimalSumAggregator, new 
BigDecimal(-456712345678.098765432),
+                new BigDecimal(123456789.09876543211));
+
+        // max long
+        LongMaxAggregator maxLongAggregator = new LongMaxAggregator();
+        testNormalAggregator(maxLongAggregator, 0L, 1234L);
+        testNormalAggregator(maxLongAggregator, 1234L, 1234L);
+        testNormalAggregator(maxLongAggregator, -1234L, 2345L);
+        testNormalAggregator(maxLongAggregator, -1234L, -2345L);
+        testNormalAggregator(maxLongAggregator, Long.MAX_VALUE - 1, 1L);
+        testNormalAggregator(maxLongAggregator, Long.MIN_VALUE + 1, 
Long.MAX_VALUE);
+        // max double
+        DoubleMaxAggregator doubleMaxAggregator = new DoubleMaxAggregator();
+        testNormalAggregator(doubleMaxAggregator, 0.0d, 1234.6d);
+        testNormalAggregator(doubleMaxAggregator, -1234.5678d, 1234.65d);
+        testNormalAggregator(doubleMaxAggregator, -1234.5678d, -1234.65d);
+        testNormalAggregator(doubleMaxAggregator, Double.MAX_VALUE - 0.34d, 
0.34d);
+        testNormalAggregator(doubleMaxAggregator, Double.MIN_VALUE, 
Double.MIN_VALUE);
+        testNormalAggregator(doubleMaxAggregator, -Double.MAX_VALUE, 
-Double.MIN_VALUE);
+        // max big-decimal
+        BigDecimalMaxAggregator bigDecimalMaxAggregator = new 
BigDecimalMaxAggregator();
+        testNormalAggregator(bigDecimalMaxAggregator, new BigDecimal(0), new 
BigDecimal(0));
+        testNormalAggregator(bigDecimalMaxAggregator, new 
BigDecimal(12345678.098765432),
+                new BigDecimal(123456789.09876543211));
+        testNormalAggregator(bigDecimalMaxAggregator, new 
BigDecimal(-1222345678.098765432),
+                new BigDecimal(-456123456789.09876543211));
+        testNormalAggregator(bigDecimalMaxAggregator, new 
BigDecimal(-456712345678.098765432),
+                new BigDecimal(123456789.09876543211));
+
+        // min long
+        LongMinAggregator longMinAggregator = new LongMinAggregator();
+        testNormalAggregator(longMinAggregator, 0L, 1234L);
+        testNormalAggregator(longMinAggregator, 1234L, 1234L);
+        testNormalAggregator(longMinAggregator, -1234L, 2345L);
+        testNormalAggregator(longMinAggregator, -1234L, -2345L);
+        testNormalAggregator(longMinAggregator, Long.MAX_VALUE - 1, 1L);
+        testNormalAggregator(longMinAggregator, Long.MIN_VALUE + 1, 
Long.MAX_VALUE);
+        // min double
+        DoubleMinAggregator doubleMinAggregator = new DoubleMinAggregator();
+        testNormalAggregator(doubleMinAggregator, 0.0d, 1234.6d);
+        testNormalAggregator(doubleMinAggregator, -1234.5678d, 1234.65d);
+        testNormalAggregator(doubleMinAggregator, -1234.5678d, -1234.65d);
+        testNormalAggregator(doubleMinAggregator, Double.MAX_VALUE - 0.34d, 
0.34d);
+        testNormalAggregator(doubleMinAggregator, Double.MIN_VALUE, 
Double.MIN_VALUE);
+        testNormalAggregator(doubleMinAggregator, -Double.MAX_VALUE, 
-Double.MIN_VALUE);
+        // min big-decimal
+        BigDecimalMinAggregator bigDecimalMinAggregator = new 
BigDecimalMinAggregator();
+        testNormalAggregator(bigDecimalMinAggregator, new BigDecimal(0), new 
BigDecimal(0));
+        testNormalAggregator(bigDecimalMinAggregator, new 
BigDecimal(12345678.098765432),
+                new BigDecimal(123456789.09876543211));
+        testNormalAggregator(bigDecimalMinAggregator, new 
BigDecimal(-1222345678.098765432),
+                new BigDecimal(-456123456789.09876543211));
+        testNormalAggregator(bigDecimalMinAggregator, new 
BigDecimal(-456712345678.098765432),
+                new BigDecimal(123456789.09876543211));
+    }
+
+    private void testNullAggregator(Class<? extends MeasureAggregator> clazz) 
throws Exception {
+        MeasureAggregator aggregator = clazz.newInstance();
+        aggregator.aggregate(null);
+        Assert.assertEquals(null, aggregator.getState());
+        Assert.assertEquals(null, aggregator.aggregate(null, null));
+
+        Number value = null;
+        if (aggregator.getState() instanceof Long) {
+            value = 1234L;
+        } else if (aggregator.getState() instanceof Double) {
+            value = 1234.456;
+        } else if (aggregator.getState() instanceof BigDecimal) {
+            value = new BigDecimal(12340.56789);
+        }
+        aggregator.aggregate(value);
+        Assert.assertEquals(value, aggregator.getState());
+    }
+
+    private <V> void testNormalAggregator(MeasureAggregator<V> aggregator, V 
value1, V value2) {
+        aggregator.reset();
+        V res = aggregator.aggregate(value1, value2);
+        if (aggregator.getClass().getName().contains("Max")) {
+            // max
+            if (value1 instanceof Long)
+                Assert.assertEquals(Math.max((Long) value1, (Long) value2), 
res);
+            else if (value1 instanceof Double)
+                Assert.assertEquals(Math.max((Double) value1, (Double) 
value2), (Double) res, 0.000001);
+            else
+                Assert.assertEquals(((BigDecimal) value1).max((BigDecimal) 
value2), res);
+
+        } else if (aggregator.getClass().getName().contains("Min")) {
+            // min
+            if (value1 instanceof Long)
+                Assert.assertEquals(Math.min((Long) value1, (Long) value2), 
res);
+            else if (value1 instanceof Double)
+                Assert.assertEquals(Math.min((Double) value1, (Double) 
value2), (Double) res, 0.0000001);
+            else
+                Assert.assertEquals(((BigDecimal) value1).min((BigDecimal) 
value2), res);
+
+        } else {
+            // sum
+            if (value1 instanceof Long)
+                Assert.assertEquals(((Long) value1) + ((Long) value2), res);
+            else if (value1 instanceof Double)
+                Assert.assertEquals(((Double) value1) + ((Double) value2), 
(Double) res, 0.000001);
+            else
+                Assert.assertEquals(((BigDecimal) value1).add((BigDecimal) 
value2), res);
+        }
+    }
+
+}
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/SerializerTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/SerializerTest.java
new file mode 100644
index 0000000..2ac2383
--- /dev/null
+++ 
b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/SerializerTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for {@link BigDecimalSerializer}, {@link LongSerializer} and
+ * {@link DoubleSerializer}: each value, including null, is serialized into a
+ * {@link ByteBuffer} and read back from the same position.
+ */
+public class SerializerTest extends LocalFileMetadataTestCase {
+
+    private static BigDecimalSerializer bigDecimalSerializer;
+    private static LongSerializer longSerializer;
+    private static DoubleSerializer doubleSerializer;
+
+    // Scratch buffer; a fresh one is allocated before every test.
+    private ByteBuffer buffer;
+
+    @AfterClass
+    public static void after() throws Exception {
+        cleanAfterClass();
+    }
+
+    @BeforeClass
+    public static void beforeClass() {
+        staticCreateTestMetadata();
+        bigDecimalSerializer = new BigDecimalSerializer(DataType.getType("decimal"));
+        longSerializer = new LongSerializer(DataType.getType("long"));
+        doubleSerializer = new DoubleSerializer(DataType.getType("double"));
+    }
+
+    @Before
+    public void setup() {
+        buffer = ByteBuffer.allocate(256);
+    }
+
+    @After
+    public void clean() {
+        buffer.clear();
+    }
+
+    /**
+     * A value with ten fractional digits must come back rescaled to the serializer type's
+     * scale, rounded HALF_EVEN.
+     */
+    @Test
+    public void testScaleOutOfRange() {
+        BigDecimal input = new BigDecimal("1234.1234567890");
+        buffer.mark();
+        bigDecimalSerializer.serialize(input, buffer);
+        buffer.reset();
+        BigDecimal output = bigDecimalSerializer.deserialize(buffer);
+        assertEquals(input.setScale(bigDecimalSerializer.type.getScale(), BigDecimal.ROUND_HALF_EVEN), output);
+    }
+
+    /**
+     * Serializing this value is expected to be rejected with an IllegalArgumentException.
+     * NOTE(review): presumably because its precision exceeds the decimal type's limit -
+     * confirm against BigDecimalSerializer.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testOutOfPrecision() {
+        BigDecimal input = new BigDecimal("66855344214907231736.4924");
+        bigDecimalSerializer.serialize(input, buffer);
+    }
+
+    /**
+     * A null value must survive a serialize/deserialize round trip for every serializer.
+     */
+    @Test
+    public void testNull() {
+        //BigDecimal NULL
+        buffer.mark();
+        bigDecimalSerializer.serialize(null, buffer);
+        buffer.reset();
+        assertEquals(null, bigDecimalSerializer.deserialize(buffer));
+
+        //Long NULL
+        buffer.mark();
+        longSerializer.serialize(null, buffer);
+        buffer.reset();
+        assertEquals(null, longSerializer.deserialize(buffer));
+
+        //Double NULL
+        buffer.mark();
+        doubleSerializer.serialize(null, buffer);
+        buffer.reset();
+        assertEquals(null, doubleSerializer.deserialize(buffer));
+    }
+
+    /**
+     * Ordinary and boundary values must survive a round trip for every serializer.
+     */
+    @Test
+    public void testNormal() {
+        // Long
+        testNumberSerialization(longSerializer, 1024L);
+        testNumberSerialization(longSerializer, -1024L);
+        testNumberSerialization(longSerializer, 0L);
+        testNumberSerialization(longSerializer, Long.MAX_VALUE);
+        testNumberSerialization(longSerializer, Long.MIN_VALUE);
+
+        //Double
+        testNumberSerialization(doubleSerializer, 1234.56d);
+        testNumberSerialization(doubleSerializer, -1234.56d);
+        testNumberSerialization(doubleSerializer, 0.0);
+        testNumberSerialization(doubleSerializer, Double.MAX_VALUE);
+        testNumberSerialization(doubleSerializer, -Double.MAX_VALUE);
+        // Double.MIN_VALUE is the smallest positive double, so cover its negative counterpart
+        // too (this line previously repeated the -Double.MAX_VALUE case).
+        testNumberSerialization(doubleSerializer, Double.MIN_VALUE);
+        testNumberSerialization(doubleSerializer, -Double.MIN_VALUE);
+
+        //Bigdecimal
+        // The last two values are built from a double product, so what is round-tripped is
+        // the exact decimal expansion of that double.
+        testNumberSerialization(bigDecimalSerializer, new BigDecimal(0));
+        testNumberSerialization(bigDecimalSerializer, new BigDecimal(123456789.0987654321 * 123456789.0987654321));
+        testNumberSerialization(bigDecimalSerializer, new BigDecimal(-123456789.0987654321 * 123456789.0987654321));
+    }
+
+    /**
+     * Writes the number at the buffer's current position, rewinds to that position and checks
+     * that the deserialized value matches. Doubles are compared with a tolerance of 1e-7; all
+     * other types are compared with equals.
+     */
+    private <T> void testNumberSerialization(DataTypeSerializer<T> serializer, T number) {
+        buffer.mark();
+        serializer.serialize(number, buffer);
+        buffer.reset();
+        if (number instanceof Double) {
+            assertEquals(((Double) number), (Double) serializer.deserialize(buffer), 0.0000001);
+        } else {
+            assertEquals(number, serializer.deserialize(buffer));
+        }
+    }
+}
diff --git 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
index f90a644..aa34a0f 100644
--- 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
+++ 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
@@ -159,9 +159,11 @@ public class CubeReducerTest extends 
LocalFileMetadataTestCase {
         assertTrue(result.contains(p3));
     }
 
-    private Text newValueText(BufferedMeasureCodec codec, String sum, String 
min, String max, int count, int item_count) {
-        Object[] values = new Object[] { sum == null ? null : new 
BigDecimal(sum), //
-                new BigDecimal(min), new BigDecimal(max), new Long(count), new 
Long(item_count) };
+    private Text newValueText(BufferedMeasureCodec codec, String sum, String 
min, String max, int count,
+            int item_count) {
+        Object[] values = new Object[] { sum == null ? null : new 
BigDecimal(sum),
+                min == null ? null : new BigDecimal(min), max == null ? null : 
new BigDecimal(max), new Long(count),
+                new Long(item_count) };
 
         ByteBuffer buf = codec.encode(values);
 

Reply via email to