This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 420fb1a688 Spark 4.1: Set data file sort_order_id in manifest for writes from Spark (#15150)
420fb1a688 is described below
420fb1a688 is described below

commit 420fb1a688926440582fe078d7348f60f27997a8
Author: jbewing <[email protected]>
AuthorDate: Fri Mar 27 13:44:23 2026 -0400

    Spark 4.1: Set data file sort_order_id in manifest for writes from Spark (#15150)
---
 .../java/org/apache/iceberg/SerializableTable.java |  23 ++++-
 .../org/apache/iceberg/util/SortOrderUtil.java     |  17 +++
 .../org/apache/iceberg/util/TestSortOrderUtil.java |  64 ++++++++++++
 .../spark/extensions/TestCopyOnWriteDelete.java    |  20 ++++
 .../spark/extensions/TestCopyOnWriteMerge.java     |  29 ++++++
 .../spark/extensions/TestCopyOnWriteUpdate.java    |  20 ++++
 .../spark/extensions/TestMergeOnReadMerge.java     |  29 ++++++
 .../spark/extensions/TestMergeOnReadUpdate.java    |  20 ++++
 .../org/apache/iceberg/spark/SparkWriteConf.java   |  20 ++++
 .../apache/iceberg/spark/SparkWriteOptions.java    |   1 +
 .../actions/SparkShufflingFileRewriteRunner.java   |  16 +++
 .../spark/source/SparkPositionDeltaWrite.java      |  11 +-
 .../apache/iceberg/spark/source/SparkWrite.java    |  10 +-
 .../apache/iceberg/spark/TestSparkWriteConf.java   |  52 ++++++++++
 .../spark/actions/TestRewriteDataFilesAction.java  |  57 +++++++++-
 .../iceberg/spark/source/TestSparkDataWrite.java   | 115 +++++++++++++++++++++
 .../spark/source/TestStructuredStreaming.java      |  48 +++++++++
 17 files changed, 542 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java 
b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index 7e5746ec91..a26fff1fd5 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.SerializableMap;
 
@@ -57,6 +58,7 @@ public class SerializableTable implements Table, 
HasTableOperations, Serializabl
   private final int defaultSpecId;
   private final Map<Integer, String> specAsJsonMap;
   private final String sortOrderAsJson;
+  private final Map<Integer, String> sortOrderAsJsonMap;
   private final FileIO io;
   private final EncryptionManager encryption;
   private final Map<String, SnapshotRef> refs;
@@ -68,6 +70,7 @@ public class SerializableTable implements Table, 
HasTableOperations, Serializabl
   private transient volatile Schema lazySchema = null;
   private transient volatile Map<Integer, PartitionSpec> lazySpecs = null;
   private transient volatile SortOrder lazySortOrder = null;
+  private transient volatile Map<Integer, SortOrder> lazySortOrders = null;
 
   protected SerializableTable(Table table) {
     this.name = table.name();
@@ -80,6 +83,10 @@ public class SerializableTable implements Table, 
HasTableOperations, Serializabl
     Map<Integer, PartitionSpec> specs = table.specs();
     specs.forEach((specId, spec) -> specAsJsonMap.put(specId, 
PartitionSpecParser.toJson(spec)));
     this.sortOrderAsJson = SortOrderParser.toJson(table.sortOrder());
+    this.sortOrderAsJsonMap = Maps.newHashMap();
+    table
+        .sortOrders()
+        .forEach((id, order) -> sortOrderAsJsonMap.put(id, 
SortOrderParser.toJson(order)));
     this.io = table.io();
     this.encryption = table.encryption();
     this.locationProviderTry = Try.of(table::locationProvider);
@@ -240,7 +247,21 @@ public class SerializableTable implements Table, 
HasTableOperations, Serializabl
 
   @Override
   public Map<Integer, SortOrder> sortOrders() {
-    return lazyTable().sortOrders();
+    if (lazySortOrders == null) {
+      synchronized (this) {
+        if (lazySortOrders == null && lazyTable == null) {
+          ImmutableMap.Builder<Integer, SortOrder> sortOrders =
+              ImmutableMap.builderWithExpectedSize(sortOrderAsJsonMap.size());
+          sortOrderAsJsonMap.forEach(
+              (id, json) -> sortOrders.put(id, 
SortOrderParser.fromJson(schema(), json)));
+          this.lazySortOrders = sortOrders.build();
+        } else if (lazySortOrders == null) {
+          this.lazySortOrders = lazyTable.sortOrders();
+        }
+      }
+    }
+
+    return lazySortOrders;
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java 
b/core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
index 37e0c1fffa..6548239937 100644
--- a/core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
@@ -46,6 +46,23 @@ public class SortOrderUtil {
     return buildSortOrder(table.schema(), table.spec(), sortOrder);
   }
 
+  /**
+   * Attempts to match a user-supplied {@link SortOrder} with an equivalent 
sort order from a {@link
+   * Table}.
+   *
+   * @param table the table to try and match the sort order against
+   * @param userSuppliedSortOrder the user supplied sort order to try and 
match with a table sort
+   *     order
+   * @return the matching {@link SortOrder} from the table (with the orderId 
set) or {@link
+   *     SortOrder#unsorted()} if no match is found.
+   */
+  public static SortOrder findTableSortOrder(Table table, SortOrder 
userSuppliedSortOrder) {
+    return table.sortOrders().values().stream()
+        .filter(sortOrder -> sortOrder.sameOrder(userSuppliedSortOrder))
+        .findFirst()
+        .orElseGet(SortOrder::unsorted);
+  }
+
   /**
    * Build a final sort order that satisfies the clustering required by the 
partition spec.
    *
diff --git a/core/src/test/java/org/apache/iceberg/util/TestSortOrderUtil.java 
b/core/src/test/java/org/apache/iceberg/util/TestSortOrderUtil.java
index 02c81de932..7ac7d72a2f 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestSortOrderUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestSortOrderUtil.java
@@ -287,4 +287,68 @@ public class TestSortOrderUtil {
         .as("Should add spec fields as prefix")
         .isEqualTo(expected);
   }
+
+  @Test
+  public void testFindSortOrderForTable() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    SortOrder order = SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id", 
NULLS_LAST).build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, 
spec, order, 2);
+
+    SortOrder tableSortOrder = table.sortOrder();
+
+    SortOrder actualOrder = SortOrderUtil.findTableSortOrder(table, 
tableSortOrder);
+
+    assertThat(actualOrder).as("Should find current table sort 
order").isEqualTo(table.sortOrder());
+  }
+
+  @Test
+  public void testFindSortOrderForTableWithoutFieldId() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    SortOrder order = SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id", 
NULLS_LAST).build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, 
spec, order, 2);
+
+    SortOrder userSuppliedOrder =
+        SortOrder.builderFor(table.schema()).asc("id", NULLS_LAST).build();
+
+    SortOrder actualOrder = SortOrderUtil.findTableSortOrder(table, 
userSuppliedOrder);
+
+    assertThat(actualOrder).as("Should find current table sort 
order").isEqualTo(table.sortOrder());
+  }
+
+  @Test
+  public void testFindSortOrderForTableThatIsNotCurrentOrder() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    SortOrder order = SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id", 
NULLS_LAST).build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, 
spec, order, 2);
+
+    table.replaceSortOrder().asc("data").desc("ts").commit();
+
+    SortOrder userSuppliedOrder =
+        SortOrder.builderFor(table.schema()).asc("id", NULLS_LAST).build();
+
+    SortOrder actualOrder = SortOrderUtil.findTableSortOrder(table, 
userSuppliedOrder);
+
+    assertThat(actualOrder)
+        .as("Should find first sorted table sort order")
+        .isEqualTo(table.sortOrders().get(1));
+  }
+
+  @Test
+  public void testReturnsUnsortedForMissingSortOrder() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    SortOrder order = SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id", 
NULLS_LAST).build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, 
spec, order, 2);
+
+    table.replaceSortOrder().asc("data").desc("ts").commit();
+
+    SortOrder userSuppliedOrder =
+        SortOrder.builderFor(table.schema()).desc("id", NULLS_LAST).build();
+
+    SortOrder actualOrder = SortOrderUtil.findTableSortOrder(table, 
userSuppliedOrder);
+
+    assertThat(actualOrder)
+        .as(
+            "Should return unsorted order if user supplied order does not 
match any table sort order")
+        .isEqualTo(SortOrder.unsorted());
+  }
 }
diff --git 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
index e0b852a9d0..dd296e5d72 100644
--- 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
+++ 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.ParameterizedTestExtension;
@@ -54,6 +55,25 @@ public class TestCopyOnWriteDelete extends TestDelete {
         TableProperties.DELETE_MODE, 
RowLevelOperationMode.COPY_ON_WRITE.modeName());
   }
 
+  @TestTemplate
+  public void testCopyOnWriteDeleteSetsSortOrderIdOnRewrittenDataFiles() {
+    createAndInitTable(
+        "id INT, dep STRING",
+        "PARTITIONED BY (dep)",
+        "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+
+    sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+    sql("DELETE FROM %s WHERE id = 1", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(snapshot.addedDataFiles(table.io()))
+        .extracting(DataFile::sortOrderId)
+        .as("Rewritten data files should carry the table sort order id")
+        .containsOnly(table.sortOrder().orderId());
+  }
+
   @TestTemplate
   public void testRuntimeFilteringWithPreservedDataGrouping() throws 
NoSuchTableException {
     createAndInitPartitionedTable();
diff --git 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
index 36b7c5818e..3f4faa0be1 100644
--- 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
+++ 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
@@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.Collections;
 import java.util.Map;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
@@ -45,6 +46,34 @@ public class TestCopyOnWriteMerge extends TestMerge {
         TableProperties.MERGE_MODE, 
RowLevelOperationMode.COPY_ON_WRITE.modeName());
   }
 
+  @TestTemplate
+  public void testCopyOnWriteMergeSetsSortOrderIdOnRewrittenDataFiles() {
+    createAndInitTable("id INT, dep STRING");
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+    sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+    append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, 
\"dep\": \"hr\" }");
+    createBranchIfNeeded();
+
+    createOrReplaceView("source", Collections.singletonList(1), 
Encoders.INT());
+
+    sql(
+        "MERGE INTO %s t USING source s "
+            + "ON t.id == s.value "
+            + "WHEN MATCHED THEN "
+            + "  UPDATE SET dep = 'changed' "
+            + "WHEN NOT MATCHED THEN "
+            + "  INSERT (id, dep) VALUES (s.value, 'new')",
+        commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(snapshot.addedDataFiles(table.io()))
+        .extracting(DataFile::sortOrderId)
+        .as("Rewritten data files should carry the table sort order id")
+        .containsOnly(table.sortOrder().orderId());
+  }
+
   @TestTemplate
   public void testRuntimeFilteringWithReportedPartitioning() {
     createAndInitTable("id INT, dep STRING");
diff --git 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
index ab5b6da50b..22a2b0932f 100644
--- 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
+++ 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.extensions;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.Map;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
@@ -43,6 +44,25 @@ public class TestCopyOnWriteUpdate extends TestUpdate {
         TableProperties.UPDATE_MODE, 
RowLevelOperationMode.COPY_ON_WRITE.modeName());
   }
 
+  @TestTemplate
+  public void testCopyOnWriteUpdateSetsSortOrderIdOnRewrittenDataFiles() {
+    createAndInitTable(
+        "id INT, dep STRING",
+        "PARTITIONED BY (dep)",
+        "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+
+    sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+    sql("UPDATE %s SET dep = 'changed' WHERE id = 1", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(snapshot.addedDataFiles(table.io()))
+        .extracting(DataFile::sortOrderId)
+        .as("Rewritten data files should carry the table sort order id")
+        .containsOnly(table.sortOrder().orderId());
+  }
+
   @TestTemplate
   public void testRuntimeFilteringWithReportedPartitioning() {
     createAndInitTable("id INT, dep STRING");
diff --git 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
index 737f19e86a..9a42b58e34 100644
--- 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
+++ 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ParameterizedTestExtension;
@@ -136,6 +137,34 @@ public class TestMergeOnReadMerge extends TestMerge {
     assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) == 
FileFormat.PUFFIN);
   }
 
+  @TestTemplate
+  public void testMergeOnReadMergeSetsSortOrderIdOnNewDataFiles() {
+    createAndInitTable(
+        "id INT, dep STRING",
+        "PARTITIONED BY (dep)",
+        "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+
+    sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+    createOrReplaceView("source", ImmutableList.of(1, 3), Encoders.INT());
+
+    sql(
+        "MERGE INTO %s AS t USING source AS s "
+            + "ON t.id == s.value "
+            + "WHEN MATCHED THEN "
+            + " UPDATE SET id = id + 10 "
+            + "WHEN NOT MATCHED THEN "
+            + " INSERT (id, dep) VALUES (s.value, 'hr')",
+        commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(snapshot.addedDataFiles(table.io()))
+        .extracting(DataFile::sortOrderId)
+        .as("All new data files should carry the table sort order id")
+        .containsOnly(table.sortOrder().orderId());
+  }
+
   private void checkMergeDeleteGranularity(DeleteGranularity 
deleteGranularity) {
     createTableWithDeleteGranularity(
         "id INT, dep STRING", "PARTITIONED BY (dep)", deleteGranularity);
diff --git 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
index 2398bc45b1..d1c336d5dd 100644
--- 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
+++ 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ParameterizedTestExtension;
@@ -224,6 +225,25 @@ public class TestMergeOnReadUpdate extends TestUpdate {
     assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) == 
FileFormat.PUFFIN);
   }
 
+  @TestTemplate
+  public void testMergeOnReadUpdateSetsSortOrderIdOnNewDataFiles() {
+    createAndInitTable(
+        "id INT, dep STRING",
+        "PARTITIONED BY (dep)",
+        "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+
+    sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+    sql("UPDATE %s SET id = id + 10 WHERE id = 1", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(snapshot.addedDataFiles(table.io()))
+        .extracting(DataFile::sortOrderId)
+        .as("All new data files should carry the table sort order id")
+        .containsOnly(table.sortOrder().orderId());
+  }
+
   private void initTable(String partitionedBy, DeleteGranularity 
deleteGranularity) {
     createTableWithDeleteGranularity("id INT, dep STRING", partitionedBy, 
deleteGranularity);
 
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 8a629d3a6d..2296c076f0 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.IsolationLevel;
 import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableUtil;
@@ -171,6 +172,25 @@ public class SparkWriteConf {
     return outputSpecId;
   }
 
+  public int outputSortOrderId(SparkWriteRequirements writeRequirements) {
+    Integer explicitId =
+        
confParser.intConf().option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID).parseOptional();
+
+    if (explicitId != null) {
+      Preconditions.checkArgument(
+          table.sortOrders().containsKey(explicitId),
+          "Cannot use output sort order id %s because the table does not 
contain a sort order with that id",
+          explicitId);
+      return explicitId;
+    }
+
+    if (writeRequirements.hasOrdering()) {
+      return table.sortOrder().orderId();
+    }
+
+    return SortOrder.unsorted().orderId();
+  }
+
   public FileFormat dataFileFormat() {
     String valueAsString =
         confParser
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
index 86c27acd88..2b88d2bb1e 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
@@ -54,6 +54,7 @@ public class SparkWriteOptions {
   public static final String CHECK_ORDERING = "check-ordering";
 
   public static final String OUTPUT_SPEC_ID = "output-spec-id";
+  public static final String OUTPUT_SORT_ORDER_ID = "output-sort-order-id";
 
   public static final String OVERWRITE_MODE = "overwrite-mode";
 
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
index bc1665dc32..f20017648d 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
@@ -46,10 +46,14 @@ import 
org.apache.spark.sql.connector.distributions.OrderedDistribution;
 import org.apache.spark.sql.connector.expressions.SortOrder;
 import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
 import 
org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Option;
 
 abstract class SparkShufflingFileRewriteRunner extends 
SparkDataFileRewriteRunner {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkShufflingFileRewriteRunner.class);
+
   /**
    * The number of shuffle partitions to use for each output file. By default, 
this file rewriter
    * assumes each shuffle partition would become a separate output file. 
Attempting to generate
@@ -113,12 +117,24 @@ abstract class SparkShufflingFileRewriteRunner extends 
SparkDataFileRewriteRunne
                 spec(fileGroup.outputSpecId()),
                 fileGroup.expectedOutputFiles()));
 
+    org.apache.iceberg.SortOrder sortOrderInJobSpec = sortOrder();
+
+    org.apache.iceberg.SortOrder maybeMatchingTableSortOrder =
+        SortOrderUtil.findTableSortOrder(table(), sortOrder());
+
+    if (sortOrderInJobSpec.isSorted() && 
maybeMatchingTableSortOrder.isUnsorted()) {
+      LOG.warn(
+          "Sort order specified for job {} doesn't match any table sort 
orders, rewritten files will not be marked as sorted in the manifest files",
+          Spark3Util.describe(sortOrderInJobSpec));
+    }
+
     sortedDF
         .write()
         .format("iceberg")
         .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, 
fileGroup.maxOutputFileSize())
         .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
         .option(SparkWriteOptions.OUTPUT_SPEC_ID, fileGroup.outputSpecId())
+        .option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID, 
maybeMatchingTableSortOrder.orderId())
         .mode("append")
         .save(groupId);
   }
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index fa4ab524a4..89f6e1c350 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -119,6 +119,7 @@ class SparkPositionDeltaWrite extends BaseSparkWrite
   private final String wapId;
   private final Map<String, String> extraSnapshotMetadata;
   private final SparkWriteRequirements writeRequirements;
+  private final int sortOrderId;
   private final Context context;
   private final Map<String, String> writeProperties;
 
@@ -146,6 +147,7 @@ class SparkPositionDeltaWrite extends BaseSparkWrite
     this.wapId = writeConf.wapId();
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
     this.writeRequirements = writeConf.positionDeltaRequirements(command);
+    this.sortOrderId = writeConf.outputSortOrderId(writeRequirements);
     this.context = new Context(dataSchema, writeConf, info, writeRequirements);
     this.writeProperties = writeConf.writeProperties();
 
@@ -206,7 +208,8 @@ class SparkPositionDeltaWrite extends BaseSparkWrite
           broadcastRewritableDeletes(),
           command,
           context,
-          writeProperties);
+          writeProperties,
+          sortOrderId);
     }
 
     private Broadcast<Map<String, DeleteFileSet>> broadcastRewritableDeletes() 
{
@@ -416,18 +419,21 @@ class SparkPositionDeltaWrite extends BaseSparkWrite
     private final Command command;
     private final Context context;
     private final Map<String, String> writeProperties;
+    private final int sortOrderId;
 
     PositionDeltaWriteFactory(
         Broadcast<Table> tableBroadcast,
         Broadcast<Map<String, DeleteFileSet>> rewritableDeletesBroadcast,
         Command command,
         Context context,
-        Map<String, String> writeProperties) {
+        Map<String, String> writeProperties,
+        int sortOrderId) {
       this.tableBroadcast = tableBroadcast;
       this.rewritableDeletesBroadcast = rewritableDeletesBroadcast;
       this.command = command;
       this.context = context;
       this.writeProperties = writeProperties;
+      this.sortOrderId = sortOrderId;
     }
 
     @Override
@@ -454,6 +460,7 @@ class SparkPositionDeltaWrite extends BaseSparkWrite
               .deleteFileFormat(context.deleteFileFormat())
               .positionDeleteSparkType(context.deleteSparkType())
               .writeProperties(writeProperties)
+              .dataSortOrder(table.sortOrders().get(sortOrderId))
               .build();
 
       if (command == DELETE) {
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 22dc3be66d..7caa24772e 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -214,6 +214,7 @@ abstract class SparkWrite extends BaseSparkWrite implements 
Write, RequiresDistr
     // broadcast the table metadata as the writer factory will be sent to 
executors
     Broadcast<Table> tableBroadcast =
         sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+    int sortOrderId = writeConf.outputSortOrderId(writeRequirements);
     return new WriterFactory(
         tableBroadcast,
         queryId,
@@ -223,7 +224,8 @@ abstract class SparkWrite extends BaseSparkWrite implements 
Write, RequiresDistr
         writeSchema,
         dsSchema,
         useFanoutWriter,
-        writeProperties);
+        writeProperties,
+        sortOrderId);
   }
 
   private void commitOperation(SnapshotUpdate<?> operation, String 
description) {
@@ -708,6 +710,7 @@ abstract class SparkWrite extends BaseSparkWrite implements 
Write, RequiresDistr
     private final boolean useFanoutWriter;
     private final String queryId;
     private final Map<String, String> writeProperties;
+    private final int sortOrderId;
 
     protected WriterFactory(
         Broadcast<Table> tableBroadcast,
@@ -718,7 +721,8 @@ abstract class SparkWrite extends BaseSparkWrite implements 
Write, RequiresDistr
         Schema writeSchema,
         StructType dsSchema,
         boolean useFanoutWriter,
-        Map<String, String> writeProperties) {
+        Map<String, String> writeProperties,
+        int sortOrderId) {
       this.tableBroadcast = tableBroadcast;
       this.format = format;
       this.outputSpecId = outputSpecId;
@@ -728,6 +732,7 @@ abstract class SparkWrite extends BaseSparkWrite implements 
Write, RequiresDistr
       this.useFanoutWriter = useFanoutWriter;
       this.queryId = queryId;
       this.writeProperties = writeProperties;
+      this.sortOrderId = sortOrderId;
     }
 
     @Override
@@ -752,6 +757,7 @@ abstract class SparkWrite extends BaseSparkWrite implements 
Write, RequiresDistr
               .dataSchema(writeSchema)
               .dataSparkType(dsSchema)
               .writeProperties(writeProperties)
+              .dataSortOrder(table.sortOrders().get(sortOrderId))
               .build();
 
       Function<InternalRow, InternalRow> rowLineageExtractor = new 
ExtractRowLineage(writeSchema);
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index 227b93dfa4..383a21087d 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -45,6 +45,7 @@ import static 
org.apache.spark.sql.connector.write.RowLevelOperation.Command.DEL
 import static 
org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE;
 import static 
org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE;
 import static org.assertj.core.api.Assertions.assertThat;
+import static 
org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.time.Duration;
@@ -606,6 +607,57 @@ public class TestSparkWriteConf extends 
TestBaseWithCatalog {
     assertThat(writeConf.deleteFileFormat()).isEqualTo(FileFormat.PUFFIN);
   }
 
+  @TestTemplate
+  public void testSortOrderWriteConf() {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.replaceSortOrder().asc("id").commit();
+
+    SparkWriteConf writeConf =
+        new SparkWriteConf(
+            spark,
+            table,
+            new CaseInsensitiveStringMap(
+                ImmutableMap.of(SparkWriteOptions.OUTPUT_SORT_ORDER_ID, "1")));
+
+    assertThat(writeConf.outputSortOrderId(SparkWriteRequirements.EMPTY))
+        .isEqualTo(table.sortOrder().orderId());
+  }
+
+  @TestTemplate
+  public void testSortOrderWriteConfWithInvalidId() {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.replaceSortOrder().asc("id").commit();
+
+    SparkWriteConf writeConfForUnknownSortOrder =
+        new SparkWriteConf(
+            spark,
+            table,
+            new CaseInsensitiveStringMap(
+                ImmutableMap.of(SparkWriteOptions.OUTPUT_SORT_ORDER_ID, 
"999")));
+
+    assertThatIllegalArgumentException()
+        .isThrownBy(
+            () -> 
writeConfForUnknownSortOrder.outputSortOrderId(SparkWriteRequirements.EMPTY))
+        .withMessage(
+            "Cannot use output sort order id 999 because the table does not 
contain a sort order with that id");
+  }
+
+  @TestTemplate
+  public void testSortOrderWriteConfWithNoOption() {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.replaceSortOrder().asc("id").commit();
+
+    SparkWriteConf writeConfNoOption = new SparkWriteConf(spark, table);
+
+    
assertThat(writeConfNoOption.outputSortOrderId(writeConfNoOption.writeRequirements()))
+        .isEqualTo(table.sortOrder().orderId());
+
+    
assertThat(writeConfNoOption.outputSortOrderId(SparkWriteRequirements.EMPTY)).isEqualTo(0);
+  }
+
   private void testWriteProperties(List<Map<String, String>> propertiesSuite) {
     withSQLConf(
         propertiesSuite.get(0),
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 7f5dea1904..9524b0e716 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -1517,7 +1517,7 @@ public class TestRewriteDataFilesAction extends TestBase {
   }
 
   @TestTemplate
-  public void testSimpleSort() {
+  public void testSimpleSort() throws IOException {
     Table table = createTable(20);
     shouldHaveFiles(table, 20);
     table.replaceSortOrder().asc("c2").commit();
@@ -1545,10 +1545,11 @@ public class TestRewriteDataFilesAction extends TestBase {
     shouldHaveACleanCache(table);
     shouldHaveMultipleFiles(table);
     shouldHaveLastCommitSorted(table, "c2");
+    dataFilesSortOrderShouldMatchTableSortOrder(table);
   }
 
   @TestTemplate
-  public void testSortAfterPartitionChange() {
+  public void testSortAfterPartitionChange() throws IOException {
     Table table = createTable(20);
     shouldHaveFiles(table, 20);
     table.updateSpec().addField(Expressions.bucket("c1", 4)).commit();
@@ -1579,10 +1580,11 @@ public class TestRewriteDataFilesAction extends TestBase {
     shouldHaveACleanCache(table);
     shouldHaveMultipleFiles(table);
     shouldHaveLastCommitSorted(table, "c2");
+    dataFilesSortOrderShouldMatchTableSortOrder(table);
   }
 
   @TestTemplate
-  public void testSortCustomSortOrder() {
+  public void testSortCustomSortOrder() throws IOException {
     Table table = createTable(20);
     shouldHaveLastCommitUnsorted(table, "c2");
     shouldHaveFiles(table, 20);
@@ -1608,10 +1610,11 @@ public class TestRewriteDataFilesAction extends TestBase {
     shouldHaveACleanCache(table);
     shouldHaveMultipleFiles(table);
     shouldHaveLastCommitSorted(table, "c2");
+    dataFilesShouldHaveSortOrderIdMatching(table, SortOrder.unsorted());
   }
 
   @TestTemplate
-  public void testSortCustomSortOrderRequiresRepartition() {
+  public void testSortCustomSortOrderRequiresRepartition() throws IOException {
     int partitions = 4;
     Table table = createTable();
     writeRecords(20, SCALE, partitions);
@@ -1647,10 +1650,40 @@ public class TestRewriteDataFilesAction extends TestBase {
     shouldHaveMultipleFiles(table);
     shouldHaveLastCommitUnsorted(table, "c2");
     shouldHaveLastCommitSorted(table, "c3");
+    dataFilesShouldHaveSortOrderIdMatching(table, SortOrder.unsorted());
   }
 
   @TestTemplate
-  public void testAutoSortShuffleOutput() {
+  public void testSortPastTableSortOrderGetsAppliedToFiles() throws IOException {
+    Table table = createTable(1);
+
+    table.replaceSortOrder().asc("c3").commit();
+    SortOrder c3SortOrder = table.sortOrder();
+
+    table.replaceSortOrder().asc("c2").commit();
+
+    List<Object[]> originalData = currentData();
+
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .sort(SortOrder.builderFor(table.schema()).asc("c3").build())
+            .option(SizeBasedFileRewritePlanner.REWRITE_ALL, "true")
+            .execute();
+
+    assertThat(result.rewriteResults()).as("Should have 1 fileGroups").hasSize(1);
+
+    table.refresh();
+
+    List<Object[]> postRewriteData = currentData();
+    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
+
+    shouldHaveSnapshots(table, 2);
+    shouldHaveACleanCache(table);
+    dataFilesShouldHaveSortOrderIdMatching(table, c3SortOrder);
+  }
+
+  @TestTemplate
+  public void testAutoSortShuffleOutput() throws IOException {
     Table table = createTable(20);
     shouldHaveLastCommitUnsorted(table, "c2");
     shouldHaveFiles(table, 20);
@@ -1685,6 +1718,7 @@ public class TestRewriteDataFilesAction extends TestBase {
     shouldHaveACleanCache(table);
     shouldHaveMultipleFiles(table);
     shouldHaveLastCommitSorted(table, "c2");
+    dataFilesShouldHaveSortOrderIdMatching(table, SortOrder.unsorted());
   }
 
   @TestTemplate
@@ -2623,4 +2657,17 @@ public class TestRewriteDataFilesAction extends TestBase {
       return groupIDs.contains(argument.info().globalIndex());
     }
   }
+
+  private void dataFilesSortOrderShouldMatchTableSortOrder(Table table) throws IOException {
+    dataFilesShouldHaveSortOrderIdMatching(table, table.sortOrder());
+  }
+
+  private void dataFilesShouldHaveSortOrderIdMatching(Table table, SortOrder sortOrder)
+      throws IOException {
+    try (CloseableIterable<FileScanTask> files = table.newScan().planFiles()) {
+      assertThat(files)
+          .extracting(fileScanTask -> fileScanTask.file().sortOrderId())
+          .containsOnly(sortOrder.orderId());
+    }
+  }
 }
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 63e9a638e5..a90de51fce 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.file.Path;
 import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.Parameter;
@@ -44,10 +46,12 @@ import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -154,6 +158,7 @@ public class TestSparkDataWrite {
           assertThat(file.splitOffsets()).as("Split offsets not present").isNotNull();
         }
         assertThat(file.recordCount()).as("Should have reported record count as 1").isEqualTo(1);
+        assertThat(file.sortOrderId()).isEqualTo(SortOrder.unsorted().orderId());
         // TODO: append more metric info
         if (format.equals(FileFormat.PARQUET)) {
           assertThat(file.columnSizes()).as("Column sizes metric not present").isNotNull();
@@ -555,6 +560,116 @@ public class TestSparkDataWrite {
     assertThat(actual2).hasSameSizeAs(expected2).isEqualTo(expected2);
   }
 
+  @TestTemplate
+  public void testWriteDataFilesInTableSortOrder() throws IOException {
+    File parent = temp.resolve(format.toString()).toFile();
+    File location = new File(parent, "test");
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    SortOrder sortOrder = SortOrder.builderFor(SCHEMA).asc("id").build();
+    Table table = tables.create(SCHEMA, spec, sortOrder, ImmutableMap.of(), location.toString());
+
+    List<SimpleRecord> expected = Lists.newArrayListWithCapacity(10);
+    for (int i = 0; i < 10; i++) {
+      expected.add(new SimpleRecord(i, "a"));
+    }
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data")
+        .write()
+        .format("iceberg")
+        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+        .mode(SaveMode.Append)
+        .save(location.toString());
+
+    Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+
+    List<SimpleRecord> actual =
+        result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
+      assertThat(fileScanTasks)
+          .extracting(task -> task.file().sortOrderId())
+          .as("All DataFiles are written with the table sort order id")
+          .containsOnly(table.sortOrder().orderId());
+    }
+  }
+
+  @TestTemplate
+  public void testWriteDataFilesUnsortedTable() throws IOException {
+    File parent = temp.resolve(format.toString()).toFile();
+    File location = new File(parent, "test");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> expected = Lists.newArrayList(new SimpleRecord(1, "a"));
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data")
+        .write()
+        .format("iceberg")
+        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+        .mode(SaveMode.Append)
+        .save(location.toString());
+
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      assertThat(tasks)
+          .extracting(task -> task.file().sortOrderId())
+          .as("All DataFiles should have unsorted sort order id")
+          .containsOnly(SortOrder.unsorted().orderId());
+    }
+  }
+
+  @TestTemplate
+  public void testWriteDataFilesAfterSortOrderChange() throws IOException {
+    File parent = temp.resolve(format.toString()).toFile();
+    File location = new File(parent, "test");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "a"));
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    df.select("id", "data")
+        .write()
+        .format("iceberg")
+        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+        .mode(SaveMode.Append)
+        .save(location.toString());
+
+    table.refresh();
+    int unsortedId = SortOrder.unsorted().orderId();
+
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      assertThat(tasks).extracting(task -> task.file().sortOrderId()).containsOnly(unsortedId);
+    }
+
+    table.replaceSortOrder().asc("id").commit();
+    int sortedId = table.sortOrder().orderId();
+
+    df.select("id", "data")
+        .write()
+        .format("iceberg")
+        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+        .mode(SaveMode.Append)
+        .save(location.toString());
+
+    table.refresh();
+
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      assertThat(tasks)
+          .extracting(task -> task.file().sortOrderId())
+          .as("Should contain both unsorted and sorted files")
+          .containsOnly(unsortedId, sortedId);
+    }
+  }
+
   public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType option) {
     File parent = temp.resolve(format.toString()).toFile();
     File location = new File(parent, "test");
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
index 635229f6a0..8d191cf30b 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
@@ -29,10 +29,14 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
@@ -262,6 +266,50 @@ public class TestStructuredStreaming {
     }
   }
 
+  @Test
+  public void testStreamingWriteDataFilesInTableSortOrder() throws Exception {
+    File parent = temp.resolve("parquet").toFile();
+    File location = new File(parent, "test-table");
+    File checkpoint = new File(parent, "checkpoint");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    SortOrder sortOrder = SortOrder.builderFor(SCHEMA).asc("id").build();
+    Table table = tables.create(SCHEMA, spec, sortOrder, ImmutableMap.of(), location.toString());
+
+    MemoryStream<Integer> inputStream = newMemoryStream(1, spark, Encoders.INT());
+    DataStreamWriter<Row> streamWriter =
+        inputStream
+            .toDF()
+            .selectExpr("value AS id", "CAST (value AS STRING) AS data")
+            .writeStream()
+            .outputMode("append")
+            .format("iceberg")
+            .option("checkpointLocation", checkpoint.toString())
+            .option("path", location.toString());
+
+    try {
+      StreamingQuery query = streamWriter.start();
+      List<Integer> batch1 = Lists.newArrayList(1, 2);
+      send(batch1, inputStream);
+      query.processAllAvailable();
+      query.stop();
+
+      table.refresh();
+
+      try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+        assertThat(tasks)
+            .extracting(task -> task.file().sortOrderId())
+            .as("All DataFiles are written with the table sort order id")
+            .containsOnly(table.sortOrder().orderId());
+      }
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
   @Test
   public void testStreamingWriteUpdateMode() throws Exception {
     File parent = temp.resolve("parquet").toFile();


Reply via email to