difin commented on code in PR #5389:
URL: https://github.com/apache/hive/pull/5389#discussion_r1709890906
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -599,44 +598,29 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
* @param table The table we are changing
* @param startTime The start time of the commit - used only for
logging
* @param results The object containing the new files
- * @param rewritePolicy The rewrite policy to use for the insert
overwrite commit
* @param partitionSpecId The table spec_id for partition compaction
operation
* @param partitionPath The path of the compacted partition
*/
- private void commitCompaction(Table table, long startTime, FilesForCommit
results,
- RewritePolicy rewritePolicy, Integer partitionSpecId, String
partitionPath) {
- if (rewritePolicy == RewritePolicy.FULL_TABLE) {
- // Full table compaction
- Transaction transaction = table.newTransaction();
- DeleteFiles delete = transaction.newDelete();
- delete.deleteFromRowFilter(Expressions.alwaysTrue());
- delete.commit();
- ReplacePartitions overwrite = transaction.newReplacePartitions();
- results.dataFiles().forEach(overwrite::addFile);
- overwrite.commit();
- transaction.commitTransaction();
- LOG.debug("Compacted full table with files {}", results);
- } else {
- // Single partition compaction
- List<DataFile> existingDataFiles =
- IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath,
- partitionPath == null ?
Predicate.isEqual(partitionSpecId).negate() :
Predicate.isEqual(partitionSpecId));
- List<DeleteFile> existingDeleteFiles =
- IcebergTableUtil.getDeleteFiles(table, partitionSpecId,
partitionPath,
- partitionPath == null ?
Predicate.isEqual(partitionSpecId).negate() :
Predicate.isEqual(partitionSpecId));
-
- RewriteFiles rewriteFiles = table.newRewrite();
- rewriteFiles.validateFromSnapshot(table.currentSnapshot().snapshotId());
-
- existingDataFiles.forEach(rewriteFiles::deleteFile);
- existingDeleteFiles.forEach(rewriteFiles::deleteFile);
- results.dataFiles().forEach(rewriteFiles::addFile);
-
- rewriteFiles.commit();
- LOG.debug("Compacted partition {} with files {}", partitionPath,
results);
- }
- LOG.info("Compaction commit took {} ms for table: {} with {} file(s)",
System.currentTimeMillis() - startTime,
- table, results.dataFiles().size());
+ private void commitCompaction(Table table, long startTime, FilesForCommit
results, Integer partitionSpecId,
+ String partitionPath) {
+ List<DataFile> existingDataFiles =
+ IcebergTableUtil.getDataFiles(table, partitionSpecId, partitionPath,
+ partitionPath == null ?
Predicate.isEqual(partitionSpecId).negate() :
Predicate.isEqual(partitionSpecId));
Review Comment:
Done
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java:
##########
@@ -396,45 +396,49 @@ public static PartitionData toPartitionData(StructLike sourceKey, Types.StructTy
/**
* Returns list of data files filtered by specId and partitionPath as
following:
- * 1. If matchBySpecId is true, then filters files by specId == file's
specId, else by specId != file's specId
- * 2. If partitionPath is not null, then also filters files where
partitionPath == file's partition path
+ * 1. If table is unpartitioned, returns all data files without filtering.
+ * 2. If matchBySpecId is true, then filters files by specId == file's
specId, else by specId != file's specId
+ * 3. If partitionPath is not null, then also filters files where
partitionPath == file's partition path
* @param table the iceberg table
* @param specId partition spec id
* @param partitionPath partition path
* @param matchBySpecId filter that's applied on data files' spec ids
*/
- public static List<DataFile> getDataFiles(Table table, int specId, String
partitionPath,
+ public static List<DataFile> getDataFiles(Table table, Integer specId,
String partitionPath,
Predicate<Object> matchBySpecId) {
CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
CloseableIterable<FileScanTask> filteredFileScanTasks =
CloseableIterable.filter(fileScanTasks, t -> {
DataFile file = t.asFileScanTask().file();
- return matchBySpecId.test(file.specId()) && (partitionPath == null
|| (partitionPath != null &&
-
table.specs().get(specId).partitionToPath(file.partition()).equals(partitionPath)));
+ return !table.spec().isPartitioned() ||
+ (matchBySpecId.test(file.specId()) && (partitionPath == null ||
(partitionPath != null &&
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]