ayushtkn commented on code in PR #4131:
URL: https://github.com/apache/hive/pull/4131#discussion_r1161623163


##########
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java:
##########
@@ -2205,9 +2205,8 @@ public static enum ConfVars {
         "padding tolerance config (hive.exec.orc.block.padding.tolerance)."),
     HIVE_ORC_CODEC_POOL("hive.use.orc.codec.pool", false,
         "Whether to use codec pool in ORC. Disable if there are bugs with 
codec reuse."),
-    HIVE_USE_STATS_FROM("hive.use.stats.from","iceberg","Use stats from 
iceberg table snapshot for query " +
-        "planning. This has three values metastore, puffin and iceberg"),
-
+    HIVE_ICEBERG_STATS_SOURCE("hive.iceberg.stats.source","iceberg","Use stats 
from iceberg table snapshot for query " +
+        "planning. This has three values metastore and iceberg"),

Review Comment:
   > This has three values metastore and iceberg
   
   What is the third value?



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table 
getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     return table;
   }
 
+
+  @Override
+  public boolean canSetColStatistics() {
+    return getStatsSource().equals(ICEBERG);
+  }
+
+  @Override
+  public boolean 
canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());
+    if (table.currentSnapshot() != null) {
+      Path statsPath = getStatsPath(table);
+      if (getStatsSource().equals(ICEBERG)) {

Review Comment:
   Can use ```canSetColStatistics()``` here instead of repeating the check.



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table 
getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     return table;
   }
 
+
+  @Override
+  public boolean canSetColStatistics() {
+    return getStatsSource().equals(ICEBERG);
+  }
+
+  @Override
+  public boolean 
canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());
+    if (table.currentSnapshot() != null) {
+      Path statsPath = getStatsPath(table);
+      if (getStatsSource().equals(ICEBERG)) {
+        try (FileSystem fs = statsPath.getFileSystem(conf)) {
+          if (fs.exists(statsPath)) {
+            return true;
+          }
+        } catch (IOException e) {
+          LOG.warn(e.getMessage());
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public List<ColumnStatisticsObj> 
getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());
+    String statsPath = getStatsPath(table).toString();
+    LOG.info("Using stats from puffin file at:" + statsPath);

Review Comment:
   Logger format: 
   ```
       LOG.info("Using stats from puffin file at: {}", statsPath);
   ```



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table 
getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     return table;
   }
 
+
+  @Override
+  public boolean canSetColStatistics() {
+    return getStatsSource().equals(ICEBERG);
+  }
+
+  @Override
+  public boolean 
canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());
+    if (table.currentSnapshot() != null) {
+      Path statsPath = getStatsPath(table);
+      if (getStatsSource().equals(ICEBERG)) {
+        try (FileSystem fs = statsPath.getFileSystem(conf)) {
+          if (fs.exists(statsPath)) {
+            return true;
+          }
+        } catch (IOException e) {
+          LOG.warn(e.getMessage());
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public List<ColumnStatisticsObj> 
getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());
+    String statsPath = getStatsPath(table).toString();
+    LOG.info("Using stats from puffin file at:" + statsPath);
+    try (PuffinReader reader = 
Puffin.read(table.io().newInputFile(statsPath)).build()) {
+      List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
+      Map<BlobMetadata, List<ColumnStatistics>> collect =
+          
Streams.stream(reader.readAll(blobMetadata)).collect(Collectors.toMap(Pair::first,
+              blobMetadataByteBufferPair -> SerializationUtils.deserialize(
+                  
ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
+      return collect.get(blobMetadata.get(0)).get(0).getStatsObj();
+    } catch (IOException e) {
+      LOG.error(String.valueOf(e));
+    }
+    return null;
+  }
+
+
+  @Override
+  public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table 
table,
+      List<ColumnStatistics> colStats) {
+    TableDesc tableDesc = Utilities.getTableDesc(table);
+    Table tbl = Catalogs.loadTable(conf, tableDesc.getProperties());
+    String snapshotId = tbl.name() + tbl.currentSnapshot().snapshotId();
+    byte[] serializeColStats = SerializationUtils.serialize((Serializable) 
colStats);
+
+    try (PuffinWriter writer = 
Puffin.write(tbl.io().newOutputFile(getStatsPath(tbl).toString()))
+        .createdBy("Hive").build()) {
+      writer.add(
+          new Blob(
+              tbl.name() + "-" + snapshotId,
+              ImmutableList.of(1),
+              tbl.currentSnapshot().snapshotId(),
+              tbl.currentSnapshot().sequenceNumber(),
+              ByteBuffer.wrap(serializeColStats),
+              PuffinCompressionCodec.NONE,
+              ImmutableMap.of()));
+      writer.finish();
+    } catch (IOException e) {
+      LOG.error(String.valueOf(e));
+    }
+    return false;
+  }
+
+  private String getStatsSource() {
+    return HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ICEBERG_STATS_SOURCE, 
"metastore").toLowerCase();
+  }

Review Comment:
   I don't understand this: why is the default here ``metastore`` when the 
config has its default set to ``iceberg``? Who is using that default, then?
   ```
       HIVE_ICEBERG_STATS_SOURCE("hive.iceberg.stats.source","iceberg",
   ```



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table 
getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     return table;
   }
 
+
+  @Override
+  public boolean canSetColStatistics() {
+    return getStatsSource().equals(ICEBERG);
+  }
+
+  @Override
+  public boolean 
canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());
+    if (table.currentSnapshot() != null) {
+      Path statsPath = getStatsPath(table);
+      if (getStatsSource().equals(ICEBERG)) {
+        try (FileSystem fs = statsPath.getFileSystem(conf)) {
+          if (fs.exists(statsPath)) {
+            return true;
+          }
+        } catch (IOException e) {
+          LOG.warn(e.getMessage());
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public List<ColumnStatisticsObj> 
getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());

Review Comment:
   Change to     ``Table table = IcebergTableUtil.getTable(conf, 
hmsTable.getTTable());``



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table 
getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     return table;
   }
 
+
+  @Override
+  public boolean canSetColStatistics() {
+    return getStatsSource().equals(ICEBERG);
+  }
+
+  @Override
+  public boolean 
canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());
+    if (table.currentSnapshot() != null) {
+      Path statsPath = getStatsPath(table);
+      if (getStatsSource().equals(ICEBERG)) {
+        try (FileSystem fs = statsPath.getFileSystem(conf)) {
+          if (fs.exists(statsPath)) {
+            return true;
+          }
+        } catch (IOException e) {
+          LOG.warn(e.getMessage());

Review Comment:
   Log the stack trace as well, rather than just the message, along with the 
table name, the stats path, and the snapshot id.



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table 
getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     return table;
   }
 
+
+  @Override
+  public boolean canSetColStatistics() {
+    return getStatsSource().equals(ICEBERG);
+  }
+
+  @Override
+  public boolean 
canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());
+    if (table.currentSnapshot() != null) {
+      Path statsPath = getStatsPath(table);
+      if (getStatsSource().equals(ICEBERG)) {
+        try (FileSystem fs = statsPath.getFileSystem(conf)) {
+          if (fs.exists(statsPath)) {
+            return true;
+          }
+        } catch (IOException e) {
+          LOG.warn(e.getMessage());
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public List<ColumnStatisticsObj> 
getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());
+    String statsPath = getStatsPath(table).toString();
+    LOG.info("Using stats from puffin file at:" + statsPath);
+    try (PuffinReader reader = 
Puffin.read(table.io().newInputFile(statsPath)).build()) {
+      List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
+      Map<BlobMetadata, List<ColumnStatistics>> collect =
+          
Streams.stream(reader.readAll(blobMetadata)).collect(Collectors.toMap(Pair::first,
+              blobMetadataByteBufferPair -> SerializationUtils.deserialize(
+                  
ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
+      return collect.get(blobMetadata.get(0)).get(0).getStatsObj();
+    } catch (IOException e) {
+      LOG.error(String.valueOf(e));
+    }
+    return null;
+  }
+
+
+  @Override
+  public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table 
table,
+      List<ColumnStatistics> colStats) {
+    TableDesc tableDesc = Utilities.getTableDesc(table);
+    Table tbl = Catalogs.loadTable(conf, tableDesc.getProperties());
+    String snapshotId = tbl.name() + tbl.currentSnapshot().snapshotId();

Review Comment:
   Can currentSnapshot be ``null``? For example, the table is empty and then 
somebody runs a CLI command to compute statistics?



##########
ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java:
##########
@@ -1069,8 +1069,12 @@ public static List<ColStatistics> getTableColumnStats(
     }
     if (fetchColStats && !colStatsToRetrieve.isEmpty()) {
       try {
-        List<ColumnStatisticsObj> colStat = 
Hive.get().getTableColumnStatistics(
-            dbName, tabName, colStatsToRetrieve, false);
+        List<ColumnStatisticsObj> colStat;
+        if (table != null && table.isNonNative() && 
table.getStorageHandler().canProvideColStatistics(table)) {

Review Comment:
   `table` cannot be `null` at this point.



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table 
getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     return table;
   }
 
+
+  @Override
+  public boolean canSetColStatistics() {
+    return getStatsSource().equals(ICEBERG);
+  }
+
+  @Override
+  public boolean 
canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());

Review Comment:
   Change to ``    Table table = IcebergTableUtil.getTable(conf, 
hmsTable.getTTable());``



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table 
getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     return table;
   }
 
+
+  @Override
+  public boolean canSetColStatistics() {
+    return getStatsSource().equals(ICEBERG);
+  }
+
+  @Override
+  public boolean 
canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());
+    if (table.currentSnapshot() != null) {
+      Path statsPath = getStatsPath(table);
+      if (getStatsSource().equals(ICEBERG)) {
+        try (FileSystem fs = statsPath.getFileSystem(conf)) {
+          if (fs.exists(statsPath)) {
+            return true;
+          }
+        } catch (IOException e) {
+          LOG.warn(e.getMessage());
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public List<ColumnStatisticsObj> 
getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());
+    String statsPath = getStatsPath(table).toString();
+    LOG.info("Using stats from puffin file at:" + statsPath);
+    try (PuffinReader reader = 
Puffin.read(table.io().newInputFile(statsPath)).build()) {
+      List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
+      Map<BlobMetadata, List<ColumnStatistics>> collect =
+          
Streams.stream(reader.readAll(blobMetadata)).collect(Collectors.toMap(Pair::first,
+              blobMetadataByteBufferPair -> SerializationUtils.deserialize(
+                  
ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
+      return collect.get(blobMetadata.get(0)).get(0).getStatsObj();
+    } catch (IOException e) {
+      LOG.error(String.valueOf(e));
+    }
+    return null;
+  }
+
+
+  @Override
+  public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table 
table,
+      List<ColumnStatistics> colStats) {
+    TableDesc tableDesc = Utilities.getTableDesc(table);
+    Table tbl = Catalogs.loadTable(conf, tableDesc.getProperties());
+    String snapshotId = tbl.name() + tbl.currentSnapshot().snapshotId();
+    byte[] serializeColStats = SerializationUtils.serialize((Serializable) 
colStats);
+
+    try (PuffinWriter writer = 
Puffin.write(tbl.io().newOutputFile(getStatsPath(tbl).toString()))
+        .createdBy("Hive").build()) {

Review Comment:
   Use constant 
   ```
   Constants.HIVE_ENGINE
   ```



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table 
getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     return table;
   }
 
+
+  @Override
+  public boolean canSetColStatistics() {
+    return getStatsSource().equals(ICEBERG);
+  }
+
+  @Override
+  public boolean 
canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());
+    if (table.currentSnapshot() != null) {
+      Path statsPath = getStatsPath(table);
+      if (getStatsSource().equals(ICEBERG)) {
+        try (FileSystem fs = statsPath.getFileSystem(conf)) {
+          if (fs.exists(statsPath)) {
+            return true;
+          }
+        } catch (IOException e) {
+          LOG.warn(e.getMessage());
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public List<ColumnStatisticsObj> 
getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());
+    String statsPath = getStatsPath(table).toString();
+    LOG.info("Using stats from puffin file at:" + statsPath);
+    try (PuffinReader reader = 
Puffin.read(table.io().newInputFile(statsPath)).build()) {
+      List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
+      Map<BlobMetadata, List<ColumnStatistics>> collect =
+          
Streams.stream(reader.readAll(blobMetadata)).collect(Collectors.toMap(Pair::first,
+              blobMetadataByteBufferPair -> SerializationUtils.deserialize(
+                  
ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
+      return collect.get(blobMetadata.get(0)).get(0).getStatsObj();
+    } catch (IOException e) {
+      LOG.error(String.valueOf(e));
+    }
+    return null;
+  }
+
+
+  @Override
+  public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table 
table,
+      List<ColumnStatistics> colStats) {
+    TableDesc tableDesc = Utilities.getTableDesc(table);
+    Table tbl = Catalogs.loadTable(conf, tableDesc.getProperties());

Review Comment:
   Use ``IcebergTableUtil`` to fetch the table; it has a cache. Fetching and 
reading the table metadata multiple times has severe performance penalties, so 
cache the table and use it from the cache unless reloading is necessary.



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table 
getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     return table;
   }
 
+
+  @Override
+  public boolean canSetColStatistics() {
+    return getStatsSource().equals(ICEBERG);
+  }
+
+  @Override
+  public boolean 
canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());
+    if (table.currentSnapshot() != null) {
+      Path statsPath = getStatsPath(table);
+      if (getStatsSource().equals(ICEBERG)) {
+        try (FileSystem fs = statsPath.getFileSystem(conf)) {
+          if (fs.exists(statsPath)) {
+            return true;
+          }
+        } catch (IOException e) {
+          LOG.warn(e.getMessage());
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public List<ColumnStatisticsObj> 
getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, 
Utilities.getTableDesc(hmsTable).getProperties());
+    String statsPath = getStatsPath(table).toString();
+    LOG.info("Using stats from puffin file at:" + statsPath);
+    try (PuffinReader reader = 
Puffin.read(table.io().newInputFile(statsPath)).build()) {
+      List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
+      Map<BlobMetadata, List<ColumnStatistics>> collect =
+          
Streams.stream(reader.readAll(blobMetadata)).collect(Collectors.toMap(Pair::first,
+              blobMetadataByteBufferPair -> SerializationUtils.deserialize(
+                  
ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
+      return collect.get(blobMetadata.get(0)).get(0).getStatsObj();
+    } catch (IOException e) {
+      LOG.error(String.valueOf(e));
+    }
+    return null;

Review Comment:
   Why don't we throw an exception here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to