This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a545030812 [spark] Fallback to v1 write if clustering.columns is set (#6377)
a545030812 is described below

commit a54503081293846e40c9e13e3542d3d4c8b980cb
Author: Kerwin Zhang <[email protected]>
AuthorDate: Mon Oct 13 10:20:32 2025 +0800

    [spark] Fallback to v1 write if clustering.columns is set (#6377)
---
 .../scala/org/apache/paimon/spark/SparkTable.scala |  2 +-
 .../org/apache/paimon/spark/SparkWriteITCase.java  | 56 ++++++++++++++++++----
 2 files changed, 47 insertions(+), 11 deletions(-)
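
For context: the one-line change to SparkTable.scala below gates Spark's v2 write path on the table having no clustering columns, so tables with 'clustering.columns' set fall back to the v1 writer. A minimal sketch of the resulting predicate, using a stub CoreOptions rather than Paimon's real class (everything except clusteringColumns() is illustrative):

    // Sketch only: this CoreOptions is a stand-in, not org.apache.paimon.CoreOptions.
    object V2WriteGate {
      final case class CoreOptions(clusteringColumns: java.util.List[String])

      // The v2 write path is chosen only when the pre-existing capability
      // checks pass AND no clustering columns are configured.
      def useV2Write(capabilityChecksPass: Boolean, options: CoreOptions): Boolean =
        capabilityChecksPass && options.clusteringColumns.isEmpty
    }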

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 8b540127fe..530cb40943 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -68,7 +68,7 @@ case class SparkTable(table: Table)
 
         case _ => false
       }
-    }
+    } && coreOptions.clusteringColumns().isEmpty
   }
 
   def getTable: Table = table
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index d889c1ac66..d03172fcb8 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -29,6 +29,7 @@ import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.ui.SQLExecutionUIData;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -37,7 +38,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -46,6 +47,8 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import scala.collection.JavaConverters;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -339,15 +342,48 @@ public class SparkWriteITCase {
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"order", "zorder", "hilbert"})
-    public void testWriteWithClustering(String clusterStrategy) {
-        spark.sql(
-                "CREATE TABLE T (a INT, b INT) TBLPROPERTIES ("
-                        + "'clustering.columns'='a,b',"
-                        + String.format("'clustering.strategy'='%s')", clusterStrategy));
-        spark.sql("INSERT INTO T VALUES (2, 2), (1, 1), (3, 3)").collectAsList();
-        List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
-        assertThat(rows.toString()).isEqualTo("[[1,1], [2,2], [3,3]]");
+    @CsvSource({
+        "order, true",
+        "zorder, true",
+        "hilbert, true",
+        "order, false",
+        "zorder, false",
+        "hilbert, false"
+    })
+    public void testWriteWithClustering(String clusterStrategy, boolean useV2Write) {
+        try {
+            long currentTimeMillis = System.currentTimeMillis();
+            spark.conf().set("spark.paimon.write.use-v2-write", String.valueOf(useV2Write));
+            spark.sql(
+                    "CREATE TABLE T (a INT, b INT) TBLPROPERTIES ("
+                            + "'clustering.columns'='a,b',"
+                            + String.format("'clustering.strategy'='%s')", clusterStrategy));
+
+            spark.sql("INSERT INTO T VALUES (2, 2), (1, 1), (3, 3)").collectAsList();
+            scala.collection.Seq<SQLExecutionUIData> executionSeq =
+                    spark.sharedState().statusStore().executionsList();
+
+            java.util.List<SQLExecutionUIData> executionList =
+                    JavaConverters.seqAsJavaList(executionSeq);
+
+            boolean hasSort =
+                    executionList.stream()
+                            .anyMatch(
+                                    e -> {
+                                        if (e.submissionTime() <= currentTimeMillis) {
+                                            return false;
+                                        }
+                                        String description = e.physicalPlanDescription();
+                                        return description != null
+                                                && description.toLowerCase().contains("sort");
+                                    });
+            assertThat(hasSort).isEqualTo(true);
+
+            List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+            assertThat(rows.toString()).isEqualTo("[[1,1], [2,2], [3,3]]");
+        } finally {
+            spark.conf().unset("spark.paimon.write.use-v2-write");
+        }
     }
 
     @Test

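Usage-wise, the fallback is transparent: with 'clustering.columns' set, an INSERT takes the v1 write path even when v2 write is requested, and its physical plan still contains the clustering sort that the new test asserts on. A minimal sketch, assuming a SparkSession named spark wired to a Paimon catalog:

    // Illustrative, not part of the patch; mirrors the test above.
    spark.conf.set("spark.paimon.write.use-v2-write", "true")
    spark.sql(
      "CREATE TABLE T (a INT, b INT) TBLPROPERTIES (" +
        "'clustering.columns'='a,b', 'clustering.strategy'='zorder')")
    // Because clustering columns are set, this INSERT falls back to the v1
    // writer, whose plan includes the sort that realizes the clustering.
    spark.sql("INSERT INTO T VALUES (2, 2), (1, 1), (3, 3)")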