This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a545030812 [spark] Fallback to v1 write if clustering.columns is set (#6377)
a545030812 is described below
commit a54503081293846e40c9e13e3542d3d4c8b980cb
Author: Kerwin Zhang <[email protected]>
AuthorDate: Mon Oct 13 10:20:32 2025 +0800
[spark] Fallback to v1 write if clustering.columns is set (#6377)
---
.../scala/org/apache/paimon/spark/SparkTable.scala | 2 +-
.../org/apache/paimon/spark/SparkWriteITCase.java | 56 ++++++++++++++++++----
2 files changed, 47 insertions(+), 11 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 8b540127fe..530cb40943 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -68,7 +68,7 @@ case class SparkTable(table: Table)
         case _ => false
       }
-    }
+    } && coreOptions.clusteringColumns().isEmpty
   }
 
   def getTable: Table = table
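
For readers skimming the diff: the condition above gates Spark's v2 write path for a Paimon table, and the appended "&& coreOptions.clusteringColumns().isEmpty" turns it off whenever clustering columns are configured. A condensed sketch of that shape (the field name and the surrounding match are illustrative, not the actual SparkTable internals; only the clusteringColumns() check is verbatim from this commit):

    // Hypothetical condensed form of the gating predicate in SparkTable.scala:
    // v2 write is advertised only when no clustering columns are set, so
    // clustered writes fall back to the v1 write path.
    private lazy val useV2Write: Boolean = {
      (table match {
        case _: FileStoreTable => true // the real match checks more than the type
        case _ => false
      }) && coreOptions.clusteringColumns().isEmpty
    }

The test below asserts the observable effect: with clustering configured, the INSERT's physical plan contains a sort regardless of the use-v2-write setting.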
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index d889c1ac66..d03172fcb8 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -29,6 +29,7 @@ import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.ui.SQLExecutionUIData;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -37,7 +38,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -46,6 +47,8 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import scala.collection.JavaConverters;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -339,15 +342,48 @@ public class SparkWriteITCase {
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"order", "zorder", "hilbert"})
-    public void testWriteWithClustering(String clusterStrategy) {
-        spark.sql(
-                "CREATE TABLE T (a INT, b INT) TBLPROPERTIES ("
-                        + "'clustering.columns'='a,b',"
-                        + String.format("'clustering.strategy'='%s')", clusterStrategy));
-        spark.sql("INSERT INTO T VALUES (2, 2), (1, 1), (3, 3)").collectAsList();
-        List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
-        assertThat(rows.toString()).isEqualTo("[[1,1], [2,2], [3,3]]");
+    @CsvSource({
+        "order, true",
+        "zorder, true",
+        "hilbert, true",
+        "order, false",
+        "zorder, false",
+        "hilbert, false"
+    })
+    public void testWriteWithClustering(String clusterStrategy, boolean useV2Write) {
+        try {
+            long currentTimeMillis = System.currentTimeMillis();
+            spark.conf().set("spark.paimon.write.use-v2-write", String.valueOf(useV2Write));
+            spark.sql(
+                    "CREATE TABLE T (a INT, b INT) TBLPROPERTIES ("
+                            + "'clustering.columns'='a,b',"
+                            + String.format("'clustering.strategy'='%s')", clusterStrategy));
+
+            spark.sql("INSERT INTO T VALUES (2, 2), (1, 1), (3, 3)").collectAsList();
+            scala.collection.Seq<SQLExecutionUIData> executionSeq =
+                    spark.sharedState().statusStore().executionsList();
+
+            java.util.List<SQLExecutionUIData> executionList =
+                    JavaConverters.seqAsJavaList(executionSeq);
+
+            boolean hasSort =
+                    executionList.stream()
+                            .anyMatch(
+                                    e -> {
+                                        if (e.submissionTime() <= currentTimeMillis) {
+                                            return false;
+                                        }
+                                        String description = e.physicalPlanDescription();
+                                        return description != null
+                                                && description.toLowerCase().contains("sort");
+                                    });
+            assertThat(hasSort).isEqualTo(true);
+
+            List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+            assertThat(rows.toString()).isEqualTo("[[1,1], [2,2], [3,3]]");
+        } finally {
+            spark.conf().unset("spark.paimon.write.use-v2-write");
+        }
     }
 
     @Test
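
For completeness, a usage sketch of what the new test exercises (the config key and table properties come from the diff above; the Spark session setup is assumed):

    // Sketch: even when v2 write is requested, a table with clustering.columns
    // falls back to the v1 write path, so the INSERT plan includes a sort.
    spark.conf.set("spark.paimon.write.use-v2-write", "true")
    spark.sql(
      "CREATE TABLE T (a INT, b INT) TBLPROPERTIES (" +
        "'clustering.columns'='a,b', 'clustering.strategy'='zorder')")
    spark.sql("INSERT INTO T VALUES (2, 2), (1, 1), (3, 3)")
    spark.sql("SELECT * FROM T").show() // rows come back clustered: 1,1 / 2,2 / 3,3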