ayushtkn commented on code in PR #4131:
URL: https://github.com/apache/hive/pull/4131#discussion_r1161623163
##########
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java:
##########
@@ -2205,9 +2205,8 @@ public static enum ConfVars {
"padding tolerance config (hive.exec.orc.block.padding.tolerance)."),
HIVE_ORC_CODEC_POOL("hive.use.orc.codec.pool", false,
"Whether to use codec pool in ORC. Disable if there are bugs with
codec reuse."),
- HIVE_USE_STATS_FROM("hive.use.stats.from","iceberg","Use stats from iceberg table snapshot for query " +
- "planning. This has three values metastore, puffin and iceberg"),
-
+ HIVE_ICEBERG_STATS_SOURCE("hive.iceberg.stats.source","iceberg","Use stats from iceberg table snapshot for query " +
+ "planning. This has three values metastore and iceberg"),
Review Comment:
> This has three values metastore and iceberg
What is the third value?
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
return table;
}
+
+ @Override
+ public boolean canSetColStatistics() {
+ return getStatsSource().equals(ICEBERG);
+ }
+
+ @Override
+ public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+ if (table.currentSnapshot() != null) {
+ Path statsPath = getStatsPath(table);
+ if (getStatsSource().equals(ICEBERG)) {
Review Comment:
can use ```canSetColStatistics()```
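i.e. roughly (just a sketch, keeping the rest of the method as is):
```
if (table.currentSnapshot() != null && canSetColStatistics()) {
```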
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
return table;
}
+
+ @Override
+ public boolean canSetColStatistics() {
+ return getStatsSource().equals(ICEBERG);
+ }
+
+ @Override
+ public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+ if (table.currentSnapshot() != null) {
+ Path statsPath = getStatsPath(table);
+ if (getStatsSource().equals(ICEBERG)) {
+ try (FileSystem fs = statsPath.getFileSystem(conf)) {
+ if (fs.exists(statsPath)) {
+ return true;
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+ String statsPath = getStatsPath(table).toString();
+ LOG.info("Using stats from puffin file at:" + statsPath);
Review Comment:
Logger format:
```
LOG.info("Using stats from puffin file at: {}", statsPath);
```
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
return table;
}
+
+ @Override
+ public boolean canSetColStatistics() {
+ return getStatsSource().equals(ICEBERG);
+ }
+
+ @Override
+ public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+ if (table.currentSnapshot() != null) {
+ Path statsPath = getStatsPath(table);
+ if (getStatsSource().equals(ICEBERG)) {
+ try (FileSystem fs = statsPath.getFileSystem(conf)) {
+ if (fs.exists(statsPath)) {
+ return true;
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+ String statsPath = getStatsPath(table).toString();
+ LOG.info("Using stats from puffin file at:" + statsPath);
+ try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath)).build()) {
+ List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
+ Map<BlobMetadata, List<ColumnStatistics>> collect =
+ Streams.stream(reader.readAll(blobMetadata)).collect(Collectors.toMap(Pair::first,
+ blobMetadataByteBufferPair -> SerializationUtils.deserialize(
+ ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
+ return collect.get(blobMetadata.get(0)).get(0).getStatsObj();
+ } catch (IOException e) {
+ LOG.error(String.valueOf(e));
+ }
+ return null;
+ }
+
+
+ @Override
+ public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table table,
+ List<ColumnStatistics> colStats) {
+ TableDesc tableDesc = Utilities.getTableDesc(table);
+ Table tbl = Catalogs.loadTable(conf, tableDesc.getProperties());
+ String snapshotId = tbl.name() + tbl.currentSnapshot().snapshotId();
+ byte[] serializeColStats = SerializationUtils.serialize((Serializable) colStats);
+
+ try (PuffinWriter writer = Puffin.write(tbl.io().newOutputFile(getStatsPath(tbl).toString()))
+ .createdBy("Hive").build()) {
+ writer.add(
+ new Blob(
+ tbl.name() + "-" + snapshotId,
+ ImmutableList.of(1),
+ tbl.currentSnapshot().snapshotId(),
+ tbl.currentSnapshot().sequenceNumber(),
+ ByteBuffer.wrap(serializeColStats),
+ PuffinCompressionCodec.NONE,
+ ImmutableMap.of()));
+ writer.finish();
+ } catch (IOException e) {
+ LOG.error(String.valueOf(e));
+ }
+ return false;
+ }
+
+ private String getStatsSource() {
+ return HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ICEBERG_STATS_SOURCE, "metastore").toLowerCase();
+ }
Review Comment:
I don't understand this: why is the default here ``metastore`` when the config already has its default set to ``iceberg``? Who is using that default then?
```
HIVE_ICEBERG_STATS_SOURCE("hive.iceberg.stats.source","iceberg",
```
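If the ConfVar's own default is the one intended, a sketch of what this could collapse to (the two-argument ``HiveConf.getVar`` overload falls back to the ConfVar default):
```
private String getStatsSource() {
  return HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ICEBERG_STATS_SOURCE).toLowerCase();
}
```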
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
return table;
}
+
+ @Override
+ public boolean canSetColStatistics() {
+ return getStatsSource().equals(ICEBERG);
+ }
+
+ @Override
+ public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+ if (table.currentSnapshot() != null) {
+ Path statsPath = getStatsPath(table);
+ if (getStatsSource().equals(ICEBERG)) {
+ try (FileSystem fs = statsPath.getFileSystem(conf)) {
+ if (fs.exists(statsPath)) {
+ return true;
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
Review Comment:
Change to ``Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());``
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
return table;
}
+
+ @Override
+ public boolean canSetColStatistics() {
+ return getStatsSource().equals(ICEBERG);
+ }
+
+ @Override
+ public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+ if (table.currentSnapshot() != null) {
+ Path statsPath = getStatsPath(table);
+ if (getStatsSource().equals(ICEBERG)) {
+ try (FileSystem fs = statsPath.getFileSystem(conf)) {
+ if (fs.exists(statsPath)) {
+ return true;
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
Review Comment:
Log the stack trace as well, rather than just the message, along with the table name, the stats path and the snapshot id.
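Something along these lines perhaps (just a sketch, exact wording up to you):
```
LOG.warn("Failed to check the puffin stats file for table {} at {} (snapshot {})",
    table.name(), statsPath, table.currentSnapshot().snapshotId(), e);
```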
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
return table;
}
+
+ @Override
+ public boolean canSetColStatistics() {
+ return getStatsSource().equals(ICEBERG);
+ }
+
+ @Override
+ public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+ if (table.currentSnapshot() != null) {
+ Path statsPath = getStatsPath(table);
+ if (getStatsSource().equals(ICEBERG)) {
+ try (FileSystem fs = statsPath.getFileSystem(conf)) {
+ if (fs.exists(statsPath)) {
+ return true;
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+ String statsPath = getStatsPath(table).toString();
+ LOG.info("Using stats from puffin file at:" + statsPath);
+ try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath)).build()) {
+ List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
+ Map<BlobMetadata, List<ColumnStatistics>> collect =
+ Streams.stream(reader.readAll(blobMetadata)).collect(Collectors.toMap(Pair::first,
+ blobMetadataByteBufferPair -> SerializationUtils.deserialize(
+ ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
+ return collect.get(blobMetadata.get(0)).get(0).getStatsObj();
+ } catch (IOException e) {
+ LOG.error(String.valueOf(e));
+ }
+ return null;
+ }
+
+
+ @Override
+ public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table table,
+ List<ColumnStatistics> colStats) {
+ TableDesc tableDesc = Utilities.getTableDesc(table);
+ Table tbl = Catalogs.loadTable(conf, tableDesc.getProperties());
+ String snapshotId = tbl.name() + tbl.currentSnapshot().snapshotId();
Review Comment:
Can ``currentSnapshot()`` be ``null``? For example an empty table, and then somebody fires a CLI command to compute statistics?
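If it can, a minimal guard might look like this (sketch only; the exact handling is up to you):
```
if (tbl.currentSnapshot() == null) {
  LOG.warn("No current snapshot for table {}, skipping column stats write", tbl.name());
  return false;
}
```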
##########
ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java:
##########
@@ -1069,8 +1069,12 @@ public static List<ColStatistics> getTableColumnStats(
}
if (fetchColStats && !colStatsToRetrieve.isEmpty()) {
try {
- List<ColumnStatisticsObj> colStat = Hive.get().getTableColumnStatistics(
- dbName, tabName, colStatsToRetrieve, false);
+ List<ColumnStatisticsObj> colStat;
+ if (table != null && table.isNonNative() && table.getStorageHandler().canProvideColStatistics(table)) {
Review Comment:
``table`` cannot be `null` at this point, so the null check is redundant.
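i.e. the condition could simply be (sketch):
```
if (table.isNonNative() && table.getStorageHandler().canProvideColStatistics(table)) {
```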
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
return table;
}
+
+ @Override
+ public boolean canSetColStatistics() {
+ return getStatsSource().equals(ICEBERG);
+ }
+
+ @Override
+ public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
Review Comment:
Change to ``Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());``
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
return table;
}
+
+ @Override
+ public boolean canSetColStatistics() {
+ return getStatsSource().equals(ICEBERG);
+ }
+
+ @Override
+ public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+ if (table.currentSnapshot() != null) {
+ Path statsPath = getStatsPath(table);
+ if (getStatsSource().equals(ICEBERG)) {
+ try (FileSystem fs = statsPath.getFileSystem(conf)) {
+ if (fs.exists(statsPath)) {
+ return true;
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+ String statsPath = getStatsPath(table).toString();
+ LOG.info("Using stats from puffin file at:" + statsPath);
+ try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath)).build()) {
+ List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
+ Map<BlobMetadata, List<ColumnStatistics>> collect =
+ Streams.stream(reader.readAll(blobMetadata)).collect(Collectors.toMap(Pair::first,
+ blobMetadataByteBufferPair -> SerializationUtils.deserialize(
+ ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
+ return collect.get(blobMetadata.get(0)).get(0).getStatsObj();
+ } catch (IOException e) {
+ LOG.error(String.valueOf(e));
+ }
+ return null;
+ }
+
+
+ @Override
+ public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table table,
+ List<ColumnStatistics> colStats) {
+ TableDesc tableDesc = Utilities.getTableDesc(table);
+ Table tbl = Catalogs.loadTable(conf, tableDesc.getProperties());
+ String snapshotId = tbl.name() + tbl.currentSnapshot().snapshotId();
+ byte[] serializeColStats = SerializationUtils.serialize((Serializable) colStats);
+
+ try (PuffinWriter writer = Puffin.write(tbl.io().newOutputFile(getStatsPath(tbl).toString()))
+ .createdBy("Hive").build()) {
Review Comment:
Use constant
```
Constants.HIVE_ENGINE
```
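i.e. something like this (a sketch, assuming ``org.apache.hadoop.hive.conf.Constants`` is the class in question):
```
try (PuffinWriter writer = Puffin.write(tbl.io().newOutputFile(getStatsPath(tbl).toString()))
    .createdBy(Constants.HIVE_ENGINE).build()) {
```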
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
return table;
}
+
+ @Override
+ public boolean canSetColStatistics() {
+ return getStatsSource().equals(ICEBERG);
+ }
+
+ @Override
+ public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+ if (table.currentSnapshot() != null) {
+ Path statsPath = getStatsPath(table);
+ if (getStatsSource().equals(ICEBERG)) {
+ try (FileSystem fs = statsPath.getFileSystem(conf)) {
+ if (fs.exists(statsPath)) {
+ return true;
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+ String statsPath = getStatsPath(table).toString();
+ LOG.info("Using stats from puffin file at:" + statsPath);
+ try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath)).build()) {
+ List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
+ Map<BlobMetadata, List<ColumnStatistics>> collect =
+ Streams.stream(reader.readAll(blobMetadata)).collect(Collectors.toMap(Pair::first,
+ blobMetadataByteBufferPair -> SerializationUtils.deserialize(
+ ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
+ return collect.get(blobMetadata.get(0)).get(0).getStatsObj();
+ } catch (IOException e) {
+ LOG.error(String.valueOf(e));
+ }
+ return null;
+ }
+
+
+ @Override
+ public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table table,
+ List<ColumnStatistics> colStats) {
+ TableDesc tableDesc = Utilities.getTableDesc(table);
+ Table tbl = Catalogs.loadTable(conf, tableDesc.getProperties());
Review Comment:
Use ``IcebergTableUtil`` to fetch the table; it has a cache. Fetching and reading the table metadata multiple times carries a severe performance penalty, so cache the table and use it from the cache unless a fresh load is really necessary.
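For example (a sketch, reusing the overload suggested above for the other call sites):
```
Table tbl = IcebergTableUtil.getTable(conf, table.getTTable());
```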
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
return table;
}
+
+ @Override
+ public boolean canSetColStatistics() {
+ return getStatsSource().equals(ICEBERG);
+ }
+
+ @Override
+ public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+ if (table.currentSnapshot() != null) {
+ Path statsPath = getStatsPath(table);
+ if (getStatsSource().equals(ICEBERG)) {
+ try (FileSystem fs = statsPath.getFileSystem(conf)) {
+ if (fs.exists(statsPath)) {
+ return true;
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+ Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+ String statsPath = getStatsPath(table).toString();
+ LOG.info("Using stats from puffin file at:" + statsPath);
+ try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath)).build()) {
+ List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
+ Map<BlobMetadata, List<ColumnStatistics>> collect =
+ Streams.stream(reader.readAll(blobMetadata)).collect(Collectors.toMap(Pair::first,
+ blobMetadataByteBufferPair -> SerializationUtils.deserialize(
+ ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
+ return collect.get(blobMetadata.get(0)).get(0).getStatsObj();
+ } catch (IOException e) {
+ LOG.error(String.valueOf(e));
+ }
+ return null;
Review Comment:
Why don't we throw an exception here?
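For instance (just a sketch; whichever exception type fits the caller is fine):
```
} catch (IOException e) {
  throw new UncheckedIOException("Error reading puffin stats file " + statsPath, e);
}
```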