difin commented on code in PR #5389:
URL: https://github.com/apache/hive/pull/5389#discussion_r1709890906


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -599,44 +598,29 @@ private void commitWrite(Table table, String branchName, 
Long snapshotId, long s
    * @param table             The table we are changing
    * @param startTime         The start time of the commit - used only for 
logging
    * @param results           The object containing the new files
-   * @param rewritePolicy     The rewrite policy to use for the insert 
overwrite commit
    * @param partitionSpecId   The table spec_id for partition compaction 
operation
    * @param partitionPath     The path of the compacted partition
    */
-  private void commitCompaction(Table table, long startTime, FilesForCommit 
results,
-      RewritePolicy rewritePolicy, Integer partitionSpecId, String 
partitionPath) {
-    if (rewritePolicy == RewritePolicy.FULL_TABLE) {
-      // Full table compaction
-      Transaction transaction = table.newTransaction();
-      DeleteFiles delete = transaction.newDelete();
-      delete.deleteFromRowFilter(Expressions.alwaysTrue());
-      delete.commit();
-      ReplacePartitions overwrite = transaction.newReplacePartitions();
-      results.dataFiles().forEach(overwrite::addFile);
-      overwrite.commit();
-      transaction.commitTransaction();
-      LOG.debug("Compacted full table with files {}", results);
-    } else {
-      // Single partition compaction
-      List<DataFile> existingDataFiles =
-          IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath,
-              partitionPath == null ? 
Predicate.isEqual(partitionSpecId).negate() : 
Predicate.isEqual(partitionSpecId));
-      List<DeleteFile> existingDeleteFiles =
-          IcebergTableUtil.getDeleteFiles(table, partitionSpecId, 
partitionPath,
-              partitionPath == null ? 
Predicate.isEqual(partitionSpecId).negate() : 
Predicate.isEqual(partitionSpecId));
-
-      RewriteFiles rewriteFiles = table.newRewrite();
-      rewriteFiles.validateFromSnapshot(table.currentSnapshot().snapshotId());
-
-      existingDataFiles.forEach(rewriteFiles::deleteFile);
-      existingDeleteFiles.forEach(rewriteFiles::deleteFile);
-      results.dataFiles().forEach(rewriteFiles::addFile);
-
-      rewriteFiles.commit();
-      LOG.debug("Compacted partition {} with files {}", partitionPath, 
results);
-    }
-    LOG.info("Compaction commit took {} ms for table: {} with {} file(s)", 
System.currentTimeMillis() - startTime,
-        table, results.dataFiles().size());
+  private void commitCompaction(Table table, long startTime, FilesForCommit 
results, Integer partitionSpecId,
+      String partitionPath) {
+    List<DataFile> existingDataFiles =
+        IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath,
+            partitionPath == null ? 
Predicate.isEqual(partitionSpecId).negate() : 
Predicate.isEqual(partitionSpecId));

Review Comment:
   I've moved the predicate definition inside the function as you suggested, 
but had to define it a little bit differently, because it has to be an 
effectively final variable because it is used later on in a lambda function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to