[ https://issues.apache.org/jira/browse/HIVE-27158?focusedWorklogId=855799&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-855799 ]
ASF GitHub Bot logged work on HIVE-27158:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Apr/23 12:37
            Start Date: 10/Apr/23 12:37
    Worklog Time Spent: 10m
      Work Description: ayushtkn commented on code in PR #4131:
URL: https://github.com/apache/hive/pull/4131#discussion_r1161623163


##########
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java:
##########

@@ -2205,9 +2205,8 @@ public static enum ConfVars {
         "padding tolerance config (hive.exec.orc.block.padding.tolerance)."),
     HIVE_ORC_CODEC_POOL("hive.use.orc.codec.pool", false,
         "Whether to use codec pool in ORC. Disable if there are bugs with codec reuse."),
-    HIVE_USE_STATS_FROM("hive.use.stats.from","iceberg","Use stats from iceberg table snapshot for query " +
-        "planning. This has three values metastore, puffin and iceberg"),
-
+    HIVE_ICEBERG_STATS_SOURCE("hive.iceberg.stats.source","iceberg","Use stats from iceberg table snapshot for query " +
+        "planning. This has three values metastore and iceberg"),

Review Comment:
   > This has three values metastore and iceberg

   What is the third value?


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########

@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     return table;
   }

+
+  @Override
+  public boolean canSetColStatistics() {
+    return getStatsSource().equals(ICEBERG);
+  }
+
+  @Override
+  public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+    if (table.currentSnapshot() != null) {
+      Path statsPath = getStatsPath(table);
+      if (getStatsSource().equals(ICEBERG)) {

Review Comment:
   can use ```canSetColStatistics()```
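A minimal sketch of the simplification this comment asks for, reusing `canSetColStatistics()` inside `canProvideColStatistics()`. The fields and helpers (`conf`, `LOG`, `ICEBERG`, `getStatsPath`, `getStatsSource`) are assumed to be the ones visible in the diff above, so this only illustrates the shape of the change, not the final patch:

```java
  @Override
  public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
    // Reuse canSetColStatistics() instead of repeating the stats-source check inline.
    if (!canSetColStatistics()) {
      return false;
    }
    Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
    if (table.currentSnapshot() == null) {
      return false;
    }
    Path statsPath = getStatsPath(table);
    try (FileSystem fs = statsPath.getFileSystem(conf)) {
      return fs.exists(statsPath);
    } catch (IOException e) {
      LOG.warn("Could not check for stats file {} of table {}", statsPath, table.name(), e);
      return false;
    }
  }
```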
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########

@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+  @Override
+  public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+    String statsPath = getStatsPath(table).toString();
+    LOG.info("Using stats from puffin file at:" + statsPath);

Review Comment:
   Logger format:
   ```
   LOG.info("Using stats from puffin file at: {}", statsPath);
   ```


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########

@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+  private String getStatsSource() {
+    return HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ICEBERG_STATS_SOURCE, "metastore").toLowerCase();
+  }

Review Comment:
   I don't understand this: why is the default here ``metastore`` when the config itself has its default set to iceberg? Who is using that default then?
   ```
   HIVE_ICEBERG_STATS_SOURCE("hive.iceberg.stats.source","iceberg",
   ```
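For reference, a sketch of the cleanup this comment points toward, under the assumption that the default declared on the ConfVars entry in HiveConf.java is meant to be the single source of truth; if `metastore` really is the intended fallback, the ConfVars declaration should be the place that says so:

```java
  private String getStatsSource() {
    // Rely on the default declared on HIVE_ICEBERG_STATS_SOURCE ("iceberg") instead of
    // passing a second, conflicting default ("metastore") at the call site.
    return HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ICEBERG_STATS_SOURCE).toLowerCase();
  }
```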
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########

@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+  @Override
+  public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());

Review Comment:
   Change to ``Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());``


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########

@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+  @Override
+  public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+    if (table.currentSnapshot() != null) {
+      Path statsPath = getStatsPath(table);
+      if (getStatsSource().equals(ICEBERG)) {
+        try (FileSystem fs = statsPath.getFileSystem(conf)) {
+          if (fs.exists(statsPath)) {
+            return true;
+          }
+        } catch (IOException e) {
+          LOG.warn(e.getMessage());

Review Comment:
   Log the stack trace as well, rather than just the message, along with the table name, the stats path and the snapshot id.
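A sketch of the richer warning being requested, using the SLF4J parameterized form; passing the exception as the last argument makes SLF4J log the full stack trace, and the table name, stats path and snapshot id come from the surrounding method in the diff:

```java
        } catch (IOException e) {
          // SLF4J logs the stack trace when the exception is the final argument.
          LOG.warn("Error checking Puffin stats file {} for table {} (snapshot {})",
              statsPath, table.name(), table.currentSnapshot().snapshotId(), e);
        }
```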
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########

@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+  @Override
+  public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table table,
+      List<ColumnStatistics> colStats) {
+    TableDesc tableDesc = Utilities.getTableDesc(table);
+    Table tbl = Catalogs.loadTable(conf, tableDesc.getProperties());
+    String snapshotId = tbl.name() + tbl.currentSnapshot().snapshotId();

Review Comment:
   Can currentSnapshot be ``null``? Like an empty table, on which somebody then fires a CLI command to compute statistics?
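If `currentSnapshot()` can indeed be null (for example, ANALYZE issued against a freshly created, empty table), a guard at the top of `setColStatistics` would avoid the NullPointerException. A sketch, reusing the names from the diff rather than claiming this is the final fix:

```java
  @Override
  public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table table,
      List<ColumnStatistics> colStats) {
    Table tbl = Catalogs.loadTable(conf, Utilities.getTableDesc(table).getProperties());
    if (tbl.currentSnapshot() == null) {
      // No snapshot yet (empty table): there is nothing to attach a Puffin stats file to.
      LOG.warn("Table {} has no current snapshot, skipping column stats write", tbl.name());
      return false;
    }
    String snapshotId = tbl.name() + tbl.currentSnapshot().snapshotId();
    // ... continue with the Puffin write as in the diff above ...
    return false;
  }
```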
##########
ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java:
##########

@@ -1069,8 +1069,12 @@ public static List<ColStatistics> getTableColumnStats(
     }
     if (fetchColStats && !colStatsToRetrieve.isEmpty()) {
       try {
-        List<ColumnStatisticsObj> colStat = Hive.get().getTableColumnStatistics(
-            dbName, tabName, colStatsToRetrieve, false);
+        List<ColumnStatisticsObj> colStat;
+        if (table != null && table.isNonNative() && table.getStorageHandler().canProvideColStatistics(table)) {

Review Comment:
   table can not be `null` at this point


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########

@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+  @Override
+  public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());

Review Comment:
   Change to ``Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());``


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########

@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    byte[] serializeColStats = SerializationUtils.serialize((Serializable) colStats);
+
+    try (PuffinWriter writer = Puffin.write(tbl.io().newOutputFile(getStatsPath(tbl).toString()))
+        .createdBy("Hive").build()) {
+      writer.add(
+          new Blob(
+              tbl.name() + "-" + snapshotId,
+              ImmutableList.of(1),
+              tbl.currentSnapshot().snapshotId(),
+              tbl.currentSnapshot().sequenceNumber(),
+              ByteBuffer.wrap(serializeColStats),
+              PuffinCompressionCodec.NONE,
+              ImmutableMap.of()));
+      writer.finish();
+    } catch (IOException e) {
+      LOG.error(String.valueOf(e));
+    }
+    return false;
+  }

Review Comment:
   Use constant ``` Constants.HIVE_ENGINE ```
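A sketch combining the two suggestions above for the writer path: load the table through the cached `IcebergTableUtil.getTable(conf, hmsTable.getTTable())` call the reviewer quotes, and use the `Constants.HIVE_ENGINE` constant instead of the hard-coded "Hive" string. Both identifiers are taken from the review comments rather than verified against the codebase here:

```java
    // Cached lookup instead of Catalogs.loadTable(...), as suggested in the review.
    Table tbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
    try (PuffinWriter writer = Puffin.write(tbl.io().newOutputFile(getStatsPath(tbl).toString()))
        .createdBy(Constants.HIVE_ENGINE)  // constant instead of the literal "Hive"
        .build()) {
      // ... add the stats blob and call writer.finish(), as in the diff ...
      writer.finish();
    } catch (IOException e) {
      LOG.error("Failed to write Puffin stats for table {}", tbl.name(), e);
    }
```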
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########

@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+  @Override
+  public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table table,
+      List<ColumnStatistics> colStats) {
+    TableDesc tableDesc = Utilities.getTableDesc(table);
+    Table tbl = Catalogs.loadTable(conf, tableDesc.getProperties());

Review Comment:
   Use ``IcebergTableUtil`` to fetch the table; it has a cache. Fetching and reading the table metadata multiple times has severe performance penalties, so cache the table and use it from the cache unless a fresh load is necessary.


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########

@@ -361,6 +378,83 @@ private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+  @Override
+  public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    Table table = Catalogs.loadTable(conf, Utilities.getTableDesc(hmsTable).getProperties());
+    String statsPath = getStatsPath(table).toString();
+    LOG.info("Using stats from puffin file at:" + statsPath);
+    try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath)).build()) {
+      List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
+      Map<BlobMetadata, List<ColumnStatistics>> collect =
+          Streams.stream(reader.readAll(blobMetadata)).collect(Collectors.toMap(Pair::first,
+              blobMetadataByteBufferPair -> SerializationUtils.deserialize(
+                  ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
+      return collect.get(blobMetadata.get(0)).get(0).getStatsObj();
+    } catch (IOException e) {
+      LOG.error(String.valueOf(e));
+    }
+    return null;

Review Comment:
   Why don't we throw an exception here?
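One way to act on the comment above: instead of logging the IOException and returning null, which silently degrades planning to no stats, wrap and rethrow it so a corrupt or unreadable Puffin file is surfaced to the caller. A sketch using `java.io.UncheckedIOException`; whether to fail hard or fall back here is a design choice for the PR:

```java
    } catch (IOException e) {
      // Fail loudly rather than returning null and planning without stats.
      throw new UncheckedIOException("Error reading Puffin column statistics from " + statsPath, e);
    }
```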
Issue Time Tracking
-------------------

    Worklog Id:     (was: 855799)
    Time Spent: 7.5h  (was: 7h 20m)

> Store hive columns stats in puffin files for iceberg tables
> -----------------------------------------------------------
>
>                 Key: HIVE-27158
>                 URL: https://issues.apache.org/jira/browse/HIVE-27158
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Simhadri Govindappa
>            Assignee: Simhadri Govindappa
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 7.5h
>  Remaining Estimate: 0h
>


--
This message was sent by Atlassian Jira
(v8.20.10#820010)