This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 32e9f40468 Spark 3.5: Don't change table distribution when only altering local order (#10774)
32e9f40468 is described below

commit 32e9f40468756a60d2cc52e2b9e951209268e94b
Author: Manu Zhang <[email protected]>
AuthorDate: Sat Oct 26 03:28:51 2024 +0800

    Spark 3.5: Don't change table distribution when only altering local order (#10774)
---
 .../IcebergSqlExtensionsAstBuilder.scala           | 10 +++++---
 .../v2/SetWriteDistributionAndOrderingExec.scala   | 10 +++++---
 .../TestSetWriteDistributionAndOrdering.java       | 29 ++++++++++++++++++++--
 .../logical/SetWriteDistributionAndOrdering.scala  |  2 +-
 4 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
index 2e438de2b8..6b1cc41da0 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
@@ -226,11 +226,13 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
     }
 
     val distributionMode = if (distributionSpec != null) {
-      DistributionMode.HASH
-    } else if (orderingSpec.UNORDERED != null || orderingSpec.LOCALLY != null) {
-      DistributionMode.NONE
+      Some(DistributionMode.HASH)
+    } else if (orderingSpec.UNORDERED != null) {
+      Some(DistributionMode.NONE)
+    } else if (orderingSpec.LOCALLY() != null) {
+      None
     } else {
-      DistributionMode.RANGE
+      Some(DistributionMode.RANGE)
     }
 
     val ordering = if (orderingSpec != null && orderingSpec.order != null) {
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetWriteDistributionAndOrderingExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetWriteDistributionAndOrderingExec.scala
index feecc02350..c9004ddc5b 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetWriteDistributionAndOrderingExec.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetWriteDistributionAndOrderingExec.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.catalog.TableCatalog
 case class SetWriteDistributionAndOrderingExec(
     catalog: TableCatalog,
     ident: Identifier,
-    distributionMode: DistributionMode,
+    distributionMode: Option[DistributionMode],
     sortOrder: Seq[(Term, SortDirection, NullOrder)]) extends LeafV2CommandExec {
 
   import CatalogV2Implicits._
@@ -56,9 +56,11 @@ case class SetWriteDistributionAndOrderingExec(
         }
         orderBuilder.commit()
 
-        txn.updateProperties()
-          .set(WRITE_DISTRIBUTION_MODE, distributionMode.modeName())
-          .commit()
+        distributionMode.foreach { mode =>
+          txn.updateProperties()
+            .set(WRITE_DISTRIBUTION_MODE, mode.modeName())
+            .commit()
+        }
 
         txn.commitTransaction()
 
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetWriteDistributionAndOrdering.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetWriteDistributionAndOrdering.java
index 77b7797fe1..b8547772da 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetWriteDistributionAndOrdering.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetWriteDistributionAndOrdering.java
@@ -200,8 +200,7 @@ public class TestSetWriteDistributionAndOrdering extends ExtensionsTestBase {
 
     table.refresh();
 
-    String distributionMode = table.properties().get(TableProperties.WRITE_DISTRIBUTION_MODE);
-    assertThat(distributionMode).as("Distribution mode must match").isEqualTo("none");
+    assertThat(table.properties().containsKey(TableProperties.WRITE_DISTRIBUTION_MODE)).isFalse();
 
     SortOrder expected =
         SortOrder.builderFor(table.schema())
@@ -213,6 +212,25 @@ public class TestSetWriteDistributionAndOrdering extends ExtensionsTestBase {
     assertThat(table.sortOrder()).as("Sort order must match").isEqualTo(expected);
   }
 
+  @TestTemplate
+  public void testSetWriteLocallyOrderedToPartitionedTable() {
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, category string) USING iceberg PARTITIONED BY (id)",
+        tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(table.sortOrder().isUnsorted()).as("Table should start unsorted").isTrue();
+
+    sql("ALTER TABLE %s WRITE LOCALLY ORDERED BY category DESC", tableName);
+
+    table.refresh();
+
+    assertThat(table.properties().containsKey(TableProperties.WRITE_DISTRIBUTION_MODE)).isFalse();
+
+    SortOrder expected =
+        SortOrder.builderFor(table.schema()).withOrderId(1).desc("category").build();
+    assertThat(table.sortOrder()).as("Sort order must match").isEqualTo(expected);
+  }
+
   @TestTemplate
   public void testSetWriteDistributedByWithSort() {
     sql(
@@ -249,6 +267,13 @@ public class TestSetWriteDistributionAndOrdering extends ExtensionsTestBase {
 
     SortOrder expected = SortOrder.builderFor(table.schema()).withOrderId(1).asc("id").build();
     assertThat(table.sortOrder()).as("Sort order must match").isEqualTo(expected);
+
+    sql("ALTER TABLE %s WRITE LOCALLY ORDERED BY id", tableName);
+
+    table.refresh();
+
+    String newDistributionMode = table.properties().get(TableProperties.WRITE_DISTRIBUTION_MODE);
+    assertThat(newDistributionMode).as("Distribution mode must match").isEqualTo(distributionMode);
   }
 
   @TestTemplate
diff --git a/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetWriteDistributionAndOrdering.scala b/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetWriteDistributionAndOrdering.scala
index 0a0234cdfe..7b599eb3da 100644
--- a/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetWriteDistributionAndOrdering.scala
+++ b/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetWriteDistributionAndOrdering.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits
 
 case class SetWriteDistributionAndOrdering(
     table: Seq[String],
-    distributionMode: DistributionMode,
+    distributionMode: Option[DistributionMode],
     sortOrder: Seq[(Term, SortDirection, NullOrder)]) extends LeafCommand {
 
   import CatalogV2Implicits._

Reply via email to