deniskuzZ commented on code in PR #5248: URL: https://github.com/apache/hive/pull/5248#discussion_r1647522330
########## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java: ########## @@ -575,33 +592,71 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s } /** - * Creates and commits an Iceberg insert overwrite change with the provided data files. - * For unpartitioned tables the table content is replaced with the new data files. If not data files are provided - * then the unpartitioned table is truncated. - * For partitioned tables the relevant partitions are replaced with the new data files. If no data files are provided - * then the unpartitioned table remains unchanged. - * @param table The table we are changing - * @param startTime The start time of the commit - used only for logging - * @param results The object containing the new files - * @param rewritePolicy The rewrite policy to use for the insert overwrite commit + * Creates and commits an Iceberg compaction change with the provided data files. + * Either full table or a selected partition contents is replaced with compacted files. + * + * @param table The table we are changing + * @param startTime The start time of the commit - used only for logging + * @param results The object containing the new files + * @param rewritePolicy The rewrite policy to use for the insert overwrite commit + * @param partitionSpecId The table spec_id for partition compaction operation + * @param partitionPath The path of the compacted partition */ - private void commitOverwrite(Table table, String branchName, long startTime, FilesForCommit results, - RewritePolicy rewritePolicy) { + private void commitCompaction(Table table, long startTime, FilesForCommit results, + RewritePolicy rewritePolicy, Integer partitionSpecId, String partitionPath) { Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite"); Review Comment: do we need this check for compaction? 
########## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java: ########## @@ -575,33 +592,71 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s } /** - * Creates and commits an Iceberg insert overwrite change with the provided data files. - * For unpartitioned tables the table content is replaced with the new data files. If not data files are provided - * then the unpartitioned table is truncated. - * For partitioned tables the relevant partitions are replaced with the new data files. If no data files are provided - * then the unpartitioned table remains unchanged. - * @param table The table we are changing - * @param startTime The start time of the commit - used only for logging - * @param results The object containing the new files - * @param rewritePolicy The rewrite policy to use for the insert overwrite commit + * Creates and commits an Iceberg compaction change with the provided data files. + * Either full table or a selected partition contents is replaced with compacted files. + * + * @param table The table we are changing + * @param startTime The start time of the commit - used only for logging + * @param results The object containing the new files + * @param rewritePolicy The rewrite policy to use for the insert overwrite commit + * @param partitionSpecId The table spec_id for partition compaction operation + * @param partitionPath The path of the compacted partition */ - private void commitOverwrite(Table table, String branchName, long startTime, FilesForCommit results, - RewritePolicy rewritePolicy) { + private void commitCompaction(Table table, long startTime, FilesForCommit results, + RewritePolicy rewritePolicy, Integer partitionSpecId, String partitionPath) { Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite"); Review Comment: do we need this check for compaction? 
you don't even use `deleteFiles` here ########## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java: ########## @@ -575,33 +592,71 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s } /** - * Creates and commits an Iceberg insert overwrite change with the provided data files. - * For unpartitioned tables the table content is replaced with the new data files. If not data files are provided - * then the unpartitioned table is truncated. - * For partitioned tables the relevant partitions are replaced with the new data files. If no data files are provided - * then the unpartitioned table remains unchanged. - * @param table The table we are changing - * @param startTime The start time of the commit - used only for logging - * @param results The object containing the new files - * @param rewritePolicy The rewrite policy to use for the insert overwrite commit + * Creates and commits an Iceberg compaction change with the provided data files. + * Either full table or a selected partition contents is replaced with compacted files. + * + * @param table The table we are changing + * @param startTime The start time of the commit - used only for logging + * @param results The object containing the new files + * @param rewritePolicy The rewrite policy to use for the insert overwrite commit + * @param partitionSpecId The table spec_id for partition compaction operation + * @param partitionPath The path of the compacted partition */ - private void commitOverwrite(Table table, String branchName, long startTime, FilesForCommit results, - RewritePolicy rewritePolicy) { + private void commitCompaction(Table table, long startTime, FilesForCommit results, + RewritePolicy rewritePolicy, Integer partitionSpecId, String partitionPath) { Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite"); Review Comment: do we need this check for compaction? 
you don't even use `deleteFiles` here ########## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java: ########## @@ -575,33 +592,71 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s } /** - * Creates and commits an Iceberg insert overwrite change with the provided data files. - * For unpartitioned tables the table content is replaced with the new data files. If not data files are provided - * then the unpartitioned table is truncated. - * For partitioned tables the relevant partitions are replaced with the new data files. If no data files are provided - * then the unpartitioned table remains unchanged. - * @param table The table we are changing - * @param startTime The start time of the commit - used only for logging - * @param results The object containing the new files - * @param rewritePolicy The rewrite policy to use for the insert overwrite commit + * Creates and commits an Iceberg compaction change with the provided data files. + * Either full table or a selected partition contents is replaced with compacted files. 
+ * + * @param table The table we are changing + * @param startTime The start time of the commit - used only for logging + * @param results The object containing the new files + * @param rewritePolicy The rewrite policy to use for the insert overwrite commit + * @param partitionSpecId The table spec_id for partition compaction operation + * @param partitionPath The path of the compacted partition */ - private void commitOverwrite(Table table, String branchName, long startTime, FilesForCommit results, - RewritePolicy rewritePolicy) { + private void commitCompaction(Table table, long startTime, FilesForCommit results, + RewritePolicy rewritePolicy, Integer partitionSpecId, String partitionPath) { Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite"); if (!results.dataFiles().isEmpty()) { - Transaction transaction = table.newTransaction(); if (rewritePolicy == RewritePolicy.ALL_PARTITIONS) { Review Comment: Could we please add a comment noting that this is a full table compaction? ########## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java: ########## @@ -575,33 +592,71 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s } /** - * Creates and commits an Iceberg insert overwrite change with the provided data files. - * For unpartitioned tables the table content is replaced with the new data files. If not data files are provided - * then the unpartitioned table is truncated. - * For partitioned tables the relevant partitions are replaced with the new data files. If no data files are provided - * then the unpartitioned table remains unchanged. 
- * @param table The table we are changing - * @param startTime The start time of the commit - used only for logging - * @param results The object containing the new files - * @param rewritePolicy The rewrite policy to use for the insert overwrite commit + * Creates and commits an Iceberg compaction change with the provided data files. + * Either full table or a selected partition contents is replaced with compacted files. + * + * @param table The table we are changing + * @param startTime The start time of the commit - used only for logging + * @param results The object containing the new files + * @param rewritePolicy The rewrite policy to use for the insert overwrite commit + * @param partitionSpecId The table spec_id for partition compaction operation + * @param partitionPath The path of the compacted partition */ - private void commitOverwrite(Table table, String branchName, long startTime, FilesForCommit results, - RewritePolicy rewritePolicy) { + private void commitCompaction(Table table, long startTime, FilesForCommit results, + RewritePolicy rewritePolicy, Integer partitionSpecId, String partitionPath) { Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite"); if (!results.dataFiles().isEmpty()) { Review Comment: no need for nested if, return right away if `results.dataFiles().isEmpty()` ########## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java: ########## @@ -575,33 +592,71 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s } /** - * Creates and commits an Iceberg insert overwrite change with the provided data files. - * For unpartitioned tables the table content is replaced with the new data files. If not data files are provided - * then the unpartitioned table is truncated. - * For partitioned tables the relevant partitions are replaced with the new data files. 
If no data files are provided - * then the unpartitioned table remains unchanged. - * @param table The table we are changing - * @param startTime The start time of the commit - used only for logging - * @param results The object containing the new files - * @param rewritePolicy The rewrite policy to use for the insert overwrite commit + * Creates and commits an Iceberg compaction change with the provided data files. + * Either full table or a selected partition contents is replaced with compacted files. + * + * @param table The table we are changing + * @param startTime The start time of the commit - used only for logging + * @param results The object containing the new files + * @param rewritePolicy The rewrite policy to use for the insert overwrite commit + * @param partitionSpecId The table spec_id for partition compaction operation + * @param partitionPath The path of the compacted partition */ - private void commitOverwrite(Table table, String branchName, long startTime, FilesForCommit results, - RewritePolicy rewritePolicy) { + private void commitCompaction(Table table, long startTime, FilesForCommit results, + RewritePolicy rewritePolicy, Integer partitionSpecId, String partitionPath) { Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite"); if (!results.dataFiles().isEmpty()) { - Transaction transaction = table.newTransaction(); if (rewritePolicy == RewritePolicy.ALL_PARTITIONS) { + Transaction transaction = table.newTransaction(); DeleteFiles delete = transaction.newDelete(); delete.deleteFromRowFilter(Expressions.alwaysTrue()); delete.commit(); + ReplacePartitions overwrite = transaction.newReplacePartitions(); Review Comment: In the 2nd PR we'll replace this logic with a single snapshot rewrite, correct? 
########## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java: ########## @@ -382,17 +381,20 @@ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer dese List<ExprNodeDesc> subExprNodes = pushedPredicate.getChildren(); if (subExprNodes.removeIf(nodeDesc -> nodeDesc.getCols() != null && - nodeDesc.getCols().contains(VirtualColumn.FILE_PATH.getName()))) { + (nodeDesc.getCols().contains(VirtualColumn.FILE_PATH.getName()) || + nodeDesc.getCols().contains(VirtualColumn.PARTITION_SPEC_ID.getName())))) { if (subExprNodes.size() == 1) { pushedPredicate = subExprNodes.get(0); } else if (subExprNodes.isEmpty()) { pushedPredicate = null; } } - predicate.pushedPredicate = (ExprNodeGenericFuncDesc) pushedPredicate; - Expression filterExpr = HiveIcebergInputFormat.getFilterExpr(conf, predicate.pushedPredicate); - if (filterExpr != null) { - SessionStateUtil.addResource(conf, InputFormatConfig.QUERY_FILTERS, filterExpr); + if (pushedPredicate instanceof ExprNodeGenericFuncDesc) { Review Comment: why do we need `instanceof` check here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For additional commands, e-mail: gitbox-h...@hive.apache.org