deniskuzZ commented on code in PR #4431:
URL: https://github.com/apache/hive/pull/4431#discussion_r1263500627
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -536,17 +544,30 @@ private String getStatsSource() {
}
private Path getStatsPath(Table table) {
Review Comment:
Please rename `getStatsPath` to `getColStatsPath`.
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -452,65 +455,70 @@ public boolean
canSetColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsT
}
@Override
- public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table
hmsTable,
- List<ColumnStatistics> colStats) {
+ public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table
hmsTable, List<ColumnStatistics> colStats,
+ ColumnStatsDesc columnStatsDesc) {
Table tbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
String snapshotId = String.format("%s-STATS-%d", tbl.name(),
tbl.currentSnapshot().snapshotId());
- invalidateStats(getStatsPath(tbl));
- byte[] serializeColStats = SerializationUtils.serialize((Serializable)
colStats);
- try (PuffinWriter writer =
Puffin.write(tbl.io().newOutputFile(getStatsPath(tbl).toString()))
- .createdBy(Constants.HIVE_ENGINE).build()) {
- writer.add(
- new Blob(
- tbl.name() + "-" + snapshotId,
- ImmutableList.of(1),
- tbl.currentSnapshot().snapshotId(),
- tbl.currentSnapshot().sequenceNumber(),
- ByteBuffer.wrap(serializeColStats),
- PuffinCompressionCodec.NONE,
- ImmutableMap.of()));
- writer.finish();
- return true;
- } catch (IOException e) {
- LOG.error(String.valueOf(e));
+ try {
+ boolean rewriteStats = removeStatsIfExists(tbl);
+ if (!rewriteStats) {
+ checkAndMergeStats(colStats.get(0), tbl);
+ }
+ byte[] serializeColStats = SerializationUtils.serialize((Serializable)
colStats);
+ try (PuffinWriter writer =
Puffin.write(tbl.io().newOutputFile(getStatsPath(tbl).toString()))
+ .createdBy(Constants.HIVE_ENGINE).build()) {
+ writer.add(new Blob(tbl.name() + "-" + snapshotId,
ImmutableList.of(1), tbl.currentSnapshot().snapshotId(),
+ tbl.currentSnapshot().sequenceNumber(),
ByteBuffer.wrap(serializeColStats), PuffinCompressionCodec.NONE,
+ ImmutableMap.of()));
+ writer.finish();
+ return true;
+ } catch (IOException e) {
+ LOG.warn("Unable to write stats to puffin file", e.getMessage());
+ return false;
+ }
+ } catch (InvalidObjectException | IOException e) {
+ LOG.warn("Unable to invalidate or merge stats: ", e.getMessage());
return false;
}
}
@Override
public boolean
canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
- if (canSetColStatistics(hmsTable)) {
- Path statsPath = getStatsPath(table);
- try {
- FileSystem fs = statsPath.getFileSystem(conf);
- if (fs.exists(statsPath)) {
- return true;
- }
- } catch (IOException e) {
- LOG.warn("Exception when trying to find Iceberg column stats for
table:{} , snapshot:{} , " +
- "statsPath: {} , stack trace: {}", table.name(),
table.currentSnapshot(), statsPath, e);
- }
+ return canSetColStatistics(hmsTable) && canProvideColStatistics(table,
table.currentSnapshot().snapshotId());
+ }
+
+ private boolean canProvideColStatistics(Table table, long snapshotId) {
+ Path statsPath = getStatsPath(table, snapshotId);
+ try {
+ FileSystem fs = statsPath.getFileSystem(conf);
+ return fs.exists(statsPath);
+ } catch (IOException e) {
+ LOG.warn("Exception when trying to find Iceberg column stats for
table:{} , snapshot:{} , " +
+ "statsPath: {} , stack trace: {}", table.name(),
table.currentSnapshot(), statsPath, e);
}
return false;
}
@Override
public List<ColumnStatisticsObj>
getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
- String statsPath = getStatsPath(table).toString();
+ Path statsPath = getStatsPath(table);
LOG.info("Using stats from puffin file at: {}", statsPath);
- try (PuffinReader reader =
Puffin.read(table.io().newInputFile(statsPath)).build()) {
+ return readColStats(table, statsPath).getStatsObj();
+ }
+
+ private ColumnStatistics readColStats(Table table, Path statsPath) {
Review Comment:
Please rename `readColStats` to `readColStatistics`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]