This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new ee0190d00a Spark 3.4: Fix RewriteDataFiles with partial progress
enabled and max-failed-commits larger than total-file-group (#12701)
ee0190d00a is described below
commit ee0190d00a24dae34d3e9fe4f168ef95bf924bd9
Author: Manu Zhang <[email protected]>
AuthorDate: Wed Apr 2 14:12:21 2025 +0800
Spark 3.4: Fix RewriteDataFiles with partial progress enabled and
max-failed-commits larger than total-file-group (#12701)
---
.../spark/actions/RewriteDataFilesSparkAction.java | 3 ++-
.../spark/actions/TestRewriteDataFilesAction.java | 28 ++++++++++++++++++++++
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index e04a0c88b4..664612cfa3 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -377,7 +377,8 @@ public class RewriteDataFilesSparkAction
// stop commit service
commitService.close();
- int failedCommits = maxCommits - commitService.succeededCommits();
+ int totalCommits = Math.min(ctx.totalGroupCount(), maxCommits);
+ int failedCommits = totalCommits - commitService.succeededCommits();
if (failedCommits > 0 && failedCommits <= maxFailedCommits) {
LOG.warn(
"{} is true but {} rewrite commits failed. Check the logs to determine why the individual "
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index a99f8c234f..dfe8f51728 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -1320,6 +1320,34 @@ public class TestRewriteDataFilesAction extends TestBase {
shouldHaveACleanCache(table);
}
+ @TestTemplate
+ public void testParallelPartialProgressWithMaxFailedCommitsLargerThanTotalFileGroup() {
+ Table table = createTable(20);
+ int fileSize = averageFileSize(table);
+
+ List<Object[]> originalData = currentData();
+
+ RewriteDataFilesSparkAction rewrite =
+ basicRewrite(table)
+ .option(
+ RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000))
+ .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "3")
+ .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
+ // Since we can have at most one commit per file group and there are only 10 file
+ // groups, actual number of commits is 10
+ .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "20")
+ .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_FAILED_COMMITS, "0");
+ rewrite.execute();
+
+ table.refresh();
+
+ List<Object[]> postRewriteData = currentData();
+ assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
+ shouldHaveSnapshots(table, 11);
+ shouldHaveNoOrphans(table);
+ shouldHaveACleanCache(table);
+ }
+
@TestTemplate
public void testInvalidOptions() {
Table table = createTable(20);