This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 420fb1a688 Spark 4.1: Set data file sort_order_id in manifest for
writes from Spark (#15150)
420fb1a688 is described below
commit 420fb1a688926440582fe078d7348f60f27997a8
Author: jbewing <[email protected]>
AuthorDate: Fri Mar 27 13:44:23 2026 -0400
Spark 4.1: Set data file sort_order_id in manifest for writes from Spark
(#15150)
---
.../java/org/apache/iceberg/SerializableTable.java | 23 ++++-
.../org/apache/iceberg/util/SortOrderUtil.java | 17 +++
.../org/apache/iceberg/util/TestSortOrderUtil.java | 64 ++++++++++++
.../spark/extensions/TestCopyOnWriteDelete.java | 20 ++++
.../spark/extensions/TestCopyOnWriteMerge.java | 29 ++++++
.../spark/extensions/TestCopyOnWriteUpdate.java | 20 ++++
.../spark/extensions/TestMergeOnReadMerge.java | 29 ++++++
.../spark/extensions/TestMergeOnReadUpdate.java | 20 ++++
.../org/apache/iceberg/spark/SparkWriteConf.java | 20 ++++
.../apache/iceberg/spark/SparkWriteOptions.java | 1 +
.../actions/SparkShufflingFileRewriteRunner.java | 16 +++
.../spark/source/SparkPositionDeltaWrite.java | 11 +-
.../apache/iceberg/spark/source/SparkWrite.java | 10 +-
.../apache/iceberg/spark/TestSparkWriteConf.java | 52 ++++++++++
.../spark/actions/TestRewriteDataFilesAction.java | 57 +++++++++-
.../iceberg/spark/source/TestSparkDataWrite.java | 115 +++++++++++++++++++++
.../spark/source/TestStructuredStreaming.java | 48 +++++++++
17 files changed, 542 insertions(+), 10 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java
b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index 7e5746ec91..a26fff1fd5 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.SerializableMap;
@@ -57,6 +58,7 @@ public class SerializableTable implements Table,
HasTableOperations, Serializabl
private final int defaultSpecId;
private final Map<Integer, String> specAsJsonMap;
private final String sortOrderAsJson;
+ private final Map<Integer, String> sortOrderAsJsonMap;
private final FileIO io;
private final EncryptionManager encryption;
private final Map<String, SnapshotRef> refs;
@@ -68,6 +70,7 @@ public class SerializableTable implements Table,
HasTableOperations, Serializabl
private transient volatile Schema lazySchema = null;
private transient volatile Map<Integer, PartitionSpec> lazySpecs = null;
private transient volatile SortOrder lazySortOrder = null;
+ private transient volatile Map<Integer, SortOrder> lazySortOrders = null;
protected SerializableTable(Table table) {
this.name = table.name();
@@ -80,6 +83,10 @@ public class SerializableTable implements Table,
HasTableOperations, Serializabl
Map<Integer, PartitionSpec> specs = table.specs();
specs.forEach((specId, spec) -> specAsJsonMap.put(specId,
PartitionSpecParser.toJson(spec)));
this.sortOrderAsJson = SortOrderParser.toJson(table.sortOrder());
+ this.sortOrderAsJsonMap = Maps.newHashMap();
+ table
+ .sortOrders()
+ .forEach((id, order) -> sortOrderAsJsonMap.put(id,
SortOrderParser.toJson(order)));
this.io = table.io();
this.encryption = table.encryption();
this.locationProviderTry = Try.of(table::locationProvider);
@@ -240,7 +247,21 @@ public class SerializableTable implements Table,
HasTableOperations, Serializabl
@Override
public Map<Integer, SortOrder> sortOrders() {
- return lazyTable().sortOrders();
+ if (lazySortOrders == null) {
+ synchronized (this) {
+ if (lazySortOrders == null && lazyTable == null) {
+ ImmutableMap.Builder<Integer, SortOrder> sortOrders =
+ ImmutableMap.builderWithExpectedSize(sortOrderAsJsonMap.size());
+ sortOrderAsJsonMap.forEach(
+ (id, json) -> sortOrders.put(id,
SortOrderParser.fromJson(schema(), json)));
+ this.lazySortOrders = sortOrders.build();
+ } else if (lazySortOrders == null) {
+ this.lazySortOrders = lazyTable.sortOrders();
+ }
+ }
+ }
+
+ return lazySortOrders;
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
b/core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
index 37e0c1fffa..6548239937 100644
--- a/core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
@@ -46,6 +46,23 @@ public class SortOrderUtil {
return buildSortOrder(table.schema(), table.spec(), sortOrder);
}
+ /**
+ * Attempts to match a user-supplied {@link SortOrder} with an equivalent
sort order from a {@link
+ * Table}.
+ *
+ * @param table the table to try and match the sort order against
+ * @param userSuppliedSortOrder the user supplied sort order to try and
match with a table sort
+ * order
+ * @return the matching {@link SortOrder} from the table (with the orderId
set) or {@link
+ * SortOrder#unsorted()} if no match is found.
+ */
+ public static SortOrder findTableSortOrder(Table table, SortOrder
userSuppliedSortOrder) {
+ return table.sortOrders().values().stream()
+ .filter(sortOrder -> sortOrder.sameOrder(userSuppliedSortOrder))
+ .findFirst()
+ .orElseGet(SortOrder::unsorted);
+ }
+
/**
* Build a final sort order that satisfies the clustering required by the
partition spec.
*
diff --git a/core/src/test/java/org/apache/iceberg/util/TestSortOrderUtil.java
b/core/src/test/java/org/apache/iceberg/util/TestSortOrderUtil.java
index 02c81de932..7ac7d72a2f 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestSortOrderUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestSortOrderUtil.java
@@ -287,4 +287,68 @@ public class TestSortOrderUtil {
.as("Should add spec fields as prefix")
.isEqualTo(expected);
}
+
+ @Test
+ public void testFindSortOrderForTable() {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ SortOrder order = SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id",
NULLS_LAST).build();
+ TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA,
spec, order, 2);
+
+ SortOrder tableSortOrder = table.sortOrder();
+
+ SortOrder actualOrder = SortOrderUtil.findTableSortOrder(table,
tableSortOrder);
+
+ assertThat(actualOrder).as("Should find current table sort
order").isEqualTo(table.sortOrder());
+ }
+
+ @Test
+ public void testFindSortOrderForTableWithoutFieldId() {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ SortOrder order = SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id",
NULLS_LAST).build();
+ TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA,
spec, order, 2);
+
+ SortOrder userSuppliedOrder =
+ SortOrder.builderFor(table.schema()).asc("id", NULLS_LAST).build();
+
+ SortOrder actualOrder = SortOrderUtil.findTableSortOrder(table,
userSuppliedOrder);
+
+ assertThat(actualOrder).as("Should find current table sort
order").isEqualTo(table.sortOrder());
+ }
+
+ @Test
+ public void testFindSortOrderForTableThatIsNotCurrentOrder() {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ SortOrder order = SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id",
NULLS_LAST).build();
+ TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA,
spec, order, 2);
+
+ table.replaceSortOrder().asc("data").desc("ts").commit();
+
+ SortOrder userSuppliedOrder =
+ SortOrder.builderFor(table.schema()).asc("id", NULLS_LAST).build();
+
+ SortOrder actualOrder = SortOrderUtil.findTableSortOrder(table,
userSuppliedOrder);
+
+ assertThat(actualOrder)
+ .as("Should find first sorted table sort order")
+ .isEqualTo(table.sortOrders().get(1));
+ }
+
+ @Test
+ public void testReturnsUnsortedForMissingSortOrder() {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ SortOrder order = SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id",
NULLS_LAST).build();
+ TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA,
spec, order, 2);
+
+ table.replaceSortOrder().asc("data").desc("ts").commit();
+
+ SortOrder userSuppliedOrder =
+ SortOrder.builderFor(table.schema()).desc("id", NULLS_LAST).build();
+
+ SortOrder actualOrder = SortOrderUtil.findTableSortOrder(table,
userSuppliedOrder);
+
+ assertThat(actualOrder)
+ .as(
+ "Should return unsorted order if user supplied order does not
match any table sort order")
+ .isEqualTo(SortOrder.unsorted());
+ }
}
diff --git
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
index e0b852a9d0..dd296e5d72 100644
---
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
+++
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
@@ -24,6 +24,7 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.ParameterizedTestExtension;
@@ -54,6 +55,25 @@ public class TestCopyOnWriteDelete extends TestDelete {
TableProperties.DELETE_MODE,
RowLevelOperationMode.COPY_ON_WRITE.modeName());
}
+ @TestTemplate
+ public void testCopyOnWriteDeleteSetsSortOrderIdOnRewrittenDataFiles() {
+ createAndInitTable(
+ "id INT, dep STRING",
+ "PARTITIONED BY (dep)",
+ "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+
+ sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+ sql("DELETE FROM %s WHERE id = 1", commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+ assertThat(snapshot.addedDataFiles(table.io()))
+ .extracting(DataFile::sortOrderId)
+ .as("Rewritten data files should carry the table sort order id")
+ .containsOnly(table.sortOrder().orderId());
+ }
+
@TestTemplate
public void testRuntimeFilteringWithPreservedDataGrouping() throws
NoSuchTableException {
createAndInitPartitionedTable();
diff --git
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
index 36b7c5818e..3f4faa0be1 100644
---
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
+++
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
@@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.Collections;
import java.util.Map;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
@@ -45,6 +46,34 @@ public class TestCopyOnWriteMerge extends TestMerge {
TableProperties.MERGE_MODE,
RowLevelOperationMode.COPY_ON_WRITE.modeName());
}
+ @TestTemplate
+ public void testCopyOnWriteMergeSetsSortOrderIdOnRewrittenDataFiles() {
+ createAndInitTable("id INT, dep STRING");
+ sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+ sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+ append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2,
\"dep\": \"hr\" }");
+ createBranchIfNeeded();
+
+ createOrReplaceView("source", Collections.singletonList(1),
Encoders.INT());
+
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.value "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET dep = 'changed' "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id, dep) VALUES (s.value, 'new')",
+ commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+ assertThat(snapshot.addedDataFiles(table.io()))
+ .extracting(DataFile::sortOrderId)
+ .as("Rewritten data files should carry the table sort order id")
+ .containsOnly(table.sortOrder().orderId());
+ }
+
@TestTemplate
public void testRuntimeFilteringWithReportedPartitioning() {
createAndInitTable("id INT, dep STRING");
diff --git
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
index ab5b6da50b..22a2b0932f 100644
---
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
+++
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.extensions;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Map;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
@@ -43,6 +44,25 @@ public class TestCopyOnWriteUpdate extends TestUpdate {
TableProperties.UPDATE_MODE,
RowLevelOperationMode.COPY_ON_WRITE.modeName());
}
+ @TestTemplate
+ public void testCopyOnWriteUpdateSetsSortOrderIdOnRewrittenDataFiles() {
+ createAndInitTable(
+ "id INT, dep STRING",
+ "PARTITIONED BY (dep)",
+ "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+
+ sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+ sql("UPDATE %s SET dep = 'changed' WHERE id = 1", commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+ assertThat(snapshot.addedDataFiles(table.io()))
+ .extracting(DataFile::sortOrderId)
+ .as("Rewritten data files should carry the table sort order id")
+ .containsOnly(table.sortOrder().orderId());
+ }
+
@TestTemplate
public void testRuntimeFilteringWithReportedPartitioning() {
createAndInitTable("id INT, dep STRING");
diff --git
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
index 737f19e86a..9a42b58e34 100644
---
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
+++
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ParameterizedTestExtension;
@@ -136,6 +137,34 @@ public class TestMergeOnReadMerge extends TestMerge {
assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) ==
FileFormat.PUFFIN);
}
+ @TestTemplate
+ public void testMergeOnReadMergeSetsSortOrderIdOnNewDataFiles() {
+ createAndInitTable(
+ "id INT, dep STRING",
+ "PARTITIONED BY (dep)",
+ "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+
+ sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+ createOrReplaceView("source", ImmutableList.of(1, 3), Encoders.INT());
+
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id == s.value "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET id = id + 10 "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id, dep) VALUES (s.value, 'hr')",
+ commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+ assertThat(snapshot.addedDataFiles(table.io()))
+ .extracting(DataFile::sortOrderId)
+ .as("All new data files should carry the table sort order id")
+ .containsOnly(table.sortOrder().orderId());
+ }
+
private void checkMergeDeleteGranularity(DeleteGranularity
deleteGranularity) {
createTableWithDeleteGranularity(
"id INT, dep STRING", "PARTITIONED BY (dep)", deleteGranularity);
diff --git
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
index 2398bc45b1..d1c336d5dd 100644
---
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
+++
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ParameterizedTestExtension;
@@ -224,6 +225,25 @@ public class TestMergeOnReadUpdate extends TestUpdate {
assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) ==
FileFormat.PUFFIN);
}
+ @TestTemplate
+ public void testMergeOnReadUpdateSetsSortOrderIdOnNewDataFiles() {
+ createAndInitTable(
+ "id INT, dep STRING",
+ "PARTITIONED BY (dep)",
+ "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+
+ sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+ sql("UPDATE %s SET id = id + 10 WHERE id = 1", commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+ assertThat(snapshot.addedDataFiles(table.io()))
+ .extracting(DataFile::sortOrderId)
+ .as("All new data files should carry the table sort order id")
+ .containsOnly(table.sortOrder().orderId());
+ }
+
private void initTable(String partitionedBy, DeleteGranularity
deleteGranularity) {
createTableWithDeleteGranularity("id INT, dep STRING", partitionedBy,
deleteGranularity);
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 8a629d3a6d..2296c076f0 100644
---
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableUtil;
@@ -171,6 +172,25 @@ public class SparkWriteConf {
return outputSpecId;
}
+ public int outputSortOrderId(SparkWriteRequirements writeRequirements) {
+ Integer explicitId =
+
confParser.intConf().option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID).parseOptional();
+
+ if (explicitId != null) {
+ Preconditions.checkArgument(
+ table.sortOrders().containsKey(explicitId),
+ "Cannot use output sort order id %s because the table does not
contain a sort order with that id",
+ explicitId);
+ return explicitId;
+ }
+
+ if (writeRequirements.hasOrdering()) {
+ return table.sortOrder().orderId();
+ }
+
+ return SortOrder.unsorted().orderId();
+ }
+
public FileFormat dataFileFormat() {
String valueAsString =
confParser
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
index 86c27acd88..2b88d2bb1e 100644
---
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
+++
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
@@ -54,6 +54,7 @@ public class SparkWriteOptions {
public static final String CHECK_ORDERING = "check-ordering";
public static final String OUTPUT_SPEC_ID = "output-spec-id";
+ public static final String OUTPUT_SORT_ORDER_ID = "output-sort-order-id";
public static final String OVERWRITE_MODE = "overwrite-mode";
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
index bc1665dc32..f20017648d 100644
---
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
+++
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
@@ -46,10 +46,14 @@ import
org.apache.spark.sql.connector.distributions.OrderedDistribution;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
import
org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Option;
abstract class SparkShufflingFileRewriteRunner extends
SparkDataFileRewriteRunner {
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkShufflingFileRewriteRunner.class);
+
/**
* The number of shuffle partitions to use for each output file. By default,
this file rewriter
* assumes each shuffle partition would become a separate output file.
Attempting to generate
@@ -113,12 +117,24 @@ abstract class SparkShufflingFileRewriteRunner extends
SparkDataFileRewriteRunne
spec(fileGroup.outputSpecId()),
fileGroup.expectedOutputFiles()));
+ org.apache.iceberg.SortOrder sortOrderInJobSpec = sortOrder();
+
+ org.apache.iceberg.SortOrder maybeMatchingTableSortOrder =
+ SortOrderUtil.findTableSortOrder(table(), sortOrder());
+
+ if (sortOrderInJobSpec.isSorted() &&
maybeMatchingTableSortOrder.isUnsorted()) {
+ LOG.warn(
+ "Sort order specified for job {} doesn't match any table sort
orders, rewritten files will not be marked as sorted in the manifest files",
+ Spark3Util.describe(sortOrderInJobSpec));
+ }
+
sortedDF
.write()
.format("iceberg")
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES,
fileGroup.maxOutputFileSize())
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
.option(SparkWriteOptions.OUTPUT_SPEC_ID, fileGroup.outputSpecId())
+ .option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID,
maybeMatchingTableSortOrder.orderId())
.mode("append")
.save(groupId);
}
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index fa4ab524a4..89f6e1c350 100644
---
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -119,6 +119,7 @@ class SparkPositionDeltaWrite extends BaseSparkWrite
private final String wapId;
private final Map<String, String> extraSnapshotMetadata;
private final SparkWriteRequirements writeRequirements;
+ private final int sortOrderId;
private final Context context;
private final Map<String, String> writeProperties;
@@ -146,6 +147,7 @@ class SparkPositionDeltaWrite extends BaseSparkWrite
this.wapId = writeConf.wapId();
this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
this.writeRequirements = writeConf.positionDeltaRequirements(command);
+ this.sortOrderId = writeConf.outputSortOrderId(writeRequirements);
this.context = new Context(dataSchema, writeConf, info, writeRequirements);
this.writeProperties = writeConf.writeProperties();
@@ -206,7 +208,8 @@ class SparkPositionDeltaWrite extends BaseSparkWrite
broadcastRewritableDeletes(),
command,
context,
- writeProperties);
+ writeProperties,
+ sortOrderId);
}
private Broadcast<Map<String, DeleteFileSet>> broadcastRewritableDeletes()
{
@@ -416,18 +419,21 @@ class SparkPositionDeltaWrite extends BaseSparkWrite
private final Command command;
private final Context context;
private final Map<String, String> writeProperties;
+ private final int sortOrderId;
PositionDeltaWriteFactory(
Broadcast<Table> tableBroadcast,
Broadcast<Map<String, DeleteFileSet>> rewritableDeletesBroadcast,
Command command,
Context context,
- Map<String, String> writeProperties) {
+ Map<String, String> writeProperties,
+ int sortOrderId) {
this.tableBroadcast = tableBroadcast;
this.rewritableDeletesBroadcast = rewritableDeletesBroadcast;
this.command = command;
this.context = context;
this.writeProperties = writeProperties;
+ this.sortOrderId = sortOrderId;
}
@Override
@@ -454,6 +460,7 @@ class SparkPositionDeltaWrite extends BaseSparkWrite
.deleteFileFormat(context.deleteFileFormat())
.positionDeleteSparkType(context.deleteSparkType())
.writeProperties(writeProperties)
+ .dataSortOrder(table.sortOrders().get(sortOrderId))
.build();
if (command == DELETE) {
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 22dc3be66d..7caa24772e 100644
---
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -214,6 +214,7 @@ abstract class SparkWrite extends BaseSparkWrite implements
Write, RequiresDistr
// broadcast the table metadata as the writer factory will be sent to
executors
Broadcast<Table> tableBroadcast =
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+ int sortOrderId = writeConf.outputSortOrderId(writeRequirements);
return new WriterFactory(
tableBroadcast,
queryId,
@@ -223,7 +224,8 @@ abstract class SparkWrite extends BaseSparkWrite implements
Write, RequiresDistr
writeSchema,
dsSchema,
useFanoutWriter,
- writeProperties);
+ writeProperties,
+ sortOrderId);
}
private void commitOperation(SnapshotUpdate<?> operation, String
description) {
@@ -708,6 +710,7 @@ abstract class SparkWrite extends BaseSparkWrite implements
Write, RequiresDistr
private final boolean useFanoutWriter;
private final String queryId;
private final Map<String, String> writeProperties;
+ private final int sortOrderId;
protected WriterFactory(
Broadcast<Table> tableBroadcast,
@@ -718,7 +721,8 @@ abstract class SparkWrite extends BaseSparkWrite implements
Write, RequiresDistr
Schema writeSchema,
StructType dsSchema,
boolean useFanoutWriter,
- Map<String, String> writeProperties) {
+ Map<String, String> writeProperties,
+ int sortOrderId) {
this.tableBroadcast = tableBroadcast;
this.format = format;
this.outputSpecId = outputSpecId;
@@ -728,6 +732,7 @@ abstract class SparkWrite extends BaseSparkWrite implements
Write, RequiresDistr
this.useFanoutWriter = useFanoutWriter;
this.queryId = queryId;
this.writeProperties = writeProperties;
+ this.sortOrderId = sortOrderId;
}
@Override
@@ -752,6 +757,7 @@ abstract class SparkWrite extends BaseSparkWrite implements
Write, RequiresDistr
.dataSchema(writeSchema)
.dataSparkType(dsSchema)
.writeProperties(writeProperties)
+ .dataSortOrder(table.sortOrders().get(sortOrderId))
.build();
Function<InternalRow, InternalRow> rowLineageExtractor = new
ExtractRowLineage(writeSchema);
diff --git
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index 227b93dfa4..383a21087d 100644
---
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -45,6 +45,7 @@ import static
org.apache.spark.sql.connector.write.RowLevelOperation.Command.DEL
import static
org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE;
import static
org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE;
import static org.assertj.core.api.Assertions.assertThat;
+import static
org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.time.Duration;
@@ -606,6 +607,57 @@ public class TestSparkWriteConf extends
TestBaseWithCatalog {
assertThat(writeConf.deleteFileFormat()).isEqualTo(FileFormat.PUFFIN);
}
+ @TestTemplate
+ public void testSortOrderWriteConf() {
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.replaceSortOrder().asc("id").commit();
+
+ SparkWriteConf writeConf =
+ new SparkWriteConf(
+ spark,
+ table,
+ new CaseInsensitiveStringMap(
+ ImmutableMap.of(SparkWriteOptions.OUTPUT_SORT_ORDER_ID, "1")));
+
+ assertThat(writeConf.outputSortOrderId(SparkWriteRequirements.EMPTY))
+ .isEqualTo(table.sortOrder().orderId());
+ }
+
+ @TestTemplate
+ public void testSortOrderWriteConfWithInvalidId() {
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.replaceSortOrder().asc("id").commit();
+
+ SparkWriteConf writeConfForUnknownSortOrder =
+ new SparkWriteConf(
+ spark,
+ table,
+ new CaseInsensitiveStringMap(
+ ImmutableMap.of(SparkWriteOptions.OUTPUT_SORT_ORDER_ID,
"999")));
+
+ assertThatIllegalArgumentException()
+ .isThrownBy(
+ () ->
writeConfForUnknownSortOrder.outputSortOrderId(SparkWriteRequirements.EMPTY))
+ .withMessage(
+ "Cannot use output sort order id 999 because the table does not
contain a sort order with that id");
+ }
+
+ @TestTemplate
+ public void testSortOrderWriteConfWithNoOption() {
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.replaceSortOrder().asc("id").commit();
+
+ SparkWriteConf writeConfNoOption = new SparkWriteConf(spark, table);
+
+
assertThat(writeConfNoOption.outputSortOrderId(writeConfNoOption.writeRequirements()))
+ .isEqualTo(table.sortOrder().orderId());
+
+
assertThat(writeConfNoOption.outputSortOrderId(SparkWriteRequirements.EMPTY)).isEqualTo(0);
+ }
+
private void testWriteProperties(List<Map<String, String>> propertiesSuite) {
withSQLConf(
propertiesSuite.get(0),
diff --git
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 7f5dea1904..9524b0e716 100644
---
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -1517,7 +1517,7 @@ public class TestRewriteDataFilesAction extends TestBase {
}
@TestTemplate
- public void testSimpleSort() {
+ public void testSimpleSort() throws IOException {
Table table = createTable(20);
shouldHaveFiles(table, 20);
table.replaceSortOrder().asc("c2").commit();
@@ -1545,10 +1545,11 @@ public class TestRewriteDataFilesAction extends
TestBase {
shouldHaveACleanCache(table);
shouldHaveMultipleFiles(table);
shouldHaveLastCommitSorted(table, "c2");
+ dataFilesSortOrderShouldMatchTableSortOrder(table);
}
@TestTemplate
- public void testSortAfterPartitionChange() {
+ public void testSortAfterPartitionChange() throws IOException {
Table table = createTable(20);
shouldHaveFiles(table, 20);
table.updateSpec().addField(Expressions.bucket("c1", 4)).commit();
@@ -1579,10 +1580,11 @@ public class TestRewriteDataFilesAction extends
TestBase {
shouldHaveACleanCache(table);
shouldHaveMultipleFiles(table);
shouldHaveLastCommitSorted(table, "c2");
+ dataFilesSortOrderShouldMatchTableSortOrder(table);
}
@TestTemplate
- public void testSortCustomSortOrder() {
+ public void testSortCustomSortOrder() throws IOException {
Table table = createTable(20);
shouldHaveLastCommitUnsorted(table, "c2");
shouldHaveFiles(table, 20);
@@ -1608,10 +1610,11 @@ public class TestRewriteDataFilesAction extends
TestBase {
shouldHaveACleanCache(table);
shouldHaveMultipleFiles(table);
shouldHaveLastCommitSorted(table, "c2");
+ dataFilesShouldHaveSortOrderIdMatching(table, SortOrder.unsorted());
}
@TestTemplate
- public void testSortCustomSortOrderRequiresRepartition() {
+ public void testSortCustomSortOrderRequiresRepartition() throws IOException {
int partitions = 4;
Table table = createTable();
writeRecords(20, SCALE, partitions);
@@ -1647,10 +1650,40 @@ public class TestRewriteDataFilesAction extends TestBase {
shouldHaveMultipleFiles(table);
shouldHaveLastCommitUnsorted(table, "c2");
shouldHaveLastCommitSorted(table, "c3");
+ dataFilesShouldHaveSortOrderIdMatching(table, SortOrder.unsorted());
}
@TestTemplate
- public void testAutoSortShuffleOutput() {
+ public void testSortPastTableSortOrderGetsAppliedToFiles() throws IOException {
+ Table table = createTable(1);
+
+ table.replaceSortOrder().asc("c3").commit();
+ SortOrder c3SortOrder = table.sortOrder();
+
+ table.replaceSortOrder().asc("c2").commit();
+
+ List<Object[]> originalData = currentData();
+
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .sort(SortOrder.builderFor(table.schema()).asc("c3").build())
+ .option(SizeBasedFileRewritePlanner.REWRITE_ALL, "true")
+ .execute();
+
+ assertThat(result.rewriteResults()).as("Should have 1 fileGroups").hasSize(1);
+
+ table.refresh();
+
+ List<Object[]> postRewriteData = currentData();
+ assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
+
+ shouldHaveSnapshots(table, 2);
+ shouldHaveACleanCache(table);
+ dataFilesShouldHaveSortOrderIdMatching(table, c3SortOrder);
+ }
+
+ @TestTemplate
+ public void testAutoSortShuffleOutput() throws IOException {
Table table = createTable(20);
shouldHaveLastCommitUnsorted(table, "c2");
shouldHaveFiles(table, 20);
@@ -1685,6 +1718,7 @@ public class TestRewriteDataFilesAction extends TestBase {
shouldHaveACleanCache(table);
shouldHaveMultipleFiles(table);
shouldHaveLastCommitSorted(table, "c2");
+ dataFilesShouldHaveSortOrderIdMatching(table, SortOrder.unsorted());
}
@TestTemplate
@@ -2623,4 +2657,17 @@ public class TestRewriteDataFilesAction extends TestBase {
return groupIDs.contains(argument.info().globalIndex());
}
}
+
+ private void dataFilesSortOrderShouldMatchTableSortOrder(Table table) throws IOException {
+ dataFilesShouldHaveSortOrderIdMatching(table, table.sortOrder());
+ }
+
+ private void dataFilesShouldHaveSortOrderIdMatching(Table table, SortOrder sortOrder)
+ throws IOException {
+ try (CloseableIterable<FileScanTask> files = table.newScan().planFiles()) {
+ assertThat(files)
+ .extracting(fileScanTask -> fileScanTask.file().sortOrderId())
+ .containsOnly(sortOrder.orderId());
+ }
+ }
}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 63e9a638e5..a90de51fce 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.File;
+import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Path;
import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Parameter;
@@ -44,10 +46,12 @@ import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -154,6 +158,7 @@ public class TestSparkDataWrite {
assertThat(file.splitOffsets()).as("Split offsets not present").isNotNull();
}
assertThat(file.recordCount()).as("Should have reported record count as 1").isEqualTo(1);
+ assertThat(file.sortOrderId()).isEqualTo(SortOrder.unsorted().orderId());
// TODO: append more metric info
if (format.equals(FileFormat.PARQUET)) {
assertThat(file.columnSizes()).as("Column sizes metric not present").isNotNull();
@@ -555,6 +560,116 @@ public class TestSparkDataWrite {
assertThat(actual2).hasSameSizeAs(expected2).isEqualTo(expected2);
}
+ @TestTemplate
+ public void testWriteDataFilesInTableSortOrder() throws IOException {
+ File parent = temp.resolve(format.toString()).toFile();
+ File location = new File(parent, "test");
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ SortOrder sortOrder = SortOrder.builderFor(SCHEMA).asc("id").build();
+ Table table = tables.create(SCHEMA, spec, sortOrder, ImmutableMap.of(), location.toString());
+
+ List<SimpleRecord> expected = Lists.newArrayListWithCapacity(10);
+ for (int i = 0; i < 10; i++) {
+ expected.add(new SimpleRecord(i, "a"));
+ }
+
+ Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+ df.select("id", "data")
+ .write()
+ .format("iceberg")
+ .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+ .mode(SaveMode.Append)
+ .save(location.toString());
+
+ Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+
+ List<SimpleRecord> actual =
+ result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+
+ try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
+ assertThat(fileScanTasks)
+ .extracting(task -> task.file().sortOrderId())
+ .as("All DataFiles are written with the table sort order id")
+ .containsOnly(table.sortOrder().orderId());
+ }
+ }
+
+ @TestTemplate
+ public void testWriteDataFilesUnsortedTable() throws IOException {
+ File parent = temp.resolve(format.toString()).toFile();
+ File location = new File(parent, "test");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> expected = Lists.newArrayList(new SimpleRecord(1, "a"));
+ Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+ df.select("id", "data")
+ .write()
+ .format("iceberg")
+ .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+ .mode(SaveMode.Append)
+ .save(location.toString());
+
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ assertThat(tasks)
+ .extracting(task -> task.file().sortOrderId())
+ .as("All DataFiles should have unsorted sort order id")
+ .containsOnly(SortOrder.unsorted().orderId());
+ }
+ }
+
+ @TestTemplate
+ public void testWriteDataFilesAfterSortOrderChange() throws IOException {
+ File parent = temp.resolve(format.toString()).toFile();
+ File location = new File(parent, "test");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "a"));
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+ df.select("id", "data")
+ .write()
+ .format("iceberg")
+ .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+ .mode(SaveMode.Append)
+ .save(location.toString());
+
+ table.refresh();
+ int unsortedId = SortOrder.unsorted().orderId();
+
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ assertThat(tasks).extracting(task -> task.file().sortOrderId()).containsOnly(unsortedId);
+ }
+
+ table.replaceSortOrder().asc("id").commit();
+ int sortedId = table.sortOrder().orderId();
+
+ df.select("id", "data")
+ .write()
+ .format("iceberg")
+ .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+ .mode(SaveMode.Append)
+ .save(location.toString());
+
+ table.refresh();
+
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ assertThat(tasks)
+ .extracting(task -> task.file().sortOrderId())
+ .as("Should contain both unsorted and sorted files")
+ .containsOnly(unsortedId, sortedId);
+ }
+ }
+
public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType option) {
File parent = temp.resolve(format.toString()).toFile();
File location = new File(parent, "test");
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
index 635229f6a0..8d191cf30b 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
@@ -29,10 +29,14 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
@@ -262,6 +266,50 @@ public class TestStructuredStreaming {
}
}
+ @Test
+ public void testStreamingWriteDataFilesInTableSortOrder() throws Exception {
+ File parent = temp.resolve("parquet").toFile();
+ File location = new File(parent, "test-table");
+ File checkpoint = new File(parent, "checkpoint");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ SortOrder sortOrder = SortOrder.builderFor(SCHEMA).asc("id").build();
+ Table table = tables.create(SCHEMA, spec, sortOrder, ImmutableMap.of(), location.toString());
+
+ MemoryStream<Integer> inputStream = newMemoryStream(1, spark, Encoders.INT());
+ DataStreamWriter<Row> streamWriter =
+ inputStream
+ .toDF()
+ .selectExpr("value AS id", "CAST (value AS STRING) AS data")
+ .writeStream()
+ .outputMode("append")
+ .format("iceberg")
+ .option("checkpointLocation", checkpoint.toString())
+ .option("path", location.toString());
+
+ try {
+ StreamingQuery query = streamWriter.start();
+ List<Integer> batch1 = Lists.newArrayList(1, 2);
+ send(batch1, inputStream);
+ query.processAllAvailable();
+ query.stop();
+
+ table.refresh();
+
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ assertThat(tasks)
+ .extracting(task -> task.file().sortOrderId())
+ .as("All DataFiles are written with the table sort order id")
+ .containsOnly(table.sortOrder().orderId());
+ }
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
@Test
public void testStreamingWriteUpdateMode() throws Exception {
File parent = temp.resolve("parquet").toFile();