difin commented on code in PR #5248:
URL: https://github.com/apache/hive/pull/5248#discussion_r1622939200
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -558,33 +577,71 @@ private void commitWrite(Table table, String branchName,
Long snapshotId, long s
}
/**
- * Creates and commits an Iceberg insert overwrite change with the provided
data files.
- * For unpartitioned tables the table content is replaced with the new data
files. If not data files are provided
- * then the unpartitioned table is truncated.
- * For partitioned tables the relevant partitions are replaced with the new
data files. If no data files are provided
- * then the unpartitioned table remains unchanged.
- * @param table The table we are changing
- * @param startTime The start time of the commit - used only for logging
- * @param results The object containing the new files
- * @param rewritePolicy The rewrite policy to use for the insert overwrite
commit
+ * Creates and commits an Iceberg compaction change with the provided data
files.
+ * Either full table or a selected partition contents is replaced with
compacted files.
+ *
+ * @param table The table we are changing
+ * @param startTime The start time of the commit - used only
for logging
+ * @param results The object containing the new files
+ * @param rewritePolicy The rewrite policy to use for the insert
overwrite commit
+ * @param compactionPartSpecId The table spec_id for partition
compaction operation
+ * @param compactionPartitionPath The path of the compacted partition
*/
- private void commitOverwrite(Table table, String branchName, long startTime,
FilesForCommit results,
- RewritePolicy rewritePolicy) {
+ private void commitCompaction(Table table, long startTime, FilesForCommit
results,
+ RewritePolicy rewritePolicy, Integer compactionPartSpecId, String
compactionPartitionPath) {
Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not
handle deletes with overwrite");
if (!results.dataFiles().isEmpty()) {
- Transaction transaction = table.newTransaction();
if (rewritePolicy == RewritePolicy.ALL_PARTITIONS) {
+ Transaction transaction = table.newTransaction();
DeleteFiles delete = transaction.newDelete();
delete.deleteFromRowFilter(Expressions.alwaysTrue());
delete.commit();
+ ReplacePartitions overwrite = transaction.newReplacePartitions();
+ results.dataFiles().forEach(overwrite::addFile);
+ overwrite.commit();
+ transaction.commitTransaction();
+ } else {
+ Pair<List<DataFile>, List<DeleteFile>> existingFiles =
IcebergTableUtil.getDataAndDeleteFiles(table,
+ compactionPartSpecId, compactionPartitionPath);
+
+ RewriteFiles rewriteFiles = table.newRewrite();
+
rewriteFiles.validateFromSnapshot(table.currentSnapshot().snapshotId());
+
+ existingFiles.first().forEach(rewriteFiles::deleteFile);
+ existingFiles.second().forEach(rewriteFiles::deleteFile);
+ results.dataFiles().forEach(rewriteFiles::addFile);
+
+ rewriteFiles.commit();
}
- ReplacePartitions overwrite = transaction.newReplacePartitions();
+ LOG.info("Compaction commit took {} ms for table: {} with {} file(s)",
System.currentTimeMillis() - startTime,
+ table, results.dataFiles().size());
+ } else {
+ LOG.info("Empty compaction commit, took {} ms for table: {}",
System.currentTimeMillis() - startTime, table);
+ }
+
+ LOG.debug("Compacted partitions with files {}", results);
+ }
+
+ /**
+ * Creates and commits an Iceberg insert overwrite change with the provided
data files.
+ * For unpartitioned tables the table content is replaced with the new data
files. If not data files are provided
+ * then the unpartitioned table is truncated.
+ * For partitioned tables the relevant partitions are replaced with the new
data files. If no data files are provided
+ * then the unpartitioned table remains unchanged.
+ *
+ * @param table The table we are changing
+ * @param startTime The start time of the commit - used only
for logging
+ * @param results The object containing the new files
+ */
+ private void commitOverwrite(Table table, String branchName, long startTime,
FilesForCommit results) {
+ Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not
handle deletes with overwrite");
Review Comment:
I am not sure. It wasn't added as part of the compaction change; it was there before.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]