This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/release-1.1 by this push:
     new db2ccb98ed [core] Reduce the file size per append table compaction task (#5493)
db2ccb98ed is described below

commit db2ccb98ed41002a1b3b85b5f47f5bbd01ce7a02
Author: YeJunHao <[email protected]>
AuthorDate: Mon Apr 21 11:40:41 2025 +0800

    [core] Reduce the file size per append table compaction task (#5493)
---
 .../paimon/append/UnawareAppendTableCompactionCoordinator.java |  2 +-
 .../append/UnawareAppendTableCompactionCoordinatorTest.java    | 10 +++++-----
 .../paimon/spark/procedure/CompactProcedureTestBase.scala      |  4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
index 565bd5d99d..937a589df2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
@@ -330,7 +330,7 @@ public class UnawareAppendTableCompactionCoordinator {
             }
 
             public boolean binFull() {
-                return totalFileSize >= targetFileSize * 50 && fileNum >= minFileNum;
+                return totalFileSize >= targetFileSize * 2 && fileNum >= minFileNum;
             }
         }
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinatorTest.java
index 13f21cdfe0..382926fac6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinatorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinatorTest.java
@@ -59,7 +59,7 @@ public class UnawareAppendTableCompactionCoordinatorTest {
     @Test
     public void testForCompactPlan() {
         List<DataFileMeta> files = generateNewFiles(200, 0);
-        assertTasks(files, 1);
+        assertTasks(files, 2);
     }
 
     @Test
@@ -73,7 +73,7 @@ public class UnawareAppendTableCompactionCoordinatorTest {
         List<DataFileMeta> files =
                 generateNewFiles(
                         100, appendOnlyFileStoreTable.coreOptions().targetFileSize(false) / 3 + 1);
-        assertTasks(files, 1);
+        assertTasks(files, 17);
     }
 
     @Test
@@ -91,14 +91,14 @@ public class UnawareAppendTableCompactionCoordinatorTest {
                         1000, appendOnlyFileStoreTable.coreOptions().targetFileSize(false) / 10);
         compactionCoordinator.notifyNewFiles(partition, files);
 
-        assertThat(compactionCoordinator.compactPlan().size()).isEqualTo(3);
+        assertThat(compactionCoordinator.compactPlan().size()).isEqualTo(56);
         files.clear();
 
         files =
                 generateNewFiles(
                         1050, appendOnlyFileStoreTable.coreOptions().targetFileSize(false) / 5);
         compactionCoordinator.notifyNewFiles(partition, files);
-        assertThat(compactionCoordinator.compactPlan().size()).isEqualTo(5);
+        assertThat(compactionCoordinator.compactPlan().size()).isEqualTo(105);
     }
 
     @Test
@@ -107,7 +107,7 @@ public class UnawareAppendTableCompactionCoordinatorTest {
                 generateNewFiles(
                         1089, appendOnlyFileStoreTable.coreOptions().targetFileSize(false) / 5);
         compactionCoordinator.notifyNewFiles(partition, files);
-        assertThat(compactionCoordinator.compactPlan().size()).isEqualTo(5);
+        assertThat(compactionCoordinator.compactPlan().size()).isEqualTo(109);
     }
 
     @Test
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 9bea013589..ac09735fca 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -767,12 +767,12 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
       spark.sql(
         "CALL sys.compact(table => 'T', options => 'source.split.open-file-cost=3200M, compaction.min.file-num=2')")
 
-      // sparkParallelism is 5, task groups is 1, use 1 as the read parallelism
+      // sparkParallelism is 5, task groups is 3, use 3 as the read parallelism
       spark.conf.set("spark.sql.shuffle.partitions", 5)
       spark.sql(
         "CALL sys.compact(table => 'T', options => 'source.split.open-file-cost=3200M, compaction.min.file-num=2')")
 
-      assertResult(Seq(2, 1))(taskBuffer)
+      assertResult(Seq(2, 3))(taskBuffer)
     } finally {
       spark.sparkContext.removeSparkListener(listener)
     }

Reply via email to