kasakrisz commented on code in PR #6089:
URL: https://github.com/apache/hive/pull/6089#discussion_r2588754346


##########
itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestTezBatchedStatsCliDriver.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.cli;
+
+import org.apache.hadoop.hive.cli.control.CliAdapter;
+import org.apache.hadoop.hive.cli.control.CliConfigs;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TestTezBatchedStatsCliDriver {

Review Comment:
   IMHO there is no need to add a new driver for testing the 
`ObjectStore.get_aggr_stats_for` method. Doing so would imply that we need new 
drivers to cover all combinations of `hive.stats.fetch.bitvector`, 
`hive.stats.fetch.kll`, `hive.metastore.direct.sql.batch.size`, etc. The number 
of drivers would explode if we followed this pattern.
   
   
   Please take a look at an existing test case. It is not a q-test–based one, 
but a good integration test that focuses only on `ObjectStore`.
   
https://github.com/apache/hive/blob/ca105f8124072d19d88a83b2ced613d326c9a26b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java#L1025-L1047
   (The test method name `testAggrStatsUseDB` is misleading, though, because 
the test actually calls the `ObjectStore.get_aggr_stats_for` method, which 
eventually calls `MetaStoreDirectSql.testAggrStatsUseDB`.)
   
   Configurations can be easily changed in these kinds of tests via the 
MetastoreConf object. We also have better ways to write assertions. These tests 
run faster than q-tests.
   
   I temporarily added:
   ```
   MetastoreConf.setLongVar(conf2, ConfVars.DIRECT_SQL_PARTITION_BATCH_SIZE, 2);
   ```
   and I was able to reproduce the bug described in 
[HIVE-29203](https://issues.apache.org/jira/browse/HIVE-29203). Both with 
aggregating in the backend db (`aggrStatsUseDB`) and HMS side 
(`aggrStatsUseJava`).
   



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java:
##########
@@ -2174,86 +2270,132 @@ private List<ColumnStatisticsObj> 
aggrStatsUseDB(String catName, String dbName,
                 || IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Max) {
               // if the aggregation type is min/max, we extrapolate from the
               // left/right borders
-              if (!decimal) {
-                queryText = "select \"" + colStatName + "\",\"PART_NAME\" from 
" + PART_COL_STATS
-                    + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-                    + " inner join " + TBLS + " on " + PARTITIONS + 
".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
-                    + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + 
DBS + ".\"DB_ID\""
-                    + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-                    + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
-                    + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-                    + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-                    + " order by \"" + colStatName + "\"";
-              } else {
-                queryText = "select \"" + colStatName + "\",\"PART_NAME\" from 
" + PART_COL_STATS
-                    + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-                    + " inner join " + TBLS + " on " + PARTITIONS + 
".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
-                    + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + 
DBS + ".\"DB_ID\""
-                    + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-                    + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
-                    + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-                    + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-                    + " order by cast(\"" + colStatName + "\" as decimal)";
-              }
-              start = doTrace ? System.nanoTime() : 0;
-              try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-                Object qResult = executeWithArray(query.getInnerQuery(),
-                    prepareParams(catName, dbName, tableName, partNames, 
Arrays.asList(colName), engine), queryText);
-                if (qResult == null) {
-                  return Collections.emptyList();
-                }
-                fqr = (ForwardQueryResult<?>) qResult;
-                Object[] min = (Object[]) (fqr.get(0));
-                Object[] max = (Object[]) (fqr.get(fqr.size() - 1));
-                end = doTrace ? System.nanoTime() : 0;
-                MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, 
end);
-                if (min[0] == null || max[0] == null) {
-                  row[2 + colStatIndex] = null;
-                } else {
-                  row[2 + colStatIndex] = extrapolateMethod
-                      .extrapolate(min, max, colStatIndex, indexMap);
-                }
-              }
-            } else {
-              // if the aggregation type is avg, we use the average on the 
existing ones.
-              queryText = "select "
-                  + 
"avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as 
decimal)),"
-                  + 
"avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
-                  + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as 
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")"
-                  + " from " + PART_COL_STATS + ""
+              String orderByExpr = decimal ? "cast(\"" + colStatName + "\" as 
decimal)" : "\"" + colStatName + "\"";
+
+              queryText = "select \"" + colStatName + "\",\"PART_NAME\" from " 
+ PART_COL_STATS
                   + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
                   + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" 
= " + TBLS + ".\"TBL_ID\""
                   + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + 
DBS + ".\"DB_ID\""
                   + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-                  + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
-                  + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
+                  + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (%1$s)"
+                  + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)"
                   + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-                  + " group by \"COLUMN_NAME\"";
-              start = doTrace ? System.nanoTime() : 0;
-              try(QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-                Object qResult = executeWithArray(query.getInnerQuery(),
-                    prepareParams(catName, dbName, tableName, partNames, 
Arrays.asList(colName), engine), queryText);
-                if (qResult == null) {
-                  return Collections.emptyList();
+                  + " order by " + orderByExpr;
+
+              columnWisePartitionBatches =
+                      columnWisePartitionBatcher(queryText, catName, dbName, 
tableName, partNames, engine, doTrace);
+              try {
+                list = Batchable.runBatched(batchSize, Arrays.asList(colName), 
columnWisePartitionBatches);
+                Object[] min = list.getFirst();
+                Object[] max = list.getLast();
+                for (int i = Math.min(batchSize - 1, list.size() - 1); i < 
list.size(); i += batchSize) {
+                  Object[] posMax = list.get(i);
+                  if (new BigDecimal(max[0].toString()).compareTo(new 
BigDecimal(posMax[0].toString())) < 0) {
+                    max = posMax;
+                  }
+                  int j = i + 1;
+                  if (j < list.size()) {
+                    Object[] posMin = list.get(j);
+                    if (new BigDecimal(min[0].toString()).compareTo(new 
BigDecimal(posMin[0].toString())) > 0) {
+                      min = posMin;
+                    }
+                  }
+                }
+                if (min[0] == null || max[0] == null) {
+                  row[2 + colStatIndex] = null;
+                } else {
+                  row[2 + colStatIndex] = extrapolateMethod.extrapolate(min, 
max, colStatIndex, indexMap);
                 }
-                fqr = (ForwardQueryResult<?>) qResult;
-                Object[] avg = (Object[]) (fqr.get(0));
-                // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE",
-                // "AVG_DECIMAL"
-                row[2 + colStatIndex] = avg[colStatIndex - 12];
-                end = doTrace ? System.nanoTime() : 0;
-                MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, 
end);
+              } finally {
+                columnWisePartitionBatches.closeAllQueries();
               }
+            } else {
+              // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE",
+              // "AVG_DECIMAL"
+              row[2 + colStatIndex] = avg[colStatIndex - 12];
             }
           }
-          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, 
useDensityFunctionForNDVEstimation, ndvTuner));
+          colStats.add(columnStatisticsObjWithAdjustedNDV
+                  (Collections.singletonList(row), 
useDensityFunctionForNDVEstimation, ndvTuner));
           Deadline.checkTimeout();
         }
       }
       return colStats;
     }
   }
 
+  private ColumnStatisticsObj columnStatisticsObjWithAdjustedNDV(
+          List<Object[]> columnBatchesOutput,
+          boolean useDensityFunctionForNDVEstimation, double ndvTuner)
+          throws MetaException {
+    if (columnBatchesOutput.isEmpty()) {
+      return null;
+    }
+    ColumnStatisticsData data = new ColumnStatisticsData();
+    Object[] row = columnBatchesOutput.getFirst();
+    String colName = (String) row[COLNAME];
+    String colType = (String) row[COLTYPE];
+    ColumnStatisticsObj cso = new ColumnStatisticsObj(colName, colType, data);
+    Object llow = row[LLOW];
+    Object lhigh = row[LHIGH];
+    Object dlow = row[DLOW];
+    Object dhigh = row[DHIGH];
+    Object declow = row[DECLOW];
+    Object dechigh = row[DECHIGH];
+    Object nulls = row[NULLS];
+    Object dist = row[DIST];
+    Object avglen = row[AVGLEN];
+    Object maxlen = row[MAXLEN];
+    Object trues = row[TRUES];
+    Object falses = row[FALSES];
+    Object avgLong = row[AVGLONG];
+    Object avgDouble = row[AVGDOUBLE];
+    Object avgDecimal = row[AVGDECIMAL];
+    Object sumDist = row[SUMDIST];
+    if (row.length == 18) {
+      StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data, 
llow, lhigh, dlow, dhigh, declow, dechigh,
+              nulls, dist, avglen, maxlen, trues, falses, avgLong, avgDouble, 
avgDecimal, sumDist,
+              useDensityFunctionForNDVEstimation, ndvTuner);
+      return cso;
+    }

Review Comment:
   Why is this check needed? The schema of the query loading the stats is fix. 
The column list is defined in the query text.
   Also when somehow the number of columns is 18 the aggregation is skipped and 
the `ColumnStatisticsData` objects is built by only the first row.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java:
##########
@@ -2098,29 +2121,39 @@ private List<ColumnStatisticsObj> aggrStatsUseDB(String 
catName, String dbName,
         }
         // get sum for all columns to reduce the number of queries
         Map<String, Map<Integer, Object>> sumMap = new HashMap<String, 
Map<Integer, Object>>();
-        queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), 
sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")"
-            + " from " + PART_COL_STATS
-            + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-            + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
-            + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
-            + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = 
? and " + TBLS + ".\"TBL_NAME\" = ? "
-            + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + 
makeParams(extraColumnNameTypeParts.size()) + ")"
-            + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-            + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-            + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\"";
-        start = doTrace ? System.nanoTime() : 0;
-        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
+        queryText =
+            "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), sum(\"NUM_TRUES\"), 
sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")"
+                + " from " + PART_COL_STATS + " inner join " + PARTITIONS + " 
on " + PART_COL_STATS + ".\"PART_ID\" = "
+                + PARTITIONS + ".\"PART_ID\"" + " inner join " + TBLS + " on " 
+ PARTITIONS + ".\"TBL_ID\" = " + TBLS
+                + ".\"TBL_ID\"" + " inner join " + DBS + " on " + TBLS + 
".\"DB_ID\" = " + DBS + ".\"DB_ID\""
+                + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
+                + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + 
makeParams(extraColumnNameTypeParts.size()) + ")"
+                + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")" + " and "
+                + PART_COL_STATS + ".\"ENGINE\" = ? " + " group by " + 
PART_COL_STATS + ".\"COLUMN_NAME\"";
+
+        b = jobsBatching(queryText, catName, dbName, tableName, partNames, 
engine, doTrace);
+        try {
           List<String> extraColumnNames = new ArrayList<String>();
           extraColumnNames.addAll(extraColumnNameTypeParts.keySet());
-          Object qResult = executeWithArray(query.getInnerQuery(),
-              prepareParams(catName, dbName, tableName, partNames,
-                  extraColumnNames, engine), queryText);
-          if (qResult == null) {
-            return Collections.emptyList();
+          List<Object[]> unmergedList = Batchable.runBatched(batchSize, 
extraColumnNames, b);
+          Map<String, List<Object[]>> colSubList = 
columnWiseSubList(unmergedList);
+          List<Object[]> mergedList = new ArrayList<>();
+          for (Map.Entry<String, List<Object[]>> entry : 
colSubList.entrySet()) {

Review Comment:
   It seem the data is merged a few lines later:
   ```
   for (Object[] row : subList) {
                 mergedRow[1] = MetastoreDirectSqlUtils.sum(mergedRow[1], 
row[1]);
                 mergedRow[2] = MetastoreDirectSqlUtils.sum(mergedRow[2], 
row[2]);
                 mergedRow[3] = MetastoreDirectSqlUtils.sum(mergedRow[3], 
row[3]);
                 mergedRow[4] = MetastoreDirectSqlUtils.sum(mergedRow[4], 
row[4]);
               }
   ```
   
   Without transforming the `unmergedList` to `colSubList` the merge could be 
performed directly on the elements of  `unmergedList`: similar to above logic 
but instead of storing the partial results of the aggregation in one 
`mergeRow`, a map of `mergeRows` are need. One per column name.
   



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java:
##########
@@ -1977,116 +2020,96 @@ private List<ColumnStatisticsObj> 
aggrStatsUseDB(String catName, String dbName,
         // And, we also guarantee that the estimation makes sense by comparing 
it to the
         // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
         // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
-        + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" 
as decimal)),"
-        + 
"avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
-        + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as 
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
-        + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + ""
-        + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-        + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
-        + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
-        + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? 
and " + TBLS + ".\"TBL_NAME\" = ? ";
+        + "sum((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" 
as decimal)),"
+        + 
"count((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as 
decimal)),"
+        + 
"sum((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+        + 
"count((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+        + "sum((cast(\"BIG_DECIMAL_HIGH_VALUE\" as 
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
+        + "count((cast(\"BIG_DECIMAL_HIGH_VALUE\" as 
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
+        + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + "" + " inner 
join " + PARTITIONS + " on "
+        + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" + " 
inner join " + TBLS + " on " + PARTITIONS
+        + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" + " inner join " + DBS + " 
on " + TBLS + ".\"DB_ID\" = " + DBS
+        + ".\"DB_ID\"" + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS
+        + ".\"TBL_NAME\" = ? ";
     String queryText = null;
-    long start = 0;
-    long end = 0;
 
     boolean doTrace = LOG.isDebugEnabled();
     ForwardQueryResult<?> fqr = null;
     // Check if the status of all the columns of all the partitions exists
     // Extrapolation is not needed.
     if (areAllPartsFound) {
-      queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + 
makeParams(colNames.size()) + ")"
-          + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-          + " and \"ENGINE\" = ? "
-          + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
-      start = doTrace ? System.nanoTime() : 0;
-      try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-        Object qResult = executeWithArray(query.getInnerQuery(),
-            prepareParams(catName, dbName, tableName, partNames, colNames,
-                engine), queryText);
-        if (qResult == null) {
-          return Collections.emptyList();
-        }
-        end = doTrace ? System.nanoTime() : 0;
-        MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
-        List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
-        List<ColumnStatisticsObj> colStats =
-            new ArrayList<ColumnStatisticsObj>(list.size());
-        for (Object[] row : list) {
-          colStats.add(prepareCSObjWithAdjustedNDV(row, 0,
-              useDensityFunctionForNDVEstimation, ndvTuner));
+      queryText = commonPrefix + " and \"COLUMN_NAME\" in (%1$s)" + " and " + 
PARTITIONS + ".\"PART_NAME\" in (%2$s)"
+          + " and \"ENGINE\" = ? " + " group by \"COLUMN_NAME\", 
\"COLUMN_TYPE\"";
+      Batchable<String, Object[]> b = jobsBatching(queryText, catName, dbName, 
tableName, partNames, engine, doTrace);
+      List<ColumnStatisticsObj> colStats = new ArrayList<>(colNames.size());
+      try {
+        List<Object[]> list = Batchable.runBatched(batchSize, colNames, b);
+        Map<String, List<Object[]>> colSubList = columnWiseSubList(list);
+        for (Map.Entry<String, List<Object[]>> entry : colSubList.entrySet()) {

Review Comment:
   In both cases, whether `areAllPartsFound` is true or false, the same code is 
executed.
   ```
             list = Batchable.runBatched(batchSize, noExtraColumnNames, 
columnWisePartitionBatches);
             Map<String, List<Object[]>> colSubList = columnWiseSubList(list);
             for (Map.Entry<String, List<Object[]>> entry : 
colSubList.entrySet()) {
               colStats.add(
                       columnStatisticsObjWithAdjustedNDV(entry.getValue(), 
useDensityFunctionForNDVEstimation, ndvTuner));
               Deadline.checkTimeout();
             }
   
   ```
   This can be extracted to a method. 
   
   Maybe I'm missing something, but I don't see why the records can't be 
aggregated while iterating through them.
   The map value could hold the partially aggregated data instead of an 
`ArrayList`, and the method `columnStatisticsObjWithAdjustedNDV` could merge 
the partially aggregated data with each record.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java:
##########
@@ -1954,9 +2007,20 @@ private List<ColumnStatisticsObj> 
aggrStatsUseJava(String catName, String dbName
         areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner);
   }
 
-  private List<ColumnStatisticsObj> aggrStatsUseDB(String catName, String 
dbName,
-      String tableName, List<String> partNames, List<String> colNames, String 
engine,
-      boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, 
double ndvTuner) throws MetaException {
+  private Map<String, List<Object[]>> columnWiseSubList(List<Object[]> list) {
+    Map<String, List<Object[]>> colSubList = new HashMap<>();
+    for (Object[] row : list) {
+      String colName = (String) row[0];
+      colSubList.putIfAbsent(colName, new ArrayList<>());

Review Comment:
   Instead of adding the elements to an `ArrayList` could you please aggregate 
the elements and store the aggregated data in the map.
   



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java:
##########
@@ -2174,86 +2270,132 @@ private List<ColumnStatisticsObj> 
aggrStatsUseDB(String catName, String dbName,
                 || IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Max) {
               // if the aggregation type is min/max, we extrapolate from the
               // left/right borders
-              if (!decimal) {
-                queryText = "select \"" + colStatName + "\",\"PART_NAME\" from 
" + PART_COL_STATS
-                    + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-                    + " inner join " + TBLS + " on " + PARTITIONS + 
".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
-                    + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + 
DBS + ".\"DB_ID\""
-                    + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-                    + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
-                    + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-                    + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-                    + " order by \"" + colStatName + "\"";
-              } else {
-                queryText = "select \"" + colStatName + "\",\"PART_NAME\" from 
" + PART_COL_STATS
-                    + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-                    + " inner join " + TBLS + " on " + PARTITIONS + 
".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
-                    + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + 
DBS + ".\"DB_ID\""
-                    + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-                    + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
-                    + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-                    + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-                    + " order by cast(\"" + colStatName + "\" as decimal)";
-              }
-              start = doTrace ? System.nanoTime() : 0;
-              try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-                Object qResult = executeWithArray(query.getInnerQuery(),
-                    prepareParams(catName, dbName, tableName, partNames, 
Arrays.asList(colName), engine), queryText);
-                if (qResult == null) {
-                  return Collections.emptyList();
-                }
-                fqr = (ForwardQueryResult<?>) qResult;
-                Object[] min = (Object[]) (fqr.get(0));
-                Object[] max = (Object[]) (fqr.get(fqr.size() - 1));
-                end = doTrace ? System.nanoTime() : 0;
-                MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, 
end);
-                if (min[0] == null || max[0] == null) {
-                  row[2 + colStatIndex] = null;
-                } else {
-                  row[2 + colStatIndex] = extrapolateMethod
-                      .extrapolate(min, max, colStatIndex, indexMap);
-                }
-              }
-            } else {
-              // if the aggregation type is avg, we use the average on the 
existing ones.
-              queryText = "select "
-                  + 
"avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as 
decimal)),"
-                  + 
"avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
-                  + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as 
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")"
-                  + " from " + PART_COL_STATS + ""
+              String orderByExpr = decimal ? "cast(\"" + colStatName + "\" as 
decimal)" : "\"" + colStatName + "\"";
+
+              queryText = "select \"" + colStatName + "\",\"PART_NAME\" from " 
+ PART_COL_STATS
                   + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
                   + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" 
= " + TBLS + ".\"TBL_ID\""
                   + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + 
DBS + ".\"DB_ID\""
                   + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-                  + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
-                  + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
+                  + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (%1$s)"
+                  + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)"
                   + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-                  + " group by \"COLUMN_NAME\"";
-              start = doTrace ? System.nanoTime() : 0;
-              try(QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-                Object qResult = executeWithArray(query.getInnerQuery(),
-                    prepareParams(catName, dbName, tableName, partNames, 
Arrays.asList(colName), engine), queryText);
-                if (qResult == null) {
-                  return Collections.emptyList();
+                  + " order by " + orderByExpr;
+
+              columnWisePartitionBatches =
+                      columnWisePartitionBatcher(queryText, catName, dbName, 
tableName, partNames, engine, doTrace);
+              try {
+                list = Batchable.runBatched(batchSize, Arrays.asList(colName), 
columnWisePartitionBatches);
+                Object[] min = list.getFirst();
+                Object[] max = list.getLast();
+                for (int i = Math.min(batchSize - 1, list.size() - 1); i < 
list.size(); i += batchSize) {
+                  Object[] posMax = list.get(i);
+                  if (new BigDecimal(max[0].toString()).compareTo(new 
BigDecimal(posMax[0].toString())) < 0) {
+                    max = posMax;
+                  }
+                  int j = i + 1;
+                  if (j < list.size()) {
+                    Object[] posMin = list.get(j);
+                    if (new BigDecimal(min[0].toString()).compareTo(new 
BigDecimal(posMin[0].toString())) > 0) {
+                      min = posMin;
+                    }
+                  }
+                }
+                if (min[0] == null || max[0] == null) {
+                  row[2 + colStatIndex] = null;
+                } else {
+                  row[2 + colStatIndex] = extrapolateMethod.extrapolate(min, 
max, colStatIndex, indexMap);
                 }
-                fqr = (ForwardQueryResult<?>) qResult;
-                Object[] avg = (Object[]) (fqr.get(0));
-                // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE",
-                // "AVG_DECIMAL"
-                row[2 + colStatIndex] = avg[colStatIndex - 12];
-                end = doTrace ? System.nanoTime() : 0;
-                MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, 
end);
+              } finally {
+                columnWisePartitionBatches.closeAllQueries();
               }
+            } else {
+              // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE",
+              // "AVG_DECIMAL"
+              row[2 + colStatIndex] = avg[colStatIndex - 12];
             }
           }
-          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, 
useDensityFunctionForNDVEstimation, ndvTuner));
+          colStats.add(columnStatisticsObjWithAdjustedNDV
+                  (Collections.singletonList(row), 
useDensityFunctionForNDVEstimation, ndvTuner));
           Deadline.checkTimeout();
         }
       }
       return colStats;
     }
   }
 
+  private ColumnStatisticsObj columnStatisticsObjWithAdjustedNDV(
+          List<Object[]> columnBatchesOutput,
+          boolean useDensityFunctionForNDVEstimation, double ndvTuner)
+          throws MetaException {
+    if (columnBatchesOutput.isEmpty()) {
+      return null;
+    }
+    ColumnStatisticsData data = new ColumnStatisticsData();
+    Object[] row = columnBatchesOutput.getFirst();
+    String colName = (String) row[COLNAME];
+    String colType = (String) row[COLTYPE];
+    ColumnStatisticsObj cso = new ColumnStatisticsObj(colName, colType, data);
+    Object llow = row[LLOW];
+    Object lhigh = row[LHIGH];
+    Object dlow = row[DLOW];
+    Object dhigh = row[DHIGH];
+    Object declow = row[DECLOW];
+    Object dechigh = row[DECHIGH];
+    Object nulls = row[NULLS];
+    Object dist = row[DIST];
+    Object avglen = row[AVGLEN];
+    Object maxlen = row[MAXLEN];
+    Object trues = row[TRUES];
+    Object falses = row[FALSES];
+    Object avgLong = row[AVGLONG];
+    Object avgDouble = row[AVGDOUBLE];
+    Object avgDecimal = row[AVGDECIMAL];
+    Object sumDist = row[SUMDIST];
+    if (row.length == 18) {
+      StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data, 
llow, lhigh, dlow, dhigh, declow, dechigh,
+              nulls, dist, avglen, maxlen, trues, falses, avgLong, avgDouble, 
avgDecimal, sumDist,
+              useDensityFunctionForNDVEstimation, ndvTuner);
+      return cso;
+    }
+    Object sumLong = row[AVGLONG];
+    Object countLong = row[AVGLONG + 1];
+    Object sumDouble = row[AVGDOUBLE + 1];
+    Object countDouble = row[AVGDOUBLE + 2];
+    Object sumDecimal = row[AVGDECIMAL + 2];
+    Object countDecimal = row[AVGDECIMAL + 3];

Review Comment:
   Why are the constants used for indexing called `AVG...`? Aren't these `sums` 
and `counts`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to