ramitg254 commented on code in PR #6089:
URL: https://github.com/apache/hive/pull/6089#discussion_r2709518413


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java:
##########
@@ -1957,307 +2010,224 @@ private List<ColumnStatisticsObj> 
aggrStatsUseJava(String catName, String dbName
         areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner);
   }
 
-  private List<ColumnStatisticsObj> aggrStatsUseDB(String catName, String 
dbName,
-      String tableName, List<String> partNames, List<String> colNames, String 
engine,
-      boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, 
double ndvTuner) throws MetaException {
+  private List<ColumnStatisticsObj> aggrStatsUseDB(String catName, String 
dbName, String tableName,
+                                                   List<String> partNames, 
List<String> colNames, String engine,
+                                                   boolean 
useDensityFunctionForNDVEstimation, double ndvTuner)
+          throws MetaException {
     // TODO: all the extrapolation logic should be moved out of this class,
     // only mechanical data retrieval should remain here.
-    String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
-        + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), 
min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
-        + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), 
max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
-        + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
-        + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), 
sum(\"NUM_FALSES\"), "
-        // The following data is used to compute a partitioned table's NDV 
based
-        // on partitions' NDV when useDensityFunctionForNDVEstimation = true. 
Global NDVs cannot be
-        // accurately derived from partition NDVs, because the domain of 
column value two partitions
-        // can overlap. If there is no overlap then global NDV is just the sum
-        // of partition NDVs (UpperBound). But if there is some overlay then
-        // global NDV can be anywhere between sum of partition NDVs (no 
overlap)
-        // and same as one of the partition NDV (domain of column value in all 
other
-        // partitions is subset of the domain value in one of the partition)
-        // (LowerBound).But under uniform distribution, we can roughly 
estimate the global
-        // NDV by leveraging the min/max values.
-        // And, we also guarantee that the estimation makes sense by comparing 
it to the
-        // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
-        // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
-        + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" 
as decimal)),"
-        + 
"avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
-        + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as 
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
-        + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + ""
-        + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-        + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
-        + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
-        + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? 
and " + TBLS + ".\"TBL_NAME\" = ? ";
-    String queryText = null;
-    long start = 0;
-    long end = 0;
+    String queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
+            + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), 
min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
+            + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), 
max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
+            + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
+            + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), 
sum(\"NUM_FALSES\"), "
+            // The following data is used to compute a partitioned table's NDV 
based
+            // on partitions' NDV when useDensityFunctionForNDVEstimation = 
true. Global NDVs cannot be
+            // accurately derived from partition NDVs, because the domain of 
column value two partitions
+            // can overlap. If there is no overlap then global NDV is just the 
sum
+            // of partition NDVs (UpperBound). But if there is some overlay 
then
+            // global NDV can be anywhere between sum of partition NDVs (no 
overlap)
+            // and same as one of the partition NDV (domain of column value in 
all other
+            // partitions is subset of the domain value in one of the 
partition)
+            // (LowerBound).But under uniform distribution, we can roughly 
estimate the global
+            // NDV by leveraging the min/max values.
+            // And, we also guarantee that the estimation makes sense by 
comparing it to the
+            // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
+            // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
+            + 
"sum((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as 
decimal)),"
+            + 
"sum((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+            + "sum((cast(\"BIG_DECIMAL_HIGH_VALUE\" as 
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
+            + "count(1),"
+            + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + ""
+            + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+            + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
+            + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
+            + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = 
? and " + TBLS + ".\"TBL_NAME\" = ? "
+            + " and \"COLUMN_NAME\" in (%1$s)" + " and " + PARTITIONS + 
".\"PART_NAME\" in (%2$s)"
+            + " and \"ENGINE\" = ? " + " group by \"COLUMN_NAME\", 
\"COLUMN_TYPE\"";
 
     boolean doTrace = LOG.isDebugEnabled();
-    ForwardQueryResult<?> fqr = null;
-    // Check if the status of all the columns of all the partitions exists
-    // Extrapolation is not needed.
-    if (areAllPartsFound) {
-      queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + 
makeParams(colNames.size()) + ")"
-          + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-          + " and \"ENGINE\" = ? "
-          + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
-      start = doTrace ? System.nanoTime() : 0;
-      try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-        Object qResult = executeWithArray(query.getInnerQuery(),
-            prepareParams(catName, dbName, tableName, partNames, colNames,
-                engine), queryText);
-        if (qResult == null) {
-          return Collections.emptyList();
-        }
-        end = doTrace ? System.nanoTime() : 0;
-        MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
-        List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
-        List<ColumnStatisticsObj> colStats =
-            new ArrayList<ColumnStatisticsObj>(list.size());
-        for (Object[] row : list) {
-          colStats.add(prepareCSObjWithAdjustedNDV(row, 0,
-              useDensityFunctionForNDVEstimation, ndvTuner));
-          Deadline.checkTimeout();
-        }
-        return colStats;
-      }
-    } else {
-      // Extrapolation is needed for some columns.
-      // In this case, at least a column status for a partition is missing.
-      // We need to extrapolate this partition based on the other partitions
-      List<ColumnStatisticsObj> colStats = new 
ArrayList<ColumnStatisticsObj>(colNames.size());
-      queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", 
count(\"PART_COL_STATS\".\"PART_ID\") "
-          + " from " + PART_COL_STATS
-          + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-          + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
-          + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
-          + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? 
and " + TBLS + ".\"TBL_NAME\" = ? "
-          + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + 
makeParams(colNames.size()) + ")"
-          + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-          + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-          + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\", " + 
PART_COL_STATS + ".\"COLUMN_TYPE\"";
-      start = doTrace ? System.nanoTime() : 0;
-      List<String> noExtraColumnNames = new ArrayList<String>();
-      Map<String, String[]> extraColumnNameTypeParts = new HashMap<String, 
String[]>();
-      try(QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-        Object qResult = executeWithArray(query.getInnerQuery(),
-            prepareParams(catName, dbName, tableName, partNames, colNames,
-                engine), queryText);
-        end = doTrace ? System.nanoTime() : 0;
-        MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
-        if (qResult == null) {
-          return Collections.emptyList();
-        }
 
-        List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
-        for (Object[] row : list) {
-          String colName = (String) row[0];
-          String colType = (String) row[1];
-          // Extrapolation is not needed for this column if
-          // count(\"PARTITION_NAME\")==partNames.size()
-          // Or, extrapolation is not possible for this column if
-          // count(\"PARTITION_NAME\")<2
-          Long count = MetastoreDirectSqlUtils.extractSqlLong(row[2]);
-          if (count == partNames.size() || count < 2) {
-            noExtraColumnNames.add(colName);
-          } else {
-            extraColumnNameTypeParts.put(colName, new String[] {colType, 
String.valueOf(count)});
-          }
-          Deadline.checkTimeout();
-        }
-      }
-      // Extrapolation is not needed for columns noExtraColumnNames
-      List<Object[]> list;
-      if (noExtraColumnNames.size() != 0) {
-        queryText = commonPrefix + " and \"COLUMN_NAME\" in ("
-            + makeParams(noExtraColumnNames.size()) + ")" + " and 
\"PARTITION_NAME\" in ("
-            + makeParams(partNames.size()) + ")"
-            + " and \"ENGINE\" = ? "
-            + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
-        start = doTrace ? System.nanoTime() : 0;
-
-        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-          Object qResult = executeWithArray(query.getInnerQuery(),
-              prepareParams(catName, dbName, tableName, partNames, 
noExtraColumnNames, engine), queryText);
-          if (qResult == null) {
-            return Collections.emptyList();
-          }
-          list = MetastoreDirectSqlUtils.ensureList(qResult);
-          for (Object[] row : list) {
-            colStats.add(prepareCSObjWithAdjustedNDV(row, 0,
-                useDensityFunctionForNDVEstimation, ndvTuner));
-            Deadline.checkTimeout();
-          }
-          end = doTrace ? System.nanoTime() : 0;
-          MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
+    List<ColumnStatisticsObj> colStats = new ArrayList<>(colNames.size());
+    List<Object[]> partialStatsRows = new ArrayList<>(colNames.size());
+    columnWiseStatsMerger(queryText, catName, dbName, tableName,
+            colNames, partNames, colStats, partialStatsRows,
+            engine, useDensityFunctionForNDVEstimation, ndvTuner, doTrace);
+
+    // Extrapolation is needed for partialStatsRows.
+    if (partialStatsRows.size() != 0) {
+      Map<String, Integer> indexMap = new HashMap<String, Integer>();
+      for (int index = 0; index < partNames.size(); index++) {
+        indexMap.put(partNames.get(index), index);
+      }
+
+      for (Object[] row : partialStatsRows) {
+        String colName = row[COLNAME.idx()].toString();
+        String colType = row[COLTYPE.idx()].toString();
+        BigDecimal countVal = new BigDecimal(row[COUNT_ROWS.idx()].toString());
+
+        // use linear extrapolation. more complicated one can be added in the
+        // future.
+        IExtrapolatePartStatus extrapolateMethod = new 
LinearExtrapolatePartStatus();
+        // fill in colstatus
+        Integer[] index;
+        boolean decimal = false;
+        if (colType.toLowerCase().startsWith("decimal")) {
+          index = IExtrapolatePartStatus.indexMaps.get("decimal");
+          decimal = true;
+        } else {
+          index = IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase());
         }
-      }
-      // Extrapolation is needed for extraColumnNames.
-      // give a sequence number for all the partitions
-      if (extraColumnNameTypeParts.size() != 0) {
-        Map<String, Integer> indexMap = new HashMap<String, Integer>();
-        for (int index = 0; index < partNames.size(); index++) {
-          indexMap.put(partNames.get(index), index);
+        // if the colType is not the known type, long, double, etc, then get
+        // all index.
+        if (index == null) {
+          index = IExtrapolatePartStatus.indexMaps.get("default");
         }
-        // get sum for all columns to reduce the number of queries
-        Map<String, Map<Integer, Object>> sumMap = new HashMap<String, 
Map<Integer, Object>>();
-        queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), 
sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")"
-            + " from " + PART_COL_STATS
-            + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-            + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
-            + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
-            + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = 
? and " + TBLS + ".\"TBL_NAME\" = ? "
-            + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + 
makeParams(extraColumnNameTypeParts.size()) + ")"
-            + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-            + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-            + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\"";
-        start = doTrace ? System.nanoTime() : 0;
-        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-          List<String> extraColumnNames = new ArrayList<String>();
-          extraColumnNames.addAll(extraColumnNameTypeParts.keySet());
-          Object qResult = executeWithArray(query.getInnerQuery(),
-              prepareParams(catName, dbName, tableName, partNames,
-                  extraColumnNames, engine), queryText);
-          if (qResult == null) {
-            return Collections.emptyList();
-          }
-          list = MetastoreDirectSqlUtils.ensureList(qResult);
-          // see the indexes for colstats in IExtrapolatePartStatus
-          Integer[] sumIndex = new Integer[] {6, 10, 11, 15};
-          for (Object[] row : list) {
-            Map<Integer, Object> indexToObject = new HashMap<Integer, 
Object>();
-            for (int ind = 1; ind < row.length; ind++) {
-              indexToObject.put(sumIndex[ind - 1], row[ind]);
+
+        for (int colStatIndex : index) {
+          String colStatName = 
IExtrapolatePartStatus.colStatNames[colStatIndex];
+          // if the aggregation type is sum, we do a scale-up
+          if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Sum) {
+            // +3 only for the case of SUM_NUM_DISTINCTS which is after count 
rows index
+            int rowIndex = (colStatIndex == 15) ? colStatIndex + 3 : 
colStatIndex + 2;
+            if (row[rowIndex] != null) {
+              Long val = MetastoreDirectSqlUtils.extractSqlLong(row[rowIndex]);
+              row[rowIndex] = val / countVal.longValue() * (partNames.size());
             }
-            // row[0] is the column name
-            sumMap.put((String) row[0], indexToObject);
-            Deadline.checkTimeout();
-          }
-          end = doTrace ? System.nanoTime() : 0;
-          MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
-        }
-        for (Map.Entry<String, String[]> entry : 
extraColumnNameTypeParts.entrySet()) {
-          Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length 
+ 2];
-          String colName = entry.getKey();
-          String colType = entry.getValue()[0];
-          Long sumVal = Long.parseLong(entry.getValue()[1]);
-          // fill in colname
-          row[0] = colName;
-          // fill in coltype
-          row[1] = colType;
-          // use linear extrapolation. more complicated one can be added in the
-          // future.
-          IExtrapolatePartStatus extrapolateMethod = new 
LinearExtrapolatePartStatus();
-          // fill in colstatus
-          Integer[] index = null;
-          boolean decimal = false;
-          if (colType.toLowerCase().startsWith("decimal")) {
-            index = IExtrapolatePartStatus.indexMaps.get("decimal");
-            decimal = true;
-          } else {
-            index = 
IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase());
-          }
-          // if the colType is not the known type, long, double, etc, then get
-          // all index.
-          if (index == null) {
-            index = IExtrapolatePartStatus.indexMaps.get("default");
-          }
-          for (int colStatIndex : index) {
-            String colStatName = 
IExtrapolatePartStatus.colStatNames[colStatIndex];
-            // if the aggregation type is sum, we do a scale-up
-            if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Sum) {
-              Object o = sumMap.get(colName).get(colStatIndex);
-              if (o == null) {
-                row[2 + colStatIndex] = null;
-              } else {
-                Long val = MetastoreDirectSqlUtils.extractSqlLong(o);
-                row[2 + colStatIndex] = val / sumVal * (partNames.size());
-              }
-            } else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Min
-                || IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Max) {
-              // if the aggregation type is min/max, we extrapolate from the
-              // left/right borders
-              if (!decimal) {
-                queryText = "select \"" + colStatName + "\",\"PART_NAME\" from 
" + PART_COL_STATS
-                    + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-                    + " inner join " + TBLS + " on " + PARTITIONS + 
".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
-                    + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + 
DBS + ".\"DB_ID\""
-                    + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-                    + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
-                    + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-                    + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-                    + " order by \"" + colStatName + "\"";
-              } else {
-                queryText = "select \"" + colStatName + "\",\"PART_NAME\" from 
" + PART_COL_STATS
+          } else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Min ||
+                  IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Max) {
+            // if the aggregation type is min/max, we extrapolate from the
+            // left/right borders
+            String orderByExpr = decimal ? "cast(\"" + colStatName + "\" as 
decimal)" : "\"" + colStatName + "\"";
+
+            queryText = "select \"" + colStatName + "\",\"PART_NAME\" from " + 
PART_COL_STATS
                     + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
                     + " inner join " + TBLS + " on " + PARTITIONS + 
".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
                     + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + 
DBS + ".\"DB_ID\""
                     + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-                    + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
-                    + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
+                    + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (%1$s)"
+                    + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)"
                     + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-                    + " order by cast(\"" + colStatName + "\" as decimal)";
-              }
-              start = doTrace ? System.nanoTime() : 0;
-              try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-                Object qResult = executeWithArray(query.getInnerQuery(),
-                    prepareParams(catName, dbName, tableName, partNames, 
Arrays.asList(colName), engine), queryText);
-                if (qResult == null) {
-                  return Collections.emptyList();
-                }
-                fqr = (ForwardQueryResult<?>) qResult;
-                Object[] min = (Object[]) (fqr.get(0));
-                Object[] max = (Object[]) (fqr.get(fqr.size() - 1));
-                end = doTrace ? System.nanoTime() : 0;
-                MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, 
end);
-                if (min[0] == null || max[0] == null) {
-                  row[2 + colStatIndex] = null;
-                } else {
-                  row[2 + colStatIndex] = extrapolateMethod
-                      .extrapolate(min, max, colStatIndex, indexMap);
+                    + " order by " + orderByExpr;
+
+            Batchable<String, Object[]> columnWisePartitionBatches =
+                    columnWisePartitionBatcher(queryText, catName, dbName, 
tableName, partNames, engine, doTrace);
+            try {
+              List<Object[]> list = Batchable.runBatched(batchSize, 
Collections.singletonList(colName), columnWisePartitionBatches);
+              Object[] min = list.getFirst();
+              Object[] max = list.getLast();
+              if (batchSize > 0) {
+                for (int i = Math.min(batchSize - 1, list.size() - 1); i < 
list.size(); i += batchSize) {
+                  Object[] posMax = list.get(i);
+                  if (new BigDecimal(max[0].toString()).compareTo(new 
BigDecimal(posMax[0].toString())) < 0) {
+                    max = posMax;
+                  }
+                  int j = i + 1;
+                  if (j < list.size()) {
+                    Object[] posMin = list.get(j);
+                    if (new BigDecimal(min[0].toString()).compareTo(new 
BigDecimal(posMin[0].toString())) > 0) {
+                      min = posMin;
+                    }
+                  }
                 }
               }
-            } else {
-              // if the aggregation type is avg, we use the average on the 
existing ones.
-              queryText = "select "
-                  + 
"avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as 
decimal)),"
-                  + 
"avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
-                  + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as 
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")"
-                  + " from " + PART_COL_STATS + ""
-                  + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-                  + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" 
= " + TBLS + ".\"TBL_ID\""
-                  + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + 
DBS + ".\"DB_ID\""
-                  + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-                  + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
-                  + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-                  + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-                  + " group by \"COLUMN_NAME\"";
-              start = doTrace ? System.nanoTime() : 0;
-              try(QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-                Object qResult = executeWithArray(query.getInnerQuery(),
-                    prepareParams(catName, dbName, tableName, partNames, 
Arrays.asList(colName), engine), queryText);
-                if (qResult == null) {
-                  return Collections.emptyList();
-                }
-                fqr = (ForwardQueryResult<?>) qResult;
-                Object[] avg = (Object[]) (fqr.get(0));
-                // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE",
-                // "AVG_DECIMAL"
-                row[2 + colStatIndex] = avg[colStatIndex - 12];
-                end = doTrace ? System.nanoTime() : 0;
-                MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, 
end);
+              if (min[0] == null || max[0] == null) {
+                row[2 + colStatIndex] = null;
+              } else {
+                row[2 + colStatIndex] = extrapolateMethod.extrapolate(min, 
max, colStatIndex, indexMap);
               }
+            } finally {
+              columnWisePartitionBatches.closeAllQueries();
             }
           }
-          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, 
useDensityFunctionForNDVEstimation, ndvTuner));
-          Deadline.checkTimeout();
         }
+        colStats.add(prepareCSObjWithAdjustedNDV
+                (row, useDensityFunctionForNDVEstimation, ndvTuner));
+        Deadline.checkTimeout();
       }
-      return colStats;
     }
+    return colStats;
   }
 
-  private ColumnStatisticsObj prepareCSObj (Object[] row, int i) throws 
MetaException {
+  private void columnWiseStatsMerger(
+          final String queryText, final String catName, final String dbName,
+          final String tableName, final List<String> colNames, final 
List<String> partNames,
+          final List<ColumnStatisticsObj> colStats, final List<Object[]> 
partialStatsRows, final String engine,
+          final boolean useDensityFunctionForNDVEstimation, final double 
ndvTuner,
+          final boolean doTrace
+  ) throws MetaException {
+    Batchable<String, Object[]> columnWisePartitionBatches =
+            columnWisePartitionBatcher(queryText, catName, dbName, tableName, 
partNames, engine, doTrace);
+    try {
+      List<Object[]> unmergedColStatslist = Batchable.runBatched(batchSize, 
colNames, columnWisePartitionBatches);
+      Map<String, Object[]> mergedColStatsMap = new HashMap<>();
+      for (Object[] unmergedRow : unmergedColStatslist) {
+        String colName = (String) unmergedRow[0];
+        Object[] mergedRow = mergedColStatsMap.getOrDefault(colName, new 
Object[21]);
+        mergeBackendDBStats(mergedRow, unmergedRow);
+        mergedColStatsMap.put(colName, mergedRow);
+      }
+
+      for (Map.Entry<String, Object[]> entry : mergedColStatsMap.entrySet()) {
+        BigDecimal partCount = new 
BigDecimal(entry.getValue()[COUNT_ROWS.idx()].toString());
+        if (partCount.equals(new BigDecimal(partNames.size()))) {

Review Comment:
   done, corrected it, somehow got left out while simplification



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to