This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new ee0190d00a Spark 3.4: Fix RewriteDataFiles with partial progress enabled and max-failed-commits larger than total-file-group (#12701)
ee0190d00a is described below

commit ee0190d00a24dae34d3e9fe4f168ef95bf924bd9
Author: Manu Zhang <[email protected]>
AuthorDate: Wed Apr 2 14:12:21 2025 +0800

    Spark 3.4: Fix RewriteDataFiles with partial progress enabled and max-failed-commits larger than total-file-group (#12701)
---
 .../spark/actions/RewriteDataFilesSparkAction.java |  3 ++-
 .../spark/actions/TestRewriteDataFilesAction.java  | 28 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index e04a0c88b4..664612cfa3 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -377,7 +377,8 @@ public class RewriteDataFilesSparkAction
     // stop commit service
     commitService.close();
 
-    int failedCommits = maxCommits - commitService.succeededCommits();
+    int totalCommits = Math.min(ctx.totalGroupCount(), maxCommits);
+    int failedCommits = totalCommits - commitService.succeededCommits();
     if (failedCommits > 0 && failedCommits <= maxFailedCommits) {
       LOG.warn(
           "{} is true but {} rewrite commits failed. Check the logs to determine why the individual "
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index a99f8c234f..dfe8f51728 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -1320,6 +1320,34 @@ public class TestRewriteDataFilesAction extends TestBase {
     shouldHaveACleanCache(table);
   }
 
+  @TestTemplate
+  public void testParallelPartialProgressWithMaxFailedCommitsLargerThanTotalFileGroup() {
+    Table table = createTable(20);
+    int fileSize = averageFileSize(table);
+
+    List<Object[]> originalData = currentData();
+
+    RewriteDataFilesSparkAction rewrite =
+        basicRewrite(table)
+            .option(
+                RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000))
+            .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "3")
+            .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
+            // Since we can have at most one commit per file group and there are only 10 file
+            // groups, actual number of commits is 10
+            .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "20")
+            .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_FAILED_COMMITS, "0");
+    rewrite.execute();
+
+    table.refresh();
+
+    List<Object[]> postRewriteData = currentData();
+    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
+    shouldHaveSnapshots(table, 11);
+    shouldHaveNoOrphans(table);
+    shouldHaveACleanCache(table);
+  }
+
   @TestTemplate
   public void testInvalidOptions() {
     Table table = createTable(20);

Reply via email to