This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new bdbb37555c Spark 3.4: Set data file sort_order_id in manifest for
writes from Spark (#16308)
bdbb37555c is described below
commit bdbb37555c9db9b134122dc4fd998e5ebbeb41af
Author: Kevin Liu <[email protected]>
AuthorDate: Tue May 12 22:04:00 2026 -0400
Spark 3.4: Set data file sort_order_id in manifest for writes from Spark
(#16308)
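Backport of #15832 to spark/v3.4.
Adds the output-sort-order-id write option and threads the resolved
sort order ID through SparkWrite and SparkPositionDeltaWrite so that
written data files record the sort order in their manifest entries.
An explicit output-sort-order-id that does not exist in the table is
rejected. SparkShufflingFileRewriteRunner passes the ID of the table
sort order that matches the rewrite job's sort order; if none matches,
rewritten files are not marked as sorted and a warning is logged.
Adaptation: v3.4 SparkWrite still uses 'partitionedFanoutEnabled' (not
renamed to 'useFanoutWriter' as in v3.5). Kept the v3.4 name and added
the new 'sortOrderId' parameter alongside it.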
---
.../spark/extensions/TestCopyOnWriteDelete.java | 19 ++++
.../spark/extensions/TestCopyOnWriteMerge.java | 28 +++++
.../spark/extensions/TestCopyOnWriteUpdate.java | 19 ++++
.../spark/extensions/TestMergeOnReadMerge.java | 29 ++++++
.../spark/extensions/TestMergeOnReadUpdate.java | 20 ++++
.../org/apache/iceberg/spark/SparkWriteConf.java | 20 ++++
.../apache/iceberg/spark/SparkWriteOptions.java | 1 +
.../actions/SparkShufflingFileRewriteRunner.java | 16 +++
.../spark/source/SparkPositionDeltaWrite.java | 11 +-
.../apache/iceberg/spark/source/SparkWrite.java | 10 +-
.../apache/iceberg/spark/TestSparkWriteConf.java | 46 +++++++++
.../spark/actions/TestRewriteDataFilesAction.java | 57 +++++++++-
.../iceberg/spark/source/TestSparkDataWrite.java | 115 +++++++++++++++++++++
.../spark/source/TestStructuredStreaming.java | 48 +++++++++
14 files changed, 430 insertions(+), 9 deletions(-)
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
index f7ded0c4d7..d39dff060c 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
@@ -162,6 +162,25 @@ public class TestCopyOnWriteDelete extends TestDelete {
assertThat(executorService.awaitTermination(2,
TimeUnit.MINUTES)).as("Timeout").isTrue();
}
+ @TestTemplate
+ public void testCopyOnWriteDeleteSetsSortOrderIdOnRewrittenDataFiles() {
+ createAndInitTable(
+ "id INT, dep STRING",
+ "PARTITIONED BY (dep)",
+ "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+
+ sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+ sql("DELETE FROM %s WHERE id = 1", commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+ assertThat(snapshot.addedDataFiles(table.io()))
+ .extracting(DataFile::sortOrderId)
+ .as("Rewritten data files should carry the table sort order id")
+ .containsOnly(table.sortOrder().orderId());
+ }
+
@TestTemplate
public void testRuntimeFilteringWithPreservedDataGrouping() throws
NoSuchTableException {
createAndInitPartitionedTable();
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
index 8d509c2952..03d5b4ca5b 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
@@ -147,6 +147,34 @@ public class TestCopyOnWriteMerge extends TestMerge {
assertThat(executorService.awaitTermination(2,
TimeUnit.MINUTES)).as("Timeout").isTrue();
}
+ @TestTemplate
+ public void testCopyOnWriteMergeSetsSortOrderIdOnRewrittenDataFiles() {
+ createAndInitTable("id INT, dep STRING");
+ sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+ sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+ append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2,
\"dep\": \"hr\" }");
+ createBranchIfNeeded();
+
+ createOrReplaceView("source", Collections.singletonList(1),
Encoders.INT());
+
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.value "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET dep = 'changed' "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id, dep) VALUES (s.value, 'new')",
+ commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+ assertThat(snapshot.addedDataFiles(table.io()))
+ .extracting(DataFile::sortOrderId)
+ .as("Rewritten data files should carry the table sort order id")
+ .containsOnly(table.sortOrder().orderId());
+ }
+
@TestTemplate
public void testRuntimeFilteringWithReportedPartitioning() {
createAndInitTable("id INT, dep STRING");
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
index 21d1377b2b..b547218acb 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
@@ -149,6 +149,25 @@ public class TestCopyOnWriteUpdate extends TestUpdate {
assertThat(executorService.awaitTermination(2,
TimeUnit.MINUTES)).as("Timeout").isTrue();
}
+ @TestTemplate
+ public void testCopyOnWriteUpdateSetsSortOrderIdOnRewrittenDataFiles() {
+ createAndInitTable(
+ "id INT, dep STRING",
+ "PARTITIONED BY (dep)",
+ "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+
+ sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+ sql("UPDATE %s SET dep = 'changed' WHERE id = 1", commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+ assertThat(snapshot.addedDataFiles(table.io()))
+ .extracting(DataFile::sortOrderId)
+ .as("Rewritten data files should carry the table sort order id")
+ .containsOnly(table.sortOrder().orderId());
+ }
+
@TestTemplate
public void testRuntimeFilteringWithReportedPartitioning() {
createAndInitTable("id INT, dep STRING");
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
index 361faade7e..dcd9b95757 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ParameterizedTestExtension;
@@ -136,6 +137,34 @@ public class TestMergeOnReadMerge extends TestMerge {
assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) ==
FileFormat.PUFFIN);
}
+ @TestTemplate
+ public void testMergeOnReadMergeSetsSortOrderIdOnNewDataFiles() {
+ createAndInitTable(
+ "id INT, dep STRING",
+ "PARTITIONED BY (dep)",
+ "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+
+ sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+ createOrReplaceView("source", ImmutableList.of(1, 3), Encoders.INT());
+
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id == s.value "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET id = id + 10 "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id, dep) VALUES (s.value, 'hr')",
+ commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+ assertThat(snapshot.addedDataFiles(table.io()))
+ .extracting(DataFile::sortOrderId)
+ .as("All new data files should carry the table sort order id")
+ .containsOnly(table.sortOrder().orderId());
+ }
+
private void checkMergeDeleteGranularity(DeleteGranularity
deleteGranularity) {
createTableWithDeleteGranularity(
"id INT, dep STRING", "PARTITIONED BY (dep)", deleteGranularity);
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
index a876e6d66b..599f39c8a2 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ParameterizedTestExtension;
@@ -197,6 +198,25 @@ public class TestMergeOnReadUpdate extends TestUpdate {
assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) ==
FileFormat.PUFFIN);
}
+ @TestTemplate
+ public void testMergeOnReadUpdateSetsSortOrderIdOnNewDataFiles() {
+ createAndInitTable(
+ "id INT, dep STRING",
+ "PARTITIONED BY (dep)",
+ "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+
+ sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
+
+ sql("UPDATE %s SET id = id + 10 WHERE id = 1", commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+ assertThat(snapshot.addedDataFiles(table.io()))
+ .extracting(DataFile::sortOrderId)
+ .as("All new data files should carry the table sort order id")
+ .containsOnly(table.sortOrder().orderId());
+ }
+
private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity)
{
initTable("PARTITIONED BY (dep)", deleteGranularity);
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index a13cff6e99..6f2a4781c5 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableUtil;
@@ -157,6 +158,25 @@ public class SparkWriteConf {
return outputSpecId;
}
+ public int outputSortOrderId(SparkWriteRequirements writeRequirements) {
+ Integer explicitId =
+
confParser.intConf().option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID).parseOptional();
+
+ if (explicitId != null) {
+ Preconditions.checkArgument(
+ table.sortOrders().containsKey(explicitId),
+ "Cannot use output sort order id %s because the table does not
contain a sort order with that id",
+ explicitId);
+ return explicitId;
+ }
+
+ if (writeRequirements.hasOrdering()) {
+ return table.sortOrder().orderId();
+ }
+
+ return SortOrder.unsorted().orderId();
+ }
+
public FileFormat dataFileFormat() {
String valueAsString =
confParser
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
index 391cb6bae3..1daecb523b 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
@@ -59,6 +59,7 @@ public class SparkWriteOptions {
"handle-timestamp-without-timezone";
public static final String OUTPUT_SPEC_ID = "output-spec-id";
+ public static final String OUTPUT_SORT_ORDER_ID = "output-sort-order-id";
public static final String OVERWRITE_MODE = "overwrite-mode";
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
index b1c5a5c090..346abaee5e 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
@@ -47,10 +47,14 @@ import
org.apache.spark.sql.connector.distributions.OrderedDistribution;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
import
org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Option;
abstract class SparkShufflingFileRewriteRunner extends
SparkDataFileRewriteRunner {
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkShufflingFileRewriteRunner.class);
+
/**
* The number of shuffle partitions to use for each output file. By default,
this file rewriter
* assumes each shuffle partition would become a separate output file.
Attempting to generate
@@ -119,6 +123,17 @@ abstract class SparkShufflingFileRewriteRunner extends
SparkDataFileRewriteRunne
spec(fileGroup.outputSpecId()),
fileGroup.expectedOutputFiles()));
+ org.apache.iceberg.SortOrder sortOrderInJobSpec = sortOrder();
+
+ org.apache.iceberg.SortOrder maybeMatchingTableSortOrder =
+ SortOrderUtil.findTableSortOrder(table(), sortOrder());
+
+ if (sortOrderInJobSpec.isSorted() &&
maybeMatchingTableSortOrder.isUnsorted()) {
+ LOG.warn(
+ "Sort order specified for job {} doesn't match any table sort
orders, rewritten files will not be marked as sorted in the manifest files",
+ Spark3Util.describe(sortOrderInJobSpec));
+ }
+
sortedDF
.write()
.format("iceberg")
@@ -126,6 +141,7 @@ abstract class SparkShufflingFileRewriteRunner extends
SparkDataFileRewriteRunne
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES,
fileGroup.maxOutputFileSize())
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
.option(SparkWriteOptions.OUTPUT_SPEC_ID, fileGroup.outputSpecId())
+ .option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID,
maybeMatchingTableSortOrder.orderId())
.mode("append")
.save(groupId);
}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index f1fd7b7ff9..885b67c6ae 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -110,6 +110,7 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
private final String branch;
private final Map<String, String> extraSnapshotMetadata;
private final SparkWriteRequirements writeRequirements;
+ private final int sortOrderId;
private final Context context;
private final Map<String, String> writeProperties;
@@ -135,6 +136,7 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
this.branch = writeConf.branch();
this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
this.writeRequirements = writeConf.positionDeltaRequirements(command);
+ this.sortOrderId = writeConf.outputSortOrderId(writeRequirements);
this.context = new Context(dataSchema, writeConf, info, writeRequirements);
this.writeProperties = writeConf.writeProperties();
}
@@ -169,7 +171,8 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
broadcastRewritableDeletes(),
command,
context,
- writeProperties);
+ writeProperties,
+ sortOrderId);
}
private Broadcast<Map<String, DeleteFileSet>> broadcastRewritableDeletes()
{
@@ -379,18 +382,21 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
private final Command command;
private final Context context;
private final Map<String, String> writeProperties;
+ private final int sortOrderId;
PositionDeltaWriteFactory(
Broadcast<Table> tableBroadcast,
Broadcast<Map<String, DeleteFileSet>> rewritableDeletesBroadcast,
Command command,
Context context,
- Map<String, String> writeProperties) {
+ Map<String, String> writeProperties,
+ int sortOrderId) {
this.tableBroadcast = tableBroadcast;
this.rewritableDeletesBroadcast = rewritableDeletesBroadcast;
this.command = command;
this.context = context;
this.writeProperties = writeProperties;
+ this.sortOrderId = sortOrderId;
}
@Override
@@ -417,6 +423,7 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
.deleteFileFormat(context.deleteFileFormat())
.positionDeleteSparkType(context.deleteSparkType())
.writeProperties(writeProperties)
+ .dataSortOrder(table.sortOrders().get(sortOrderId))
.build();
if (command == DELETE) {
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index e890680cf7..f1bbd949d5 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -182,6 +182,7 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
// broadcast the table metadata as the writer factory will be sent to
executors
Broadcast<Table> tableBroadcast =
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+ int sortOrderId = writeConf.outputSortOrderId(writeRequirements);
return new WriterFactory(
tableBroadcast,
queryId,
@@ -191,7 +192,8 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
writeSchema,
dsSchema,
partitionedFanoutEnabled,
- writeProperties);
+ writeProperties,
+ sortOrderId);
}
private void commitOperation(SnapshotUpdate<?> operation, String
description) {
@@ -656,6 +658,7 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
private final boolean partitionedFanoutEnabled;
private final String queryId;
private final Map<String, String> writeProperties;
+ private final int sortOrderId;
protected WriterFactory(
Broadcast<Table> tableBroadcast,
@@ -666,7 +669,8 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
Schema writeSchema,
StructType dsSchema,
boolean partitionedFanoutEnabled,
- Map<String, String> writeProperties) {
+ Map<String, String> writeProperties,
+ int sortOrderId) {
this.tableBroadcast = tableBroadcast;
this.format = format;
this.outputSpecId = outputSpecId;
@@ -676,6 +680,7 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
this.partitionedFanoutEnabled = partitionedFanoutEnabled;
this.queryId = queryId;
this.writeProperties = writeProperties;
+ this.sortOrderId = sortOrderId;
}
@Override
@@ -700,6 +705,7 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
.dataSchema(writeSchema)
.dataSparkType(dsSchema)
.writeProperties(writeProperties)
+ .dataSortOrder(table.sortOrders().get(sortOrderId))
.build();
if (spec.isUnpartitioned()) {
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index ae7ed3af65..4b261aaf83 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -45,6 +45,7 @@ import static
org.apache.spark.sql.connector.write.RowLevelOperation.Command.DEL
import static
org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE;
import static
org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE;
import static org.assertj.core.api.Assertions.assertThat;
+import static
org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.time.Duration;
@@ -489,6 +490,51 @@ public class TestSparkWriteConf extends
TestBaseWithCatalog {
}
}
+ @TestTemplate
+ public void testSortOrderWriteConf() {
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.replaceSortOrder().asc("id").commit();
+
+ SparkWriteConf writeConf =
+ new SparkWriteConf(
+ spark, table,
ImmutableMap.of(SparkWriteOptions.OUTPUT_SORT_ORDER_ID, "1"));
+
+ assertThat(writeConf.outputSortOrderId(SparkWriteRequirements.EMPTY))
+ .isEqualTo(table.sortOrder().orderId());
+ }
+
+ @TestTemplate
+ public void testSortOrderWriteConfWithInvalidId() {
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.replaceSortOrder().asc("id").commit();
+
+ SparkWriteConf writeConfForUnknownSortOrder =
+ new SparkWriteConf(
+ spark, table,
ImmutableMap.of(SparkWriteOptions.OUTPUT_SORT_ORDER_ID, "999"));
+
+ assertThatIllegalArgumentException()
+ .isThrownBy(
+ () ->
writeConfForUnknownSortOrder.outputSortOrderId(SparkWriteRequirements.EMPTY))
+ .withMessage(
+ "Cannot use output sort order id 999 because the table does not
contain a sort order with that id");
+ }
+
+ @TestTemplate
+ public void testSortOrderWriteConfWithNoOption() {
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.replaceSortOrder().asc("id").commit();
+
+ SparkWriteConf writeConfNoOption = new SparkWriteConf(spark, table,
ImmutableMap.of());
+
+
assertThat(writeConfNoOption.outputSortOrderId(writeConfNoOption.writeRequirements()))
+ .isEqualTo(table.sortOrder().orderId());
+
+
assertThat(writeConfNoOption.outputSortOrderId(SparkWriteRequirements.EMPTY)).isEqualTo(0);
+ }
+
private void testWriteProperties(List<Map<String, String>> propertiesSuite) {
withSQLConf(
propertiesSuite.get(0),
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 6abce5b24d..d74d8a29f9 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -1516,7 +1516,7 @@ public class TestRewriteDataFilesAction extends TestBase {
}
@TestTemplate
- public void testSimpleSort() {
+ public void testSimpleSort() throws IOException {
Table table = createTable(20);
shouldHaveFiles(table, 20);
table.replaceSortOrder().asc("c2").commit();
@@ -1544,10 +1544,11 @@ public class TestRewriteDataFilesAction extends
TestBase {
shouldHaveACleanCache(table);
shouldHaveMultipleFiles(table);
shouldHaveLastCommitSorted(table, "c2");
+ dataFilesSortOrderShouldMatchTableSortOrder(table);
}
@TestTemplate
- public void testSortAfterPartitionChange() {
+ public void testSortAfterPartitionChange() throws IOException {
Table table = createTable(20);
shouldHaveFiles(table, 20);
table.updateSpec().addField(Expressions.bucket("c1", 4)).commit();
@@ -1578,10 +1579,11 @@ public class TestRewriteDataFilesAction extends
TestBase {
shouldHaveACleanCache(table);
shouldHaveMultipleFiles(table);
shouldHaveLastCommitSorted(table, "c2");
+ dataFilesSortOrderShouldMatchTableSortOrder(table);
}
@TestTemplate
- public void testSortCustomSortOrder() {
+ public void testSortCustomSortOrder() throws IOException {
Table table = createTable(20);
shouldHaveLastCommitUnsorted(table, "c2");
shouldHaveFiles(table, 20);
@@ -1607,10 +1609,11 @@ public class TestRewriteDataFilesAction extends
TestBase {
shouldHaveACleanCache(table);
shouldHaveMultipleFiles(table);
shouldHaveLastCommitSorted(table, "c2");
+ dataFilesShouldHaveSortOrderIdMatching(table, SortOrder.unsorted());
}
@TestTemplate
- public void testSortCustomSortOrderRequiresRepartition() {
+ public void testSortCustomSortOrderRequiresRepartition() throws IOException {
int partitions = 4;
Table table = createTable();
writeRecords(20, SCALE, partitions);
@@ -1646,10 +1649,40 @@ public class TestRewriteDataFilesAction extends
TestBase {
shouldHaveMultipleFiles(table);
shouldHaveLastCommitUnsorted(table, "c2");
shouldHaveLastCommitSorted(table, "c3");
+ dataFilesShouldHaveSortOrderIdMatching(table, SortOrder.unsorted());
}
@TestTemplate
- public void testAutoSortShuffleOutput() {
+ public void testSortPastTableSortOrderGetsAppliedToFiles() throws
IOException {
+ Table table = createTable(1);
+
+ table.replaceSortOrder().asc("c3").commit();
+ SortOrder c3SortOrder = table.sortOrder();
+
+ table.replaceSortOrder().asc("c2").commit();
+
+ List<Object[]> originalData = currentData();
+
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .sort(SortOrder.builderFor(table.schema()).asc("c3").build())
+ .option(SizeBasedFileRewritePlanner.REWRITE_ALL, "true")
+ .execute();
+
+ assertThat(result.rewriteResults()).as("Should have 1
fileGroups").hasSize(1);
+
+ table.refresh();
+
+ List<Object[]> postRewriteData = currentData();
+ assertEquals("We shouldn't have changed the data", originalData,
postRewriteData);
+
+ shouldHaveSnapshots(table, 2);
+ shouldHaveACleanCache(table);
+ dataFilesShouldHaveSortOrderIdMatching(table, c3SortOrder);
+ }
+
+ @TestTemplate
+ public void testAutoSortShuffleOutput() throws IOException {
Table table = createTable(20);
shouldHaveLastCommitUnsorted(table, "c2");
shouldHaveFiles(table, 20);
@@ -1684,6 +1717,7 @@ public class TestRewriteDataFilesAction extends TestBase {
shouldHaveACleanCache(table);
shouldHaveMultipleFiles(table);
shouldHaveLastCommitSorted(table, "c2");
+ dataFilesShouldHaveSortOrderIdMatching(table, SortOrder.unsorted());
}
@TestTemplate
@@ -2619,4 +2653,17 @@ public class TestRewriteDataFilesAction extends TestBase
{
return groupIDs.contains(argument.info().globalIndex());
}
}
+
+ private void dataFilesSortOrderShouldMatchTableSortOrder(Table table) throws
IOException {
+ dataFilesShouldHaveSortOrderIdMatching(table, table.sortOrder());
+ }
+
+ private void dataFilesShouldHaveSortOrderIdMatching(Table table, SortOrder
sortOrder)
+ throws IOException {
+ try (CloseableIterable<FileScanTask> files = table.newScan().planFiles()) {
+ assertThat(files)
+ .extracting(fileScanTask -> fileScanTask.file().sortOrderId())
+ .containsOnly(sortOrder.orderId());
+ }
+ }
}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 439c4443b9..bf667956ec 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.File;
+import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Path;
import java.util.List;
@@ -35,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Parameter;
@@ -43,10 +45,12 @@ import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -155,6 +159,7 @@ public class TestSparkDataWrite {
assertThat(file.splitOffsets()).as("Split offsets not
present").isNotNull();
}
assertThat(file.recordCount()).as("Should have reported record count
as 1").isEqualTo(1);
+
assertThat(file.sortOrderId()).isEqualTo(SortOrder.unsorted().orderId());
// TODO: append more metric info
if (format.equals(FileFormat.PARQUET)) {
assertThat(file.columnSizes()).as("Column sizes metric not
present").isNotNull();
@@ -475,6 +480,116 @@ public class TestSparkDataWrite {
assertThat(actual2).hasSameSizeAs(expected2).isEqualTo(expected2);
}
+ @TestTemplate
+ public void testWriteDataFilesInTableSortOrder() throws IOException {
+ File parent = temp.resolve(format.toString()).toFile();
+ File location = new File(parent, "test");
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ SortOrder sortOrder = SortOrder.builderFor(SCHEMA).asc("id").build();
+ Table table = tables.create(SCHEMA, spec, sortOrder, ImmutableMap.of(),
location.toString());
+
+ List<SimpleRecord> expected = Lists.newArrayListWithCapacity(10);
+ for (int i = 0; i < 10; i++) {
+ expected.add(new SimpleRecord(i, "a"));
+ }
+
+ Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+ df.select("id", "data")
+ .write()
+ .format("iceberg")
+ .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+ .mode(SaveMode.Append)
+ .save(location.toString());
+
+ Dataset<Row> result =
spark.read().format("iceberg").load(location.toString());
+
+ List<SimpleRecord> actual =
+
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+
+ try (CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().planFiles()) {
+ assertThat(fileScanTasks)
+ .extracting(task -> task.file().sortOrderId())
+ .as("All DataFiles are written with the table sort order id")
+ .containsOnly(table.sortOrder().orderId());
+ }
+ }
+
+ @TestTemplate
+ public void testWriteDataFilesUnsortedTable() throws IOException {
+ File parent = temp.resolve(format.toString()).toFile();
+ File location = new File(parent, "test");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> expected = Lists.newArrayList(new SimpleRecord(1, "a"));
+ Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+ df.select("id", "data")
+ .write()
+ .format("iceberg")
+ .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+ .mode(SaveMode.Append)
+ .save(location.toString());
+
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ assertThat(tasks)
+ .extracting(task -> task.file().sortOrderId())
+ .as("All DataFiles should have unsorted sort order id")
+ .containsOnly(SortOrder.unsorted().orderId());
+ }
+ }
+
+ @TestTemplate
+ public void testWriteDataFilesAfterSortOrderChange() throws IOException {
+ File parent = temp.resolve(format.toString()).toFile();
+ File location = new File(parent, "test");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "a"));
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+ df.select("id", "data")
+ .write()
+ .format("iceberg")
+ .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+ .mode(SaveMode.Append)
+ .save(location.toString());
+
+ table.refresh();
+ int unsortedId = SortOrder.unsorted().orderId();
+
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ assertThat(tasks).extracting(task ->
task.file().sortOrderId()).containsOnly(unsortedId);
+ }
+
+ table.replaceSortOrder().asc("id").commit();
+ int sortedId = table.sortOrder().orderId();
+
+ df.select("id", "data")
+ .write()
+ .format("iceberg")
+ .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+ .mode(SaveMode.Append)
+ .save(location.toString());
+
+ table.refresh();
+
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ assertThat(tasks)
+ .extracting(task -> task.file().sortOrderId())
+ .as("Should contain both unsorted and sorted files")
+ .containsOnly(unsortedId, sortedId);
+ }
+ }
+
public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType
option) {
File parent = temp.resolve(format.toString()).toFile();
File location = new File(parent, "test");
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
index a974b58a97..fcc8acd56a 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
@@ -29,10 +29,14 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.TestBase;
import org.apache.iceberg.types.Types;
@@ -262,6 +266,50 @@ public class TestStructuredStreaming {
}
}
+ @Test
+ public void testStreamingWriteDataFilesInTableSortOrder() throws Exception {
+ File parent = temp.resolve("parquet").toFile();
+ File location = new File(parent, "test-table");
+ File checkpoint = new File(parent, "checkpoint");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ SortOrder sortOrder = SortOrder.builderFor(SCHEMA).asc("id").build();
+ Table table = tables.create(SCHEMA, spec, sortOrder, ImmutableMap.of(),
location.toString());
+
+ MemoryStream<Integer> inputStream = newMemoryStream(1, spark.sqlContext(),
Encoders.INT());
+ DataStreamWriter<Row> streamWriter =
+ inputStream
+ .toDF()
+ .selectExpr("value AS id", "CAST (value AS STRING) AS data")
+ .writeStream()
+ .outputMode("append")
+ .format("iceberg")
+ .option("checkpointLocation", checkpoint.toString())
+ .option("path", location.toString());
+
+ try {
+ StreamingQuery query = streamWriter.start();
+ List<Integer> batch1 = Lists.newArrayList(1, 2);
+ send(batch1, inputStream);
+ query.processAllAvailable();
+ query.stop();
+
+ table.refresh();
+
+ try (CloseableIterable<FileScanTask> tasks =
table.newScan().planFiles()) {
+ assertThat(tasks)
+ .extracting(task -> task.file().sortOrderId())
+ .as("All DataFiles are written with the table sort order id")
+ .containsOnly(table.sortOrder().orderId());
+ }
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
@Test
public void testStreamingWriteUpdateMode() throws Exception {
File parent = temp.resolve("parquet").toFile();