This is an automated email from the ASF dual-hosted git repository.

zabetak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new b6cbb2e6a2f HIVE-26277: NPEs and rounding issues in 
ColumnStatsAggregator classes (Alessandro Solimando reviewed by Stamatis 
Zampetakis)
b6cbb2e6a2f is described below

commit b6cbb2e6a2f3d3c5de565492c3f658cbf94d96fb
Author: Alessandro Solimando <alessandro.solima...@gmail.com>
AuthorDate: Fri May 13 17:29:30 2022 +0200

    HIVE-26277: NPEs and rounding issues in ColumnStatsAggregator classes 
(Alessandro Solimando reviewed by Stamatis Zampetakis)
    
    1. Add and invoke checkStatisticsList to prevent NPEs in aggregators;
    they all rely on a non-empty list of statistics.
    2. Cast integers to double in divisions to make computations more
    accurate and avoid rounding issues.
    3. Align loggers names to match the class they are in and avoid
    misleading log messages.
    4. Add documentation for ndvTuner based on current understanding of how
    it should work.
    
    Closes #3339
    
    Move (and complete) ndvTuner documentation from tests to production classes
---
 .../aggr/BinaryColumnStatsAggregator.java          |   2 +
 .../aggr/BooleanColumnStatsAggregator.java         |   2 +
 .../columnstats/aggr/ColumnStatsAggregator.java    |  19 ++
 .../aggr/DateColumnStatsAggregator.java            |  14 +-
 .../aggr/DecimalColumnStatsAggregator.java         |   5 +-
 .../aggr/DoubleColumnStatsAggregator.java          |   2 +
 .../aggr/LongColumnStatsAggregator.java            |  10 +-
 .../aggr/StringColumnStatsAggregator.java          |   4 +-
 .../aggr/TimestampColumnStatsAggregator.java       |  14 +-
 .../hadoop/hive/metastore/StatisticsTestUtils.java | 112 +++++++++
 .../metastore/columnstats/ColStatsBuilder.java     | 187 ++++++++++++++
 .../aggr/BinaryColumnStatsAggregatorTest.java      | 101 ++++++++
 .../aggr/BooleanColumnStatsAggregatorTest.java     | 101 ++++++++
 .../aggr/DateColumnStatsAggregatorTest.java        | 270 ++++++++++++++++++++
 .../aggr/DecimalColumnStatsAggregatorTest.java     | 256 +++++++++++++++++++
 .../aggr/DoubleColumnStatsAggregatorTest.java      | 242 ++++++++++++++++++
 .../aggr/LongColumnStatsAggregatorTest.java        | 242 ++++++++++++++++++
 .../aggr/StringColumnStatsAggregatorTest.java      | 188 ++++++++++++++
 .../aggr/TimestampColumnStatsAggregatorTest.java   | 273 +++++++++++++++++++++
 19 files changed, 2028 insertions(+), 16 deletions(-)

diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java
index c885cf2d44f..552c91835f7 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java
@@ -32,6 +32,8 @@ public class BinaryColumnStatsAggregator extends 
ColumnStatsAggregator {
   @Override
   public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> 
colStatsWithSourceInfo,
       List<String> partNames, boolean areAllPartsFound) throws MetaException {
+    checkStatisticsList(colStatsWithSourceInfo);
+
     ColumnStatisticsObj statsObj = null;
     String colType = null;
     String colName = null;
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java
index 6fafab53e0f..9babeea8510 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java
@@ -32,6 +32,8 @@ public class BooleanColumnStatsAggregator extends 
ColumnStatsAggregator {
   @Override
   public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> 
colStatsWithSourceInfo,
       List<String> partNames, boolean areAllPartsFound) throws MetaException {
+    checkStatisticsList(colStatsWithSourceInfo);
+
     ColumnStatisticsObj statsObj = null;
     String colType = null;
     String colName = null;
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java
index c4325763beb..144e71c69ec 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java
@@ -27,9 +27,28 @@ import 
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWi
 
 public abstract class ColumnStatsAggregator {
   public boolean useDensityFunctionForNDVEstimation;
+  /**
+   * The tuner controls the derivation of the NDV value when aggregating 
statistics from multiple partitions. It accepts
+   * values in the range [0, 1] pushing the aggregated NDV closer to the 
lower, or upper bound respectively.
+   * <p>
+   * For example, consider the aggregation of three partitions with NDV values 
2, 3, and 4, respectively. The NDV
+   * lower bound is 4 (the highest among individual NDVs), and the upper bound 
is 9 (the sum of individual NDVs). In
+   * this case the aggregated NDV will be in the range [4, 9] touching the 
bounds when the tuner is equal to 0, or 1
+   * respectively.
+   * </p>
+   * <p>
+   * It is optional and concrete implementations can choose to ignore it 
completely.
+   * </p>
+   */
   public double ndvTuner;
 
   public abstract ColumnStatisticsObj aggregate(
       List<ColStatsObjWithSourceInfo> colStatsWithSourceInfo, List<String> 
partNames,
       boolean areAllPartsFound) throws MetaException;
+
+  void checkStatisticsList(List<ColStatsObjWithSourceInfo> 
colStatsWithSourceInfo) {
+    if (colStatsWithSourceInfo.isEmpty()) {
+      throw new IllegalArgumentException("Column statistics list must not be 
empty when aggregating");
+    }
+  }
 }
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
index 281ddaa90f3..a0dcbe9d6a7 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
@@ -49,6 +49,8 @@ public class DateColumnStatsAggregator extends 
ColumnStatsAggregator implements
   @Override
   public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> 
colStatsWithSourceInfo,
             List<String> partNames, boolean areAllPartsFound) throws 
MetaException {
+    checkStatisticsList(colStatsWithSourceInfo);
+
     ColumnStatisticsObj statsObj = null;
     String colType = null;
     String colName = null;
@@ -99,9 +101,10 @@ public class DateColumnStatsAggregator extends 
ColumnStatsAggregator implements
       for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) {
         ColumnStatisticsObj cso = csp.getColStatsObj();
         DateColumnStatsDataInspector newData = dateInspectorFromStats(cso);
+        lowerBound = Math.max(lowerBound, newData.getNumDVs());
         higherBound += newData.getNumDVs();
         if (newData.isSetLowValue() && newData.isSetHighValue()) {
-          densityAvgSum += (diff(newData.getHighValue(), 
newData.getLowValue())) / newData.getNumDVs();
+          densityAvgSum += ((double) diff(newData.getHighValue(), 
newData.getLowValue())) / newData.getNumDVs();
         }
         if (ndvEstimator != null) {
           ndvEstimator.mergeEstimators(newData.getNdvEstimator());
@@ -124,7 +127,8 @@ public class DateColumnStatsAggregator extends 
ColumnStatsAggregator implements
         aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
       } else {
         long estimation;
-        if (useDensityFunctionForNDVEstimation) {
+        if (useDensityFunctionForNDVEstimation && aggregateData != null
+            && aggregateData.isSetLowValue() && 
aggregateData.isSetHighValue()) {
           // We have estimation, lowerbound and higherbound. We use estimation
           // if it is between lowerbound and higherbound.
           double densityAvg = densityAvgSum / partNames.size();
@@ -161,7 +165,7 @@ public class DateColumnStatsAggregator extends 
ColumnStatsAggregator implements
           String partName = csp.getPartName();
           DateColumnStatsData newData = cso.getStatsData().getDateStats();
           if (useDensityFunctionForNDVEstimation) {
-            densityAvgSum += diff(newData.getHighValue(), 
newData.getLowValue()) / newData.getNumDVs();
+            densityAvgSum += ((double) diff(newData.getHighValue(), 
newData.getLowValue())) / newData.getNumDVs();
           }
           adjustedIndexMap.put(partName, (double) indexMap.get(partName));
           adjustedStatsMap.put(partName, cso.getStatsData());
@@ -190,7 +194,7 @@ public class DateColumnStatsAggregator extends 
ColumnStatsAggregator implements
               csd.setDateStats(aggregateData);
               adjustedStatsMap.put(pseudoPartName.toString(), csd);
               if (useDensityFunctionForNDVEstimation) {
-                densityAvgSum += diff(aggregateData.getHighValue(), 
aggregateData.getLowValue())
+                densityAvgSum += ((double) diff(aggregateData.getHighValue(), 
aggregateData.getLowValue()))
                     / aggregateData.getNumDVs();
               }
               // reset everything
@@ -223,7 +227,7 @@ public class DateColumnStatsAggregator extends 
ColumnStatsAggregator implements
           csd.setDateStats(aggregateData);
           adjustedStatsMap.put(pseudoPartName.toString(), csd);
           if (useDensityFunctionForNDVEstimation) {
-            densityAvgSum += diff(aggregateData.getHighValue(), 
aggregateData.getLowValue())
+            densityAvgSum += ((double) diff(aggregateData.getHighValue(), 
aggregateData.getLowValue()))
                 / aggregateData.getNumDVs();
           }
         }
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java
index 63bc3fdc5ce..3e2093829b7 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java
@@ -50,6 +50,8 @@ public class DecimalColumnStatsAggregator extends 
ColumnStatsAggregator implemen
   @Override
   public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> 
colStatsWithSourceInfo,
       List<String> partNames, boolean areAllPartsFound) throws MetaException {
+    checkStatisticsList(colStatsWithSourceInfo);
+
     ColumnStatisticsObj statsObj = null;
     String colType = null;
     String colName = null;
@@ -128,7 +130,8 @@ public class DecimalColumnStatsAggregator extends 
ColumnStatsAggregator implemen
         aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
       } else {
         long estimation;
-        if (useDensityFunctionForNDVEstimation) {
+        if (useDensityFunctionForNDVEstimation && aggregateData != null
+            && aggregateData.isSetLowValue() && 
aggregateData.isSetHighValue()) {
           // We have estimation, lowerbound and higherbound. We use estimation
           // if it is between lowerbound and higherbound.
           double densityAvg = densityAvgSum / partNames.size();
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java
index 6d4e6472aa7..2caa2f32a3c 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java
@@ -48,6 +48,8 @@ public class DoubleColumnStatsAggregator extends 
ColumnStatsAggregator implement
   @Override
   public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> 
colStatsWithSourceInfo,
       List<String> partNames, boolean areAllPartsFound) throws MetaException {
+    checkStatisticsList(colStatsWithSourceInfo);
+
     ColumnStatisticsObj statsObj = null;
     String colType = null;
     String colName = null;
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java
index ffde02455ae..dd35e0b35c8 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java
@@ -48,6 +48,8 @@ public class LongColumnStatsAggregator extends 
ColumnStatsAggregator implements
   @Override
   public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> 
colStatsWithSourceInfo,
       List<String> partNames, boolean areAllPartsFound) throws MetaException {
+    checkStatisticsList(colStatsWithSourceInfo);
+
     ColumnStatisticsObj statsObj = null;
     String colType = null;
     String colName = null;
@@ -100,7 +102,7 @@ public class LongColumnStatsAggregator extends 
ColumnStatsAggregator implements
         LongColumnStatsDataInspector newData = longInspectorFromStats(cso);
         lowerBound = Math.max(lowerBound, newData.getNumDVs());
         higherBound += newData.getNumDVs();
-        densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / 
newData.getNumDVs();
+        densityAvgSum += ((double) (newData.getHighValue() - 
newData.getLowValue())) / newData.getNumDVs();
         if (ndvEstimator != null) {
           ndvEstimator.mergeEstimators(newData.getNdvEstimator());
         }
@@ -159,7 +161,7 @@ public class LongColumnStatsAggregator extends 
ColumnStatsAggregator implements
           String partName = csp.getPartName();
           LongColumnStatsData newData = cso.getStatsData().getLongStats();
           if (useDensityFunctionForNDVEstimation) {
-            densityAvgSum += (newData.getHighValue() - newData.getLowValue()) 
/ newData.getNumDVs();
+            densityAvgSum += ((double) (newData.getHighValue() - 
newData.getLowValue())) / newData.getNumDVs();
           }
           adjustedIndexMap.put(partName, (double) indexMap.get(partName));
           adjustedStatsMap.put(partName, cso.getStatsData());
@@ -188,7 +190,7 @@ public class LongColumnStatsAggregator extends 
ColumnStatsAggregator implements
               csd.setLongStats(aggregateData);
               adjustedStatsMap.put(pseudoPartName.toString(), csd);
               if (useDensityFunctionForNDVEstimation) {
-                densityAvgSum += (aggregateData.getHighValue() - 
aggregateData.getLowValue()) / aggregateData.getNumDVs();
+                densityAvgSum += ((double) (aggregateData.getHighValue() - 
aggregateData.getLowValue())) / aggregateData.getNumDVs();
               }
               // reset everything
               pseudoPartName = new StringBuilder();
@@ -221,7 +223,7 @@ public class LongColumnStatsAggregator extends 
ColumnStatsAggregator implements
           csd.setLongStats(aggregateData);
           adjustedStatsMap.put(pseudoPartName.toString(), csd);
           if (useDensityFunctionForNDVEstimation) {
-            densityAvgSum += (aggregateData.getHighValue() - 
aggregateData.getLowValue()) / aggregateData.getNumDVs();
+            densityAvgSum += ((double) (aggregateData.getHighValue() - 
aggregateData.getLowValue())) / aggregateData.getNumDVs();
           }
         }
       }
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java
index 6fb0fb5d8f9..bb38b8cfaa7 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java
@@ -42,11 +42,13 @@ import static 
org.apache.hadoop.hive.metastore.columnstats.ColumnsStatsUtils.str
 public class StringColumnStatsAggregator extends ColumnStatsAggregator 
implements
     IExtrapolatePartStatus {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(LongColumnStatsAggregator.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(StringColumnStatsAggregator.class);
 
   @Override
   public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> 
colStatsWithSourceInfo,
       List<String> partNames, boolean areAllPartsFound) throws MetaException {
+    checkStatisticsList(colStatsWithSourceInfo);
+
     ColumnStatisticsObj statsObj = null;
     String colType = null;
     String colName = null;
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/TimestampColumnStatsAggregator.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/TimestampColumnStatsAggregator.java
index 8828f89ebfe..95e8db9fdf8 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/TimestampColumnStatsAggregator.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/TimestampColumnStatsAggregator.java
@@ -49,6 +49,8 @@ public class TimestampColumnStatsAggregator extends 
ColumnStatsAggregator implem
   @Override
   public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> 
colStatsWithSourceInfo,
                                        List<String> partNames, boolean 
areAllPartsFound) throws MetaException {
+    checkStatisticsList(colStatsWithSourceInfo);
+
     ColumnStatisticsObj statsObj = null;
     String colType = null;
     String colName = null;
@@ -99,9 +101,10 @@ public class TimestampColumnStatsAggregator extends 
ColumnStatsAggregator implem
       for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) {
         ColumnStatisticsObj cso = csp.getColStatsObj();
         TimestampColumnStatsDataInspector newData = 
timestampInspectorFromStats(cso);
+        lowerBound = Math.max(lowerBound, newData.getNumDVs());
         higherBound += newData.getNumDVs();
         if (newData.isSetLowValue() && newData.isSetHighValue()) {
-          densityAvgSum += (diff(newData.getHighValue(), 
newData.getLowValue())) / newData.getNumDVs();
+          densityAvgSum += ((double) (diff(newData.getHighValue(), 
newData.getLowValue())) / newData.getNumDVs());
         }
         if (ndvEstimator != null) {
           ndvEstimator.mergeEstimators(newData.getNdvEstimator());
@@ -124,7 +127,8 @@ public class TimestampColumnStatsAggregator extends 
ColumnStatsAggregator implem
         aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
       } else {
         long estimation;
-        if (useDensityFunctionForNDVEstimation) {
+        if (useDensityFunctionForNDVEstimation && aggregateData != null
+            && aggregateData.isSetLowValue() && aggregateData.isSetHighValue() 
) {
           // We have estimation, lowerbound and higherbound. We use estimation
           // if it is between lowerbound and higherbound.
           double densityAvg = densityAvgSum / partNames.size();
@@ -161,7 +165,7 @@ public class TimestampColumnStatsAggregator extends 
ColumnStatsAggregator implem
           String partName = csp.getPartName();
           TimestampColumnStatsData newData = 
cso.getStatsData().getTimestampStats();
           if (useDensityFunctionForNDVEstimation) {
-            densityAvgSum += diff(newData.getHighValue(), 
newData.getLowValue()) / newData.getNumDVs();
+            densityAvgSum += ((double) diff(newData.getHighValue(), 
newData.getLowValue()) / newData.getNumDVs());
           }
           adjustedIndexMap.put(partName, (double) indexMap.get(partName));
           adjustedStatsMap.put(partName, cso.getStatsData());
@@ -190,7 +194,7 @@ public class TimestampColumnStatsAggregator extends 
ColumnStatsAggregator implem
               csd.setTimestampStats(aggregateData);
               adjustedStatsMap.put(pseudoPartName.toString(), csd);
               if (useDensityFunctionForNDVEstimation) {
-                densityAvgSum += diff(aggregateData.getHighValue(), 
aggregateData.getLowValue())
+                densityAvgSum += ((double) diff(aggregateData.getHighValue(), 
aggregateData.getLowValue()))
                     / aggregateData.getNumDVs();
               }
               // reset everything
@@ -223,7 +227,7 @@ public class TimestampColumnStatsAggregator extends 
ColumnStatsAggregator implem
           csd.setTimestampStats(aggregateData);
           adjustedStatsMap.put(pseudoPartName.toString(), csd);
           if (useDensityFunctionForNDVEstimation) {
-            densityAvgSum += diff(aggregateData.getHighValue(), 
aggregateData.getLowValue())
+            densityAvgSum += ((double) diff(aggregateData.getHighValue(), 
aggregateData.getLowValue()))
                 / aggregateData.getNumDVs();
           }
         }
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/StatisticsTestUtils.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/StatisticsTestUtils.java
new file mode 100644
index 00000000000..5520f04a4ff
--- /dev/null
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/StatisticsTestUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.hive.common.ndv.fm.FMSketch;
+import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import 
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
+
+public class StatisticsTestUtils {
+
+  private static final String HIVE_ENGINE = "hive";
+
+  private StatisticsTestUtils() {
+    throw new AssertionError("Suppress default constructor for non 
instantiation");
+  }
+
+  /**
+   * Creates a {@link ColStatsObjWithSourceInfo} object for a given table, 
partition and column information,
+   * using the given statistics data.
+   * @param data the column statistics data
+   * @param tbl the target table for stats
+   * @param column the target column for stats
+   * @param partName the target partition for stats
+   * @return column statistics objects with source info.
+   */
+  public static ColStatsObjWithSourceInfo 
createStatsWithInfo(ColumnStatisticsData data, Table tbl,
+      FieldSchema column, String partName) {
+    ColumnStatisticsObj statObj = new ColumnStatisticsObj(column.getName(), 
column.getType(), data);
+    return new ColStatsObjWithSourceInfo(statObj, tbl.getCatName(), 
tbl.getDbName(), column.getName(), partName);
+  }
+
+  /**
+   * Creates an FM sketch object initialized with the given values.
+   * @param values the values to be added
+   * @return an FM sketch initialized with the given values.
+   */
+  public static FMSketch createFMSketch(long... values) {
+    FMSketch fm = new FMSketch(1);
+    for (long value : values) {
+      fm.addToEstimator(value);
+    }
+    return fm;
+  }
+
+  /**
+   * Creates an FM sketch object initialized with the given values.
+   * @param values the values to be added
+   * @return an FM sketch initialized with the given values.
+   */
+  public static FMSketch createFMSketch(String... values) {
+    FMSketch fm = new FMSketch(1);
+    for (String value : values) {
+      fm.addToEstimator(value);
+    }
+    return fm;
+  }
+
+  /**
+   * Creates an HLL object initialized with the given values.
+   * @param values the values to be added
+   * @return an HLL object initialized with the given values.
+   */
+  public static HyperLogLog createHll(long... values) {
+    HyperLogLog hll = HyperLogLog.builder().build();
+    for (long value : values) {
+      hll.addLong(value);
+    }
+    return hll;
+  }
+
+  /**
+   * Creates an HLL object initialized with the given values.
+   * @param values the values to be added
+   * @return an HLL object initialized with the given values.
+   */
+  public static HyperLogLog createHll(String... values) {
+    HyperLogLog hll = HyperLogLog.builder().build();
+    for (String value : values) {
+      hll.addBytes(value.getBytes());
+    }
+    return hll;
+  }
+}
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/ColStatsBuilder.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/ColStatsBuilder.java
new file mode 100644
index 00000000000..6683d323ecc
--- /dev/null
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/ColStatsBuilder.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.columnstats;
+
+import org.apache.hadoop.hive.common.ndv.fm.FMSketch;
+import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog;
+import org.apache.hadoop.hive.metastore.StatisticsTestUtils;
+import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.Date;
+import org.apache.hadoop.hive.metastore.api.Decimal;
+import org.apache.hadoop.hive.metastore.api.Timestamp;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataInspector;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector;
+import 
org.apache.hadoop.hive.metastore.columnstats.cache.TimestampColumnStatsDataInspector;
+
+import java.lang.reflect.InvocationTargetException;
+
+public class ColStatsBuilder<T> {
+
+  private final Class<T> type;
+  private T lowValue;
+  private T highValue;
+  private Double avgColLen;
+  private Long maxColLen;
+  private Long numTrues;
+  private Long numFalses;
+  private Long numNulls;
+  private Long numDVs;
+  private byte[] bitVector;
+
+  public ColStatsBuilder(Class<T> type) {
+    this.type = type;
+  }
+
+  public ColStatsBuilder<T> numNulls(long num) {
+    this.numNulls = num;
+    return this;
+  }
+
+  public ColStatsBuilder<T> numDVs(long num) {
+    this.numDVs = num;
+    return this;
+  }
+
+  public ColStatsBuilder<T> numFalses(long num) {
+    this.numFalses = num;
+    return this;
+  }
+
+  public ColStatsBuilder<T> numTrues(long num) {
+    this.numTrues = num;
+    return this;
+  }
+
+  public ColStatsBuilder<T> avgColLen(double val) {
+    this.avgColLen = val;
+    return this;
+  }
+
+  public ColStatsBuilder<T> maxColLen(long val) {
+    this.maxColLen = val;
+    return this;
+  }
+
+  public ColStatsBuilder<T> low(T val) {
+    this.lowValue = val;
+    return this;
+  }
+
+  public ColStatsBuilder<T> high(T val) {
+    this.highValue = val;
+    return this;
+  }
+
+  public ColStatsBuilder<T> hll(long... values) {
+    HyperLogLog hll = StatisticsTestUtils.createHll(values);
+    this.bitVector = hll.serialize();
+    return this;
+  }
+
+  public ColStatsBuilder<T> hll(String... values) {
+    HyperLogLog hll = StatisticsTestUtils.createHll(values);
+    this.bitVector = hll.serialize();
+    return this;
+  }
+
+  public ColStatsBuilder<T> fmSketch(long... values) {
+    FMSketch fm = StatisticsTestUtils.createFMSketch(values);
+    this.bitVector = fm.serialize();
+    return this;
+  }
+
+  public ColStatsBuilder<T> fmSketch(String... values) {
+    FMSketch fm = StatisticsTestUtils.createFMSketch(values);
+    this.bitVector = fm.serialize();
+    return this;
+  }
+
+  public ColumnStatisticsData build() {
+    ColumnStatisticsData data = new ColumnStatisticsData();
+    if (type == byte[].class) {
+      data.setBinaryStats(newColData(BinaryColumnStatsData.class));
+    } else if (type == Boolean.class) {
+      data.setBooleanStats(newColData(BooleanColumnStatsData.class));
+    } else if (type == Date.class) {
+      data.setDateStats(newColData(DateColumnStatsDataInspector.class));
+    } else if (type == Decimal.class) {
+      data.setDecimalStats(newColData(DecimalColumnStatsDataInspector.class));
+    } else if (type == double.class) {
+      data.setDoubleStats(newColData(DoubleColumnStatsDataInspector.class));
+    } else if (type == long.class) {
+      data.setLongStats(newColData(LongColumnStatsDataInspector.class));
+    } else if (type == String.class) {
+      data.setStringStats(newColData(StringColumnStatsDataInspector.class));
+    } else if (type == Timestamp.class) {
+      
data.setTimestampStats(newColData(TimestampColumnStatsDataInspector.class));
+    } else {
+      throw new IllegalStateException(type.getSimpleName() + " is not 
supported");
+    }
+    return data;
+  }
+
+  private <X> X newColData(Class<X> clazz) {
+    try {
+      X data = clazz.getDeclaredConstructor().newInstance();
+      if (numNulls != null) {
+        clazz.getMethod("setNumNulls", long.class).invoke(data, numNulls);
+      }
+      if (numDVs != null) {
+        clazz.getMethod("setNumDVs", long.class).invoke(data, numDVs);
+      }
+      if (bitVector != null) {
+        clazz.getMethod("setBitVectors", byte[].class).invoke(data, bitVector);
+      }
+      if (avgColLen != null) {
+        clazz.getMethod("setAvgColLen", double.class).invoke(data, avgColLen);
+      }
+      if (maxColLen != null) {
+        clazz.getMethod("setMaxColLen", long.class).invoke(data, maxColLen);
+      }
+      if (numFalses != null) {
+        clazz.getMethod("setNumFalses", long.class).invoke(data, numFalses);
+      }
+      if (numTrues != null) {
+        clazz.getMethod("setNumTrues", long.class).invoke(data, numTrues);
+      }
+
+      if (lowValue != null) {
+        if (type.isPrimitive()) {
+          clazz.getMethod("setLowValue", type).invoke(data, lowValue);
+        } else {
+          clazz.getMethod("setLowValue", type).invoke(data, 
type.cast(lowValue));
+        }
+      }
+      if (highValue != null) {
+        if (type.isPrimitive()) {
+          clazz.getMethod("setHighValue", type).invoke(data, highValue);
+        } else {
+          clazz.getMethod("setHighValue", type).invoke(data, 
type.cast(highValue));
+        }
+      }
+      clazz.getMethod("validate").invoke(data);
+      return data;
+    } catch (NoSuchMethodException | InstantiationException | 
IllegalAccessException | InvocationTargetException e) {
+      throw new RuntimeException("Reflection error", e);
+    }
+  }
+}
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregatorTest.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregatorTest.java
new file mode 100644
index 00000000000..cc9d4ca4a87
--- /dev/null
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregatorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.columnstats.ColStatsBuilder;
+import 
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.hadoop.hive.metastore.StatisticsTestUtils.createStatsWithInfo;
+
+@Category(MetastoreUnitTest.class)
+public class BinaryColumnStatsAggregatorTest {
+
+  private static final Table TABLE = new Table("dummy", "db", "hive", 0, 0,
+      0, null, null, Collections.emptyMap(), null, null,
+      TableType.MANAGED_TABLE.toString());
+  private static final FieldSchema COL = new FieldSchema("col", "binary", "");
+
+  @Test
+  public void testAggregateSingleStat() throws MetaException {
+    List<String> partitions = Collections.singletonList("part1");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(byte[].class).numNulls(1).avgColLen(8.5).maxColLen(13).build();
+    List<ColStatsObjWithSourceInfo> statsList =
+        Collections.singletonList(createStatsWithInfo(data1, TABLE, COL, 
partitions.get(0)));
+
+    BinaryColumnStatsAggregator aggregator = new BinaryColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenAllAvailable() throws MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(byte[].class).numNulls(1).avgColLen(20.0 / 
3).maxColLen(13).build();
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(byte[].class).numNulls(2).avgColLen(14).maxColLen(18).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(byte[].class).numNulls(3).avgColLen(17.5).maxColLen(18).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    BinaryColumnStatsAggregator aggregator = new BinaryColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(byte[].class).numNulls(6).avgColLen(17.5).maxColLen(18).build();
+
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenOnlySomeAvailable() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3", 
"part4");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(byte[].class).numNulls(1).avgColLen(20.0 / 
3).maxColLen(13).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(byte[].class).numNulls(3).avgColLen(17.5).maxColLen(18).build();
+    ColumnStatisticsData data4 = new 
ColStatsBuilder<>(byte[].class).numNulls(2).avgColLen(14).maxColLen(18).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)),
+        createStatsWithInfo(data4, TABLE, COL, partitions.get(3)));
+
+    BinaryColumnStatsAggregator aggregator = new BinaryColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, false);
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(byte[].class).numNulls(6).avgColLen(17.5).maxColLen(18).build();
+
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+}
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregatorTest.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregatorTest.java
new file mode 100644
index 00000000000..1676d1350d5
--- /dev/null
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregatorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.columnstats.ColStatsBuilder;
+import 
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.hadoop.hive.metastore.StatisticsTestUtils.createStatsWithInfo;
+
+@Category(MetastoreUnitTest.class)
+public class BooleanColumnStatsAggregatorTest {
+
+  private static final Table TABLE = new Table("dummy", "db", "hive", 0, 0,
+      0, null, null, Collections.emptyMap(), null, null,
+      TableType.MANAGED_TABLE.toString());
+  private static final FieldSchema COL = new FieldSchema("col", "boolean", "");
+
+  @Test
+  public void testAggregateSingleStat() throws MetaException {
+    List<String> partitions = Collections.singletonList("part1");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Boolean.class).numNulls(1).numFalses(2).numTrues(13).build();
+    List<ColStatsObjWithSourceInfo> statsList =
+        Collections.singletonList(createStatsWithInfo(data1, TABLE, COL, 
partitions.get(0)));
+
+    BooleanColumnStatsAggregator aggregator = new 
BooleanColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenAllAvailable() throws MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Boolean.class).numNulls(1).numFalses(3).numTrues(13).build();
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(Boolean.class).numNulls(2).numFalses(6).numTrues(18).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(Boolean.class).numNulls(3).numFalses(2).numTrues(18).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    BooleanColumnStatsAggregator aggregator = new 
BooleanColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(Boolean.class).numNulls(6).numFalses(11).numTrues(49).build();
+
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenOnlySomeAvailable() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3", 
"part4");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Boolean.class).numNulls(1).numFalses(3).numTrues(13).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(Boolean.class).numNulls(3).numFalses(2).numTrues(18).build();
+    ColumnStatisticsData data4 = new 
ColStatsBuilder<>(Boolean.class).numNulls(2).numFalses(6).numTrues(18).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)),
+        createStatsWithInfo(data4, TABLE, COL, partitions.get(3)));
+
+    BooleanColumnStatsAggregator aggregator = new 
BooleanColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, false);
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(Boolean.class).numNulls(6).numFalses(11).numTrues(49).build();
+
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+}
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregatorTest.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregatorTest.java
new file mode 100644
index 00000000000..07a5d49c179
--- /dev/null
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregatorTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Date;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.columnstats.ColStatsBuilder;
+import 
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.hadoop.hive.metastore.StatisticsTestUtils.createStatsWithInfo;
+
+@Category(MetastoreUnitTest.class)
+public class DateColumnStatsAggregatorTest {
+
+  private static final Table TABLE = new Table("dummy", "db", "hive", 0, 0,
+      0, null, null, Collections.emptyMap(), null, null,
+      TableType.MANAGED_TABLE.toString());
+  private static final FieldSchema COL = new FieldSchema("col", "date", "");
+
+  private static final Date DATE_1 = new Date(1);
+  private static final Date DATE_2 = new Date(2);
+  private static final Date DATE_3 = new Date(3);
+  private static final Date DATE_4 = new Date(4);
+  private static final Date DATE_5 = new Date(5);
+  private static final Date DATE_6 = new Date(6);
+  private static final Date DATE_7 = new Date(7);
+  private static final Date DATE_8 = new Date(8);
+  private static final Date DATE_9 = new Date(9);
+
+  @Test
+  public void testAggregateSingleStat() throws MetaException {
+    List<String> partitions = Collections.singletonList("part1");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Date.class).numNulls(1).numDVs(2).low(DATE_1).high(DATE_4)
+        .hll(DATE_1.getDaysSinceEpoch(), DATE_4.getDaysSinceEpoch()).build();
+    List<ColStatsObjWithSourceInfo> statsList =
+        Collections.singletonList(createStatsWithInfo(data1, TABLE, COL, 
partitions.get(0)));
+
+    DateColumnStatsAggregator aggregator = new DateColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateSingleStatWhenNullValues() throws MetaException {
+    List<String> partitions = Collections.singletonList("part1");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Date.class).numNulls(1).numDVs(2).build();
+    List<ColStatsObjWithSourceInfo> statsList =
+        Collections.singletonList(createStatsWithInfo(data1, TABLE, COL, 
partitions.get(0)));
+
+    DateColumnStatsAggregator aggregator = new DateColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = false;
+    aggregator.ndvTuner = 1;
+    // ndv tuner does not have any effect because min numDVs and max numDVs 
coincide (we have a single statistics object)
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultipleStatsWhenSomeNullValues() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2");
+
+    long[] values1 = { DATE_1.getDaysSinceEpoch(), DATE_2.getDaysSinceEpoch() 
};
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Date.class).numNulls(1).numDVs(2)
+        .low(DATE_1).high(DATE_2).hll(values1).build();
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(Date.class).numNulls(2).numDVs(3).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)));
+
+    DateColumnStatsAggregator aggregator = new DateColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(Date.class).numNulls(3).numDVs(3)
+        .low(DATE_1).high(DATE_2).hll(values1).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    expectedStats = new ColStatsBuilder<>(Date.class).numNulls(3).numDVs(4)
+        .low(DATE_1).high(DATE_2).hll(values1).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = false;
+    aggregator.ndvTuner = 1;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    expectedStats = new ColStatsBuilder<>(Date.class).numNulls(3).numDVs(5)
+        .low(DATE_1).high(DATE_2).hll(values1).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenAllAvailable() throws MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    long[] values1 = { DATE_1.getDaysSinceEpoch(), DATE_2.getDaysSinceEpoch(), 
DATE_3.getDaysSinceEpoch() };
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Date.class).numNulls(1).numDVs(3)
+        .low(DATE_1).high(DATE_3).hll(values1).build();
+
+    long[] values2 = { DATE_3.getDaysSinceEpoch(), DATE_4.getDaysSinceEpoch(), 
DATE_5.getDaysSinceEpoch() };
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(Date.class).numNulls(2).numDVs(3)
+        .low(DATE_3).high(DATE_5).hll(values2).build();
+
+    long[] values3 = { DATE_6.getDaysSinceEpoch(), DATE_7.getDaysSinceEpoch() 
};
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(Date.class).numNulls(3).numDVs(2)
+        .low(DATE_6).high(DATE_7).hll(values3).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    DateColumnStatsAggregator aggregator = new DateColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+
+    // the aggregation does not update the hll, only numDVs is updated; it keeps the first 
hll
+    // notice that numDVs is computed by using HLL, which can detect that 
'DATE_3' appears twice
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(Date.class).numNulls(6).numDVs(7)
+        .low(DATE_1).high(DATE_7).hll(values1).build();
+
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenUnmergeableBitVectors() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    long[] values1 = { DATE_1.getDaysSinceEpoch(), DATE_2.getDaysSinceEpoch(), 
DATE_3.getDaysSinceEpoch() };
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Date.class).numNulls(1).numDVs(3)
+        .low(DATE_1).high(DATE_3).fmSketch(values1).build();
+    long[] values2 = { DATE_3.getDaysSinceEpoch(), DATE_4.getDaysSinceEpoch(), 
DATE_5.getDaysSinceEpoch() };
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(Date.class).numNulls(2).numDVs(3)
+        .low(DATE_3).high(DATE_5).hll(values2).build();
+    long[] values3 = { DATE_1.getDaysSinceEpoch(), DATE_2.getDaysSinceEpoch(), 
DATE_6.getDaysSinceEpoch(),
+        DATE_8.getDaysSinceEpoch() };
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(Date.class).numNulls(3).numDVs(4)
+        .low(DATE_1).high(DATE_8).hll(values3).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    DateColumnStatsAggregator aggregator = new DateColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    // the aggregation does not update the bitvector, only numDVs is updated; it keeps 
the first bitvector;
+    // numDVs is set to the maximum among all stats when non-mergeable 
bitvectors are detected
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(Date.class).numNulls(6).numDVs(4)
+        .low(DATE_1).high(DATE_8).fmSketch(values1).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    // the use of the density function leads to a different estimation for 
numNDV
+    expectedStats = new ColStatsBuilder<>(Date.class).numNulls(6).numDVs(6)
+        .low(DATE_1).high(DATE_8).fmSketch(values1).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = false;
+    double[] tunerValues = new double[] { 0, 0.5, 0.75, 1 };
+    long[] expectedNDVs = new long[] { 4, 7, 8, 10 };
+    for (int i = 0; i < tunerValues.length; i++) {
+      aggregator.ndvTuner = tunerValues[i];
+      computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+      expectedStats = new 
ColStatsBuilder<>(Date.class).numNulls(6).numDVs(expectedNDVs[i])
+          .low(DATE_1).high(DATE_8).fmSketch(values1).build();
+      Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+    }
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenOnlySomeAvailable() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3", 
"part4");
+
+    long[] values1 = { DATE_1.getDaysSinceEpoch(), DATE_2.getDaysSinceEpoch(), 
DATE_3.getDaysSinceEpoch() };
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Date.class).numNulls(1).numDVs(3)
+        .low(DATE_1).high(DATE_3).hll(values1).build();
+
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(Date.class).numNulls(3).numDVs(1).low(DATE_7).high(DATE_7)
+        .hll(DATE_7.getDaysSinceEpoch()).build();
+
+    long[] values4 = { DATE_3.getDaysSinceEpoch(), DATE_4.getDaysSinceEpoch(), 
DATE_5.getDaysSinceEpoch() };
+    ColumnStatisticsData data4 = new 
ColStatsBuilder<>(Date.class).numNulls(2).numDVs(3)
+        .low(DATE_3).high(DATE_5).hll(values4).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)),
+        createStatsWithInfo(data4, TABLE, COL, partitions.get(3)));
+
+    DateColumnStatsAggregator aggregator = new DateColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, false);
+
+    // the hll is left as null in case of missing stats; only numDVs is updated
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(Date.class).numNulls(8).numDVs(4)
+        .low(DATE_1).high(DATE_9).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void 
testAggregateMultiStatsOnlySomeAvailableButUnmergeableBitVector() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    long[] values1 = { DATE_1.getDaysSinceEpoch(), DATE_2.getDaysSinceEpoch(), 
DATE_6.getDaysSinceEpoch() };
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Date.class).numNulls(1).numDVs(3)
+        .low(DATE_1).high(DATE_6).fmSketch(values1).build();
+
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(Date.class).numNulls(3).numDVs(1)
+        .low(DATE_7).high(DATE_7).hll(DATE_7.getDaysSinceEpoch()).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    DateColumnStatsAggregator aggregator = new DateColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, false);
+    // the hll is left as null in case of missing stats; only numDVs is updated
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(Date.class).numNulls(6).numDVs(3)
+        .low(DATE_1).high(DATE_7).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, false);
+    // the use of the density function leads to a different estimation for 
numNDV
+    expectedStats = new ColStatsBuilder<>(Date.class).numNulls(6).numDVs(4)
+        .low(DATE_1).high(DATE_7).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+}
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregatorTest.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregatorTest.java
new file mode 100644
index 00000000000..a3a2730be9e
--- /dev/null
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregatorTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Decimal;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.utils.DecimalUtils;
+import org.apache.hadoop.hive.metastore.columnstats.ColStatsBuilder;
+import 
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.hadoop.hive.metastore.StatisticsTestUtils.createStatsWithInfo;
+
+@Category(MetastoreUnitTest.class)
+public class DecimalColumnStatsAggregatorTest {
+
+  private static final Table TABLE = new Table("dummy", "db", "hive", 0, 0,
+      0, null, null, Collections.emptyMap(), null, null,
+      TableType.MANAGED_TABLE.toString());
+  private static final FieldSchema COL = new FieldSchema("col", "decimal", "");
+
+  private static final Decimal ONE = DecimalUtils.createThriftDecimal("1.0");
+  private static final Decimal TWO = DecimalUtils.createThriftDecimal("2.0");
+  private static final Decimal THREE = DecimalUtils.createThriftDecimal("3.0");
+  private static final Decimal FOUR = DecimalUtils.createThriftDecimal("4.0");
+  private static final Decimal FIVE = DecimalUtils.createThriftDecimal("5.0");
+  private static final Decimal SIX = DecimalUtils.createThriftDecimal("6.0");
+  private static final Decimal SEVEN = DecimalUtils.createThriftDecimal("7.0");
+  private static final Decimal EIGHT = DecimalUtils.createThriftDecimal("8.0");
+
+  @Test
+  public void testAggregateSingleStat() throws MetaException {
+    List<String> partitions = Collections.singletonList("part1");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Decimal.class).numNulls(1).numDVs(2)
+        .low(ONE).high(FOUR).hll(1, 4).build();
+    List<ColStatsObjWithSourceInfo> statsList =
+        Collections.singletonList(createStatsWithInfo(data1, TABLE, COL, 
partitions.get(0)));
+
+    DecimalColumnStatsAggregator aggregator = new 
DecimalColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateSingleStatWhenNullValues() throws MetaException {
+    List<String> partitions = Collections.singletonList("part1");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Decimal.class).numNulls(1).numDVs(2).build();
+    List<ColStatsObjWithSourceInfo> statsList =
+        Collections.singletonList(createStatsWithInfo(data1, TABLE, COL, 
partitions.get(0)));
+
+    DecimalColumnStatsAggregator aggregator = new 
DecimalColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = false;
+    aggregator.ndvTuner = 1;
+    // ndv tuner does not have any effect because min numDVs and max numDVs 
coincide (we have a single statistic)
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultipleStatsWhenSomeNullValues() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Decimal.class).numNulls(1).numDVs(2)
+        .low(ONE).high(TWO).hll(1, 2).build();
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(Decimal.class).numNulls(2).numDVs(3).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)));
+
+    DecimalColumnStatsAggregator aggregator = new 
DecimalColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(Decimal.class).numNulls(3).numDVs(3)
+        .low(ONE).high(TWO).hll(1, 2).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    expectedStats = new ColStatsBuilder<>(Decimal.class).numNulls(3).numDVs(4)
+        .low(ONE).high(TWO).hll(1, 2).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = false;
+    aggregator.ndvTuner = 1;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    expectedStats = new ColStatsBuilder<>(Decimal.class).numNulls(3).numDVs(5)
+        .low(ONE).high(TWO).hll(1, 2).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenAllAvailable() throws MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Decimal.class).numNulls(1).numDVs(3)
+        .low(ONE).high(THREE).hll(1, 2, 3).build();
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(Decimal.class).numNulls(2).numDVs(3)
+        .low(THREE).high(FIVE).hll(3, 4, 5).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(Decimal.class).numNulls(3).numDVs(2)
+        .low(SIX).high(SEVEN).hll(6, 7).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    DecimalColumnStatsAggregator aggregator = new 
DecimalColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+
+    // the aggregation does not update the hll, only numDVs; it keeps the 
first hll
+    // notice that numDVs is computed using HLL, which can detect that '3' 
appears twice
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(Decimal.class).numNulls(6).numDVs(7)
+        .low(ONE).high(SEVEN).hll(1, 2, 3).build();
+
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenUnmergeableBitVectors() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Decimal.class).numNulls(1).numDVs(3)
+        .low(ONE).high(THREE).fmSketch(1, 2, 3).build();
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(Decimal.class).numNulls(2).numDVs(3)
+        .low(THREE).high(FIVE).hll(3, 4, 5).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(Decimal.class).numNulls(3).numDVs(4)
+        .low(ONE).high(EIGHT).hll(1, 2, 6, 8).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    DecimalColumnStatsAggregator aggregator = new 
DecimalColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    // the aggregation does not update the bitvector, only numDVs; it keeps 
the first bitvector;
+    // numDVs is set to the maximum among all stats when non-mergeable 
bitvectors are detected
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(Decimal.class).numNulls(6).numDVs(4)
+        .low(ONE).high(EIGHT).fmSketch(1, 2, 3).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    // the use of the density function leads to a different estimate for 
numDVs
+    expectedStats = new ColStatsBuilder<>(Decimal.class).numNulls(6).numDVs(6)
+        .low(ONE).high(EIGHT).fmSketch(1, 2, 3).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = false;
+    double[] tunerValues = new double[] { 0, 0.5, 0.75, 1 };
+    long[] expectedDVs = new long[] { 4, 7, 8, 10 };
+    for (int i = 0; i < tunerValues.length; i++) {
+      aggregator.ndvTuner = tunerValues[i];
+      computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+      expectedStats = new 
ColStatsBuilder<>(Decimal.class).numNulls(6).numDVs(expectedDVs[i])
+          .low(ONE).high(EIGHT).fmSketch(1, 2, 3).build();
+      Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+    }
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenOnlySomeAvailable() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3", 
"part4");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Decimal.class).numNulls(1).numDVs(3)
+        .low(ONE).high(THREE).hll(1, 2, 3).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(Decimal.class).numNulls(3).numDVs(1)
+        .low(SEVEN).high(SEVEN).hll(7).build();
+    ColumnStatisticsData data4 = new 
ColStatsBuilder<>(Decimal.class).numNulls(2).numDVs(3)
+        .low(THREE).high(FIVE).hll(3, 4, 5).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)),
+        createStatsWithInfo(data4, TABLE, COL, partitions.get(3)));
+
+    DecimalColumnStatsAggregator aggregator = new 
DecimalColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, false);
+
+    // hll in case of missing stats is left as null, only numDVs is updated
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(Decimal.class).numNulls(8).numDVs(4)
+        .low(ONE).high(DecimalUtils.createThriftDecimal("9.4")).build();
+
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void 
testAggregateMultiStatsOnlySomeAvailableButUnmergeableBitVector() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Decimal.class).numNulls(1).numDVs(3)
+        .low(ONE).high(SIX).fmSketch(1, 2, 6).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(Decimal.class).numNulls(3).numDVs(1)
+        .low(SEVEN).high(SEVEN).hll(7).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    DecimalColumnStatsAggregator aggregator = new 
DecimalColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, false);
+    // hll in case of missing stats is left as null, only numDVs is updated
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(Decimal.class).numNulls(6).numDVs(3)
+        .low(ONE).high(DecimalUtils.createThriftDecimal("7.5")).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, false);
+    // the use of the density function leads to a different estimate for 
numDVs
+    expectedStats = new ColStatsBuilder<>(Decimal.class).numNulls(6).numDVs(4)
+        .low(ONE).high(DecimalUtils.createThriftDecimal("7.5")).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+}
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregatorTest.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregatorTest.java
new file mode 100644
index 00000000000..d38d5324e00
--- /dev/null
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregatorTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.columnstats.ColStatsBuilder;
+import 
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.hadoop.hive.metastore.StatisticsTestUtils.createStatsWithInfo;
+
+@Category(MetastoreUnitTest.class)
+public class DoubleColumnStatsAggregatorTest {
+
+  private static final Table TABLE = new Table("dummy", "db", "hive", 0, 0,
+      0, null, null, Collections.emptyMap(), null, null,
+      TableType.MANAGED_TABLE.toString());
+  private static final FieldSchema COL = new FieldSchema("col", "double", "");
+
+  @Test
+  public void testAggregateSingleStat() throws MetaException {
+    List<String> partitions = Collections.singletonList("part1");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(double.class).numNulls(1).numDVs(2)
+        .low(1d).high(4d).hll(1, 4).build();
+    List<ColStatsObjWithSourceInfo> statsList =
+        Collections.singletonList(createStatsWithInfo(data1, TABLE, COL, 
partitions.get(0)));
+
+    DoubleColumnStatsAggregator aggregator = new DoubleColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateSingleStatWhenNullValues() throws MetaException {
+    List<String> partitions = Collections.singletonList("part1");
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(double.class).numNulls(1).numDVs(2).build();
+    List<ColStatsObjWithSourceInfo> statsList =
+        Collections.singletonList(createStatsWithInfo(data1, TABLE, COL, 
partitions.get(0)));
+
+    DoubleColumnStatsAggregator aggregator = new DoubleColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = false;
+    aggregator.ndvTuner = 1;
+    // ndv tuner does not have any effect because min numDVs and max numDVs 
coincide (we have a single statistic)
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultipleStatsWhenSomeNullValues() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(double.class).numNulls(1).numDVs(2)
+        .low(1d).high(2d).hll(1, 2).build();
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(double.class).numNulls(2).numDVs(3).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)));
+
+    DoubleColumnStatsAggregator aggregator = new DoubleColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(double.class).numNulls(3).numDVs(3)
+        .low(1d).high(2d).hll(1, 2).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    expectedStats = new ColStatsBuilder<>(double.class).numNulls(3).numDVs(4)
+        .low(1d).high(2d).hll(1, 2).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = false;
+    aggregator.ndvTuner = 1;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    expectedStats = new ColStatsBuilder<>(double.class).numNulls(3).numDVs(5)
+        .low(1d).high(2d).hll(1, 2).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenAllAvailable() throws MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(double.class).numNulls(1).numDVs(3)
+        .low(1d).high(3d).hll(1, 2, 3).build();
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(double.class).numNulls(2).numDVs(3)
+        .low(3d).high(5d).hll(3, 4, 5).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(double.class).numNulls(3).numDVs(2)
+        .low(6d).high(7d).hll(6, 7).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    DoubleColumnStatsAggregator aggregator = new DoubleColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+
+    // the aggregation does not update the hll, only numDVs; it keeps the 
first hll
+    // notice that numDVs is computed using HLL, which can detect that '3' 
appears twice
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(double.class).numNulls(6).numDVs(7)
+        .low(1d).high(7d).hll(1, 2, 3).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenUnmergeableBitVectors() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(double.class).numNulls(1).numDVs(3)
+        .low(1d).high(3d).fmSketch(1, 2, 3).build();
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(double.class).numNulls(2).numDVs(3)
+        .low(3d).high(5d).hll(3, 4, 5).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(double.class).numNulls(3).numDVs(4)
+        .low(1d).high(8d).hll(1, 2, 6, 8).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    DoubleColumnStatsAggregator aggregator = new DoubleColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    // the aggregation does not update the bitvector, only numDVs; it keeps 
the first bitvector;
+    // numDVs is set to the maximum among all stats when non-mergeable 
bitvectors are detected
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(double.class).numNulls(6).numDVs(4)
+        .low(1d).high(8d).fmSketch(1, 2, 3).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    // the use of the density function leads to a different estimate for 
numDVs
+    expectedStats = new ColStatsBuilder<>(double.class).numNulls(6).numDVs(6)
+        .low(1d).high(8d).fmSketch(1, 2, 3).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = false;
+    double[] tunerValues = new double[] { 0, 0.5, 0.75, 1 };
+    long[] expectedDVs = new long[] { 4, 7, 8, 10 };
+    for (int i = 0; i < tunerValues.length; i++) {
+      aggregator.ndvTuner = tunerValues[i];
+      computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+      expectedStats = new 
ColStatsBuilder<>(double.class).numNulls(6).numDVs(expectedDVs[i])
+          .low(1d).high(8d).fmSketch(1, 2, 3).build();
+      Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());  
+    }
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenOnlySomeAvailable() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3", 
"part4");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(double.class).numNulls(1).numDVs(3)
+        .low(1d).high(3d).hll(1, 2, 3).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(double.class).numNulls(3).numDVs(1)
+        .low(7d).high(7d).hll(7).build();
+    ColumnStatisticsData data4 = new 
ColStatsBuilder<>(double.class).numNulls(2).numDVs(3)
+        .low(3d).high(5d).hll(3, 4, 5).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)),
+        createStatsWithInfo(data4, TABLE, COL, partitions.get(3)));
+
+    DoubleColumnStatsAggregator aggregator = new DoubleColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, false);
+
+    // hll in case of missing stats is left as null, only numDVs is updated
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(double.class).numNulls(8).numDVs(4)
+        .low(1d).high(9.4).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void 
testAggregateMultiStatsOnlySomeAvailableButUnmergeableBitVector() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(double.class).numNulls(1).numDVs(3)
+        .low(1d).high(6d).fmSketch(1, 2, 6).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(double.class).numNulls(3).numDVs(1)
+        .low(7d).high(7d).hll(7).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    DoubleColumnStatsAggregator aggregator = new DoubleColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, false);
+    // hll in case of missing stats is left as null, only numDVs is updated
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(double.class).numNulls(6).numDVs(3)
+        .low(1d).high(7.5).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, false);
+    // the use of the density function leads to a different estimate for 
numDVs
+    expectedStats = new ColStatsBuilder<>(double.class).numNulls(6).numDVs(4)
+        .low(1d).high(7.5).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+}
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregatorTest.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregatorTest.java
new file mode 100644
index 00000000000..126c9868bc6
--- /dev/null
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregatorTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.columnstats.ColStatsBuilder;
+import 
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.hadoop.hive.metastore.StatisticsTestUtils.createStatsWithInfo;
+
+@Category(MetastoreUnitTest.class)
+public class LongColumnStatsAggregatorTest {
+
+  private static final Table TABLE = new Table("dummy", "db", "hive", 0, 0,
+      0, null, null, Collections.emptyMap(), null, null,
+      TableType.MANAGED_TABLE.toString());
+  private static final FieldSchema COL = new FieldSchema("col", "int", "");
+
+  @Test
+  public void testAggregateSingleStat() throws MetaException {
+    List<String> partitions = Collections.singletonList("part1");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(long.class).numNulls(1).numDVs(2)
+        .low(1L).high(4L).hll(1, 4).build();
+    List<ColStatsObjWithSourceInfo> statsList =
+        Collections.singletonList(createStatsWithInfo(data1, TABLE, COL, 
partitions.get(0)));
+
+    LongColumnStatsAggregator aggregator = new LongColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateSingleStatWhenNullValues() throws MetaException {
+    List<String> partitions = Collections.singletonList("part1");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(long.class).numNulls(1).numDVs(2).build();
+    List<ColStatsObjWithSourceInfo> statsList =
+        Collections.singletonList(createStatsWithInfo(data1, TABLE, COL, 
partitions.get(0)));
+
+    LongColumnStatsAggregator aggregator = new LongColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = false;
+    aggregator.ndvTuner = 1;
+    // ndv tuner does not have any effect because min numDVs and max numDVs 
coincide (we have a single stats)
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultipleStatsWhenSomeNullValues() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(long.class).numNulls(1).numDVs(2)
+        .low(1L).high(2L).hll(1, 2).build();
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(long.class).numNulls(2).numDVs(3).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)));
+
+    LongColumnStatsAggregator aggregator = new LongColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(long.class).numNulls(3).numDVs(3)
+        .low(1L).high(2L).hll(1, 2).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    expectedStats = new ColStatsBuilder<>(long.class).numNulls(3).numDVs(4)
+        .low(1L).high(2L).hll(1, 2).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = false;
+    aggregator.ndvTuner = 1;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    expectedStats = new ColStatsBuilder<>(long.class).numNulls(3).numDVs(5)
+        .low(1L).high(2L).hll(1, 2).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenAllAvailable() throws MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(long.class).numNulls(1).numDVs(3)
+        .low(1L).high(3L).hll(1, 2, 3).build();
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(long.class).numNulls(2).numDVs(3)
+        .low(3L).high(5L).hll(3, 4, 5).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(long.class).numNulls(3).numDVs(2)
+        .low(6L).high(7L).hll(6, 7).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    LongColumnStatsAggregator aggregator = new LongColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+
+    // the aggregation does not update hll, only numDVs is, it keeps the first 
hll
+    // notice that numDVs is computed by using HLL, it can detect that '3' 
appears twice
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(long.class).numNulls(6).numDVs(7)
+        .low(1L).high(7L).hll(1, 2, 3).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenUnmergeableBitVectors() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(long.class).numNulls(1).numDVs(3)
+        .low(1L).high(3L).fmSketch(1, 2, 3).build();
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(long.class).numNulls(2).numDVs(3)
+        .low(3L).high(5L).hll(3, 4, 5).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(long.class).numNulls(3).numDVs(4)
+        .low(1L).high(8L).hll(1, 2, 6, 8).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    LongColumnStatsAggregator aggregator = new LongColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    // the aggregation does not update the bitvector, only numDVs is, it keeps 
the first bitvector;
+    // numDVs is set to the maximum among all stats when non-mergeable 
bitvectors are detected
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(long.class).numNulls(6).numDVs(4)
+        .low(1L).high(8L).fmSketch(1, 2, 3).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    // the use of the density function leads to a different estimation for 
numNDV
+    expectedStats = new ColStatsBuilder<>(long.class).numNulls(6).numDVs(6)
+        .low(1L).high(8L).fmSketch(1, 2, 3).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = false;
+    double[] tunerValues = new double[] { 0, 0.5, 0.75, 1 };
+    long[] expectedDVs = new long[] { 4, 7, 8, 10 };
+    for (int i = 0; i < tunerValues.length; i++) {
+      aggregator.ndvTuner = tunerValues[i];
+      computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+      expectedStats = new 
ColStatsBuilder<>(long.class).numNulls(6).numDVs(expectedDVs[i])
+          .low(1L).high(8L).fmSketch(1, 2, 3).build();
+      Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+    }
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenOnlySomeAvailable() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3", 
"part4");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(long.class).numNulls(1).numDVs(3)
+        .low(1L).high(3L).hll(1, 2, 3).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(long.class).numNulls(3).numDVs(1)
+        .low(7L).high(7L).hll(7).build();
+    ColumnStatisticsData data4 = new 
ColStatsBuilder<>(long.class).numNulls(2).numDVs(3)
+        .low(3L).high(5L).hll(3, 4, 5).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)),
+        createStatsWithInfo(data4, TABLE, COL, partitions.get(3)));
+
+    LongColumnStatsAggregator aggregator = new LongColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, false);
+
+    // hll in case of missing stats is left as null, only numDVs is updated
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(long.class).numNulls(8).numDVs(4)
+        .low(1L).high(9L).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void 
testAggregateMultiStatsOnlySomeAvailableButUnmergeableBitVector() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(long.class).numNulls(1).numDVs(3)
+        .low(1L).high(6L).fmSketch(1, 2, 6).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(long.class).numNulls(3).numDVs(1)
+        .low(7L).high(7L).hll(7).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    LongColumnStatsAggregator aggregator = new LongColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, false);
+    // hll in case of missing stats is left as null, only numDVs is updated
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(long.class).numNulls(6).numDVs(3)
+        .low(1L).high(7L).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, false);
+    // the use of the density function leads to a different estimation for 
numNDV
+    expectedStats = new ColStatsBuilder<>(long.class).numNulls(6).numDVs(4)
+        .low(1L).high(7L).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+}
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregatorTest.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregatorTest.java
new file mode 100644
index 00000000000..b27092090a9
--- /dev/null
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregatorTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.columnstats.ColStatsBuilder;
+import 
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.hadoop.hive.metastore.StatisticsTestUtils.createStatsWithInfo;
+
+@Category(MetastoreUnitTest.class)
+public class StringColumnStatsAggregatorTest {
+
+  private static final Table TABLE = new Table("dummy", "db", "hive", 0, 0,
+      0, null, null, Collections.emptyMap(), null, null,
+      TableType.MANAGED_TABLE.toString());
+  private static final FieldSchema COL = new FieldSchema("col", "string", "");
+
+  private static final String S_1 = "test";
+  private static final String S_2 = "try";
+  private static final String S_3 = "longer string";
+  private static final String S_4 = "even longer string";
+  private static final String S_5 = "some string";
+  private static final String S_6 = "some other string";
+  private static final String S_7 = "yet another string";
+
+  @Test
+  public void testAggregateSingleStat() throws MetaException {
+    List<String> partitions = Collections.singletonList("part1");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(String.class).numNulls(1).numDVs(2).avgColLen(8.5).maxColLen(13)
+        .hll(S_1, S_3).build();
+    List<ColStatsObjWithSourceInfo> statsList =
+        Collections.singletonList(createStatsWithInfo(data1, TABLE, COL, 
partitions.get(0)));
+
+    StringColumnStatsAggregator aggregator = new StringColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenAllAvailable() throws MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(String.class).numNulls(1).numDVs(3).avgColLen(20.0 / 
3).maxColLen(13)
+        .hll(S_1, S_2, S_3).build();
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(String.class).numNulls(2).numDVs(3).avgColLen(14).maxColLen(18)
+        .hll(S_3, S_4, S_5).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(String.class).numNulls(3).numDVs(2).avgColLen(17.5).maxColLen(18)
+        .hll(S_6, S_7).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    StringColumnStatsAggregator aggregator = new StringColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+
+    // the aggregation does not update hll, only numNDVs is, it keeps the 
first hll
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(String.class).numNulls(6).numDVs(7).avgColLen(17.5).maxColLen(18)
+        .hll(S_1, S_2, S_3).build();
+
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenUnmergeableBitVectors() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(String.class).numNulls(1).numDVs(3).avgColLen(20.0 / 
3).maxColLen(13)
+        .fmSketch(S_1, S_2, S_3).build();
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(String.class).numNulls(2).numDVs(3).avgColLen(14).maxColLen(18)
+        .hll(S_3, S_4, S_5).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(String.class).numNulls(3).numDVs(2).avgColLen(17.5).maxColLen(18)
+        .hll(S_6, S_7).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    StringColumnStatsAggregator aggregator = new StringColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    // the aggregation does not update the bitvector, only numDVs is, it keeps 
the first bitvector;
+    // numDVs is set to the maximum among all stats when non-mergeable 
bitvectors are detected
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(String.class).numNulls(6).numDVs(3).avgColLen(17.5).maxColLen(18)
+        .fmSketch(S_1, S_2, S_3).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    // both useDensityFunctionForNDVEstimation and ndvTuner are ignored by 
StringColumnStatsAggregator
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = false;
+    double[] tunerValues = new double[] { 0, 0.5, 0.75, 1 };
+    for (int i = 0; i < tunerValues.length; i++) {
+      aggregator.ndvTuner = tunerValues[i];
+      computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+      Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+    }
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenOnlySomeAvailable() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3", 
"part4");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(String.class).numNulls(1).numDVs(3).avgColLen(20.0 / 
3).maxColLen(13)
+        .hll(S_1, S_2, S_3).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(String.class).numNulls(3).numDVs(2).avgColLen(17.5).maxColLen(18)
+        .hll(S_6, S_7).build();
+    ColumnStatisticsData data4 = new 
ColStatsBuilder<>(String.class).numNulls(2).numDVs(3).avgColLen(14).maxColLen(18)
+        .hll(S_3, S_4, S_5).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)),
+        createStatsWithInfo(data4, TABLE, COL, partitions.get(3)));
+
+    StringColumnStatsAggregator aggregator = new StringColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, false);
+
+    // hll in case of missing stats is left as null, only numDVs is updated
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(String.class).numNulls(8).numDVs(6)
+        .avgColLen(24).maxColLen(24).build();
+
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void 
testAggregateMultiStatsOnlySomeAvailableButUnmergeableBitVector() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(String.class).numNulls(1).numDVs(3).avgColLen(20.0 / 
3).maxColLen(13)
+        .fmSketch(S_1, S_2, S_3).build();
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(String.class).numNulls(3).numDVs(2).avgColLen(17.5).maxColLen(18)
+        .hll(S_6, S_7).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    StringColumnStatsAggregator aggregator = new StringColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, false);
+    // hll in case of missing stats is left as null, only numDVs is updated
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(String.class).numNulls(6).numDVs(3)
+        .avgColLen(22.916666666666668).maxColLen(22).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    // both useDensityFunctionForNDVEstimation and ndvTuner are ignored by 
StringColumnStatsAggregator
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, false);
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+}
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/TimestampColumnStatsAggregatorTest.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/TimestampColumnStatsAggregatorTest.java
new file mode 100644
index 00000000000..e6217eb118b
--- /dev/null
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/aggr/TimestampColumnStatsAggregatorTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.columnstats.aggr;
+
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.Timestamp;
+import org.apache.hadoop.hive.metastore.columnstats.ColStatsBuilder;
+import 
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.hadoop.hive.metastore.StatisticsTestUtils.createStatsWithInfo;
+
+@Category(MetastoreUnitTest.class)
+public class TimestampColumnStatsAggregatorTest {
+
+  private static final Table TABLE = new Table("dummy", "db", "hive", 0, 0,
+      0, null, null, Collections.emptyMap(), null, null,
+      TableType.MANAGED_TABLE.toString());
+  private static final FieldSchema COL = new FieldSchema("col", "timestamp", 
"");
+
+  private static final Timestamp TS_1 = new Timestamp(1);
+  private static final Timestamp TS_2 = new Timestamp(2);
+  private static final Timestamp TS_3 = new Timestamp(3);
+  private static final Timestamp TS_4 = new Timestamp(4);
+  private static final Timestamp TS_5 = new Timestamp(5);
+  private static final Timestamp TS_6 = new Timestamp(6);
+  private static final Timestamp TS_7 = new Timestamp(7);
+  private static final Timestamp TS_8 = new Timestamp(8);
+  private static final Timestamp TS_9 = new Timestamp(9);
+
+  @Test
+  public void testAggregateSingleStat() throws MetaException {
+    List<String> partitions = Collections.singletonList("part1");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Timestamp.class).numNulls(1).numDVs(2).low(TS_1)
+        .high(TS_3).hll(TS_1.getSecondsSinceEpoch(), 
TS_3.getSecondsSinceEpoch()).build();
+    List<ColStatsObjWithSourceInfo> statsList =
+        Collections.singletonList(createStatsWithInfo(data1, TABLE, COL, 
partitions.get(0)));
+
+    TimestampColumnStatsAggregator aggregator = new 
TimestampColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateSingleStatWhenNullValues() throws MetaException {
+    List<String> partitions = Collections.singletonList("part1");
+
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Timestamp.class).numNulls(1).numDVs(2).build();
+    List<ColStatsObjWithSourceInfo> statsList =
+        Collections.singletonList(createStatsWithInfo(data1, TABLE, COL, 
partitions.get(0)));
+
+    TimestampColumnStatsAggregator aggregator = new 
TimestampColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = false;
+    aggregator.ndvTuner = 1;
+    // ndv tuner does not have any effect because min numDVs and max numDVs 
coincide (we have a single stats)
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    Assert.assertEquals(data1, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultipleStatsWhenSomeNullValues() throws 
MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2");
+
+    long[] values1 = { TS_1.getSecondsSinceEpoch(), 
TS_2.getSecondsSinceEpoch() };
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Timestamp.class).numNulls(1).numDVs(2)
+        .low(TS_1).high(TS_2).hll(values1).build();
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(Timestamp.class).numNulls(2).numDVs(3).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)));
+
+    TimestampColumnStatsAggregator aggregator = new 
TimestampColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(Timestamp.class).numNulls(3).numDVs(3)
+        .low(TS_1).high(TS_2).hll(values1).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    expectedStats = new 
ColStatsBuilder<>(Timestamp.class).numNulls(3).numDVs(4)
+        .low(TS_1).high(TS_2).hll(values1).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = false;
+    aggregator.ndvTuner = 1;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    expectedStats = new 
ColStatsBuilder<>(Timestamp.class).numNulls(3).numDVs(5)
+        .low(TS_1).high(TS_2).hll(values1).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenAllAvailable() throws MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    long[] values1 = { TS_1.getSecondsSinceEpoch(), 
TS_2.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch() };
+    ColumnStatisticsData data1 = new 
ColStatsBuilder<>(Timestamp.class).numNulls(1).numDVs(2)
+        .low(TS_1).high(TS_3).hll(values1).build();
+
+    long[] values2 = { TS_3.getSecondsSinceEpoch(), 
TS_4.getSecondsSinceEpoch(), TS_5.getSecondsSinceEpoch() };
+    ColumnStatisticsData data2 = new 
ColStatsBuilder<>(Timestamp.class).numNulls(2).numDVs(3)
+        .low(TS_3).high(TS_5).hll(values2).build();
+
+    long[] values3 = { TS_6.getSecondsSinceEpoch(), 
TS_7.getSecondsSinceEpoch() };
+    ColumnStatisticsData data3 = new 
ColStatsBuilder<>(Timestamp.class).numNulls(3).numDVs(2)
+        .low(TS_6).high(TS_7).hll(values3).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    TimestampColumnStatsAggregator aggregator = new 
TimestampColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, 
partitions, true);
+
+    // the aggregation does not update hll, only numDVs is, it keeps the first 
hll
+    // notice that numDVs is computed by using HLL, it can detect that 'TS_3' 
appears twice
+    ColumnStatisticsData expectedStats = new 
ColStatsBuilder<>(Timestamp.class).numNulls(6).numDVs(7)
+        .low(TS_1).high(TS_7).hll(values1).build();
+
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenUnmergeableBitVectors() throws MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    // part1 carries an FM sketch while part2/part3 carry HLLs, so the
+    // per-partition bit vectors cannot be merged into one
+    long[] seconds1 = { TS_1.getSecondsSinceEpoch(), TS_2.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch() };
+    ColumnStatisticsData data1 = new ColStatsBuilder<>(Timestamp.class).numNulls(1).numDVs(3)
+        .low(TS_1).high(TS_3).fmSketch(seconds1).build();
+
+    long[] seconds2 = { TS_3.getSecondsSinceEpoch(), TS_4.getSecondsSinceEpoch(), TS_5.getSecondsSinceEpoch() };
+    ColumnStatisticsData data2 = new ColStatsBuilder<>(Timestamp.class).numNulls(2).numDVs(3)
+        .low(TS_3).high(TS_5).hll(seconds2).build();
+
+    long[] seconds3 = { TS_1.getSecondsSinceEpoch(), TS_2.getSecondsSinceEpoch(), TS_6.getSecondsSinceEpoch(),
+        TS_8.getSecondsSinceEpoch() };
+    ColumnStatisticsData data3 = new ColStatsBuilder<>(Timestamp.class).numNulls(3).numDVs(4)
+        .low(TS_1).high(TS_8).hll(seconds3).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data2, TABLE, COL, partitions.get(1)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    TimestampColumnStatsAggregator aggregator = new TimestampColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    // the aggregation leaves the first bit vector in place and only recomputes numDVs;
+    // with non-mergeable bit vectors numDVs is the maximum numDVs across the input stats
+    ColumnStatisticsData expectedStats = new ColStatsBuilder<>(Timestamp.class).numNulls(6).numDVs(4)
+        .low(TS_1).high(TS_8).fmSketch(seconds1).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    // enabling the density function produces a different numDVs estimate (6 instead of 4)
+    expectedStats = new ColStatsBuilder<>(Timestamp.class).numNulls(6).numDVs(6)
+        .low(TS_1).high(TS_8).fmSketch(seconds1).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = false;
+    // ndvTuner sweeps numDVs from the per-partition maximum (tuner = 0, -> 4)
+    // up to the sum of the per-partition NDVs (tuner = 1, -> 3 + 3 + 4 = 10)
+    double[] tuners = new double[] { 0, 0.5, 0.75, 1 };
+    long[] tunedDVs = new long[] { 4, 7, 8, 10 };
+    for (int i = 0; i < tuners.length; i++) {
+      aggregator.ndvTuner = tuners[i];
+      computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+      expectedStats = new ColStatsBuilder<>(Timestamp.class).numNulls(6).numDVs(tunedDVs[i])
+          .low(TS_1).high(TS_8).fmSketch(seconds1).build();
+      Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+    }
+  }
+
+  @Test
+  public void testAggregateMultiStatsWhenOnlySomeAvailable() throws MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3", "part4");
+
+    // part2 deliberately has no stats, forcing the aggregator to extrapolate
+    long[] seconds1 = { TS_1.getSecondsSinceEpoch(), TS_2.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch() };
+    ColumnStatisticsData data1 = new ColStatsBuilder<>(Timestamp.class).numNulls(1).numDVs(3)
+        .low(TS_1).high(TS_3).hll(seconds1).build();
+
+    ColumnStatisticsData data3 = new ColStatsBuilder<>(Timestamp.class).numNulls(3).numDVs(1)
+        .low(TS_7).high(TS_7).hll(TS_7.getSecondsSinceEpoch()).build();
+
+    long[] seconds4 = { TS_3.getSecondsSinceEpoch(), TS_4.getSecondsSinceEpoch(), TS_5.getSecondsSinceEpoch() };
+    ColumnStatisticsData data4 = new ColStatsBuilder<>(Timestamp.class).numNulls(2).numDVs(3)
+        .low(TS_3).high(TS_5).hll(seconds4).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)),
+        createStatsWithInfo(data4, TABLE, COL, partitions.get(3)));
+
+    TimestampColumnStatsAggregator aggregator = new TimestampColumnStatsAggregator();
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, partitions, false);
+
+    // with missing partition stats the bit vector is left null and only numDVs is updated;
+    // numNulls (8 > observed 6) and high (TS_9 > observed TS_7) are extrapolated values
+    ColumnStatisticsData expectedStats = new ColStatsBuilder<>(Timestamp.class).numNulls(8).numDVs(4)
+        .low(TS_1).high(TS_9).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+
+  @Test
+  public void testAggregateMultiStatsOnlySomeAvailableButUnmergeableBitVector() throws MetaException {
+    List<String> partitions = Arrays.asList("part1", "part2", "part3");
+
+    // only part1 and part3 have stats; part2 is missing
+    long[] seconds1 = { TS_1.getSecondsSinceEpoch(), TS_2.getSecondsSinceEpoch(), TS_6.getSecondsSinceEpoch() };
+    ColumnStatisticsData data1 = new ColStatsBuilder<>(Timestamp.class).numNulls(1).numDVs(3)
+        .low(TS_1).high(TS_6).hll(seconds1).build();
+
+    ColumnStatisticsData data3 = new ColStatsBuilder<>(Timestamp.class).numNulls(3).numDVs(1)
+        .low(TS_7).high(TS_7).hll(TS_7.getSecondsSinceEpoch()).build();
+
+    List<ColStatsObjWithSourceInfo> statsList = Arrays.asList(
+        createStatsWithInfo(data1, TABLE, COL, partitions.get(0)),
+        createStatsWithInfo(data3, TABLE, COL, partitions.get(2)));
+
+    TimestampColumnStatsAggregator aggregator = new TimestampColumnStatsAggregator();
+
+    ColumnStatisticsObj computedStatsObj = aggregator.aggregate(statsList, partitions, false);
+    // with missing partition stats the bit vector stays null and only numDVs is updated
+    ColumnStatisticsData expectedStats = new ColStatsBuilder<>(Timestamp.class).numNulls(6).numDVs(3)
+        .low(TS_1).high(TS_7).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+
+    aggregator.useDensityFunctionForNDVEstimation = true;
+    // NOTE(review): this call passes 'true' as the last argument although only 2 of
+    // 3 partitions have stats, unlike the call above -- confirm this is intentional
+    computedStatsObj = aggregator.aggregate(statsList, partitions, true);
+    // the density function yields a different numDVs estimate (4 instead of 3)
+    expectedStats = new ColStatsBuilder<>(Timestamp.class).numNulls(6).numDVs(4)
+        .low(TS_1).high(TS_7).build();
+    Assert.assertEquals(expectedStats, computedStatsObj.getStatsData());
+  }
+}

Reply via email to