This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 99b1d0ee4c Spark 3.5: Add Support for Providing output-spec-id During 
Rewrite Datafiles
99b1d0ee4c is described below

commit 99b1d0ee4c8fb67d457c4d7a03065e4cb63af435
Author: Himadri Pal <[email protected]>
AuthorDate: Wed Mar 6 21:58:54 2024 -0800

    Spark 3.5: Add Support for Providing output-spec-id During Rewrite Datafiles
---
 .../apache/iceberg/actions/RewriteDataFiles.java   |   9 ++
 .../iceberg/actions/SizeBasedFileRewriter.java     |  22 ++++
 .../spark/actions/RewriteDataFilesSparkAction.java |   3 +-
 .../spark/actions/SparkBinPackDataRewriter.java    |   3 +-
 .../spark/actions/SparkShufflingDataRewriter.java  |  20 +++-
 .../spark/actions/SparkZOrderDataRewriter.java     |  16 +++
 .../spark/actions/TestRewriteDataFilesAction.java  | 126 +++++++++++++++++++++
 7 files changed, 194 insertions(+), 5 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java 
b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
index 854b099351..40dc24318c 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -119,6 +119,15 @@ public interface RewriteDataFiles
 
   String REWRITE_JOB_ORDER_DEFAULT = RewriteJobOrder.NONE.orderName();
 
+  /**
+   * The partition specification ID to be used for rewritten files
+   *
+   * <p>output-spec-id ID is used by the file rewriter during the rewrite 
operation to identify the
+   * specific output partition spec. Data will be reorganized during the 
rewrite to align with the
+   * output partitioning. Defaults to the current table specification.
+   */
+  String OUTPUT_SPEC_ID = "output-spec-id";
+
   /**
    * Choose BINPACK as a strategy for this rewrite operation
    *
diff --git 
a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java 
b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
index cf98c5266a..fb3c27220c 100644
--- a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
+++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -111,6 +112,8 @@ public abstract class SizeBasedFileRewriter<T extends 
ContentScanTask<F>, F exte
   private boolean rewriteAll;
   private long maxGroupSize;
 
+  private int outputSpecId;
+
   protected SizeBasedFileRewriter(Table table) {
     this.table = table;
   }
@@ -146,6 +149,7 @@ public abstract class SizeBasedFileRewriter<T extends 
ContentScanTask<F>, F exte
     this.minInputFiles = minInputFiles(options);
     this.rewriteAll = rewriteAll(options);
     this.maxGroupSize = maxGroupSize(options);
+    this.outputSpecId = outputSpecId(options);
 
     if (rewriteAll) {
       LOG.info("Configured to rewrite all provided files in table {}", 
table.name());
@@ -258,6 +262,24 @@ public abstract class SizeBasedFileRewriter<T extends 
ContentScanTask<F>, F exte
     return (long) (targetFileSize + ((maxFileSize - targetFileSize) * 0.5));
   }
 
+  protected PartitionSpec outputSpec() {
+    return table.specs().get(outputSpecId);
+  }
+
+  protected int outputSpecId() {
+    return outputSpecId;
+  }
+
+  private int outputSpecId(Map<String, String> options) {
+    int specId =
+        PropertyUtil.propertyAsInt(options, RewriteDataFiles.OUTPUT_SPEC_ID, 
table.spec().specId());
+    Preconditions.checkArgument(
+        table.specs().containsKey(specId),
+        "Cannot use output spec id %s because the table does not contain a 
reference to this spec-id.",
+        specId);
+    return specId;
+  }
+
   private Map<String, Long> sizeThresholds(Map<String, String> options) {
     long target =
         PropertyUtil.propertyAsLong(options, TARGET_FILE_SIZE_BYTES, 
defaultTargetFileSize());
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index a2a585db78..bf1a901dbd 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -81,7 +81,8 @@ public class RewriteDataFilesSparkAction
           PARTIAL_PROGRESS_MAX_COMMITS,
           TARGET_FILE_SIZE_BYTES,
           USE_STARTING_SEQUENCE_NUMBER,
-          REWRITE_JOB_ORDER);
+          REWRITE_JOB_ORDER,
+          OUTPUT_SPEC_ID);
 
   private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
       
ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
index 9a96f44ebd..d256bf2794 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
@@ -58,13 +58,14 @@ class SparkBinPackDataRewriter extends 
SparkSizeBasedDataRewriter {
         .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
         .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
         .option(SparkWriteOptions.DISTRIBUTION_MODE, 
distributionMode(group).modeName())
+        .option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId())
         .mode("append")
         .save(groupId);
   }
 
   // invoke a shuffle if the original spec does not match the output spec
   private DistributionMode distributionMode(List<FileScanTask> group) {
-    boolean requiresRepartition = !group.get(0).spec().equals(table().spec());
+    boolean requiresRepartition = !group.get(0).spec().equals(outputSpec());
     return requiresRepartition ? DistributionMode.RANGE : 
DistributionMode.NONE;
   }
 }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
index c9c962526e..ce572c6486 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -86,6 +88,16 @@ abstract class SparkShufflingDataRewriter extends 
SparkSizeBasedDataRewriter {
 
   protected abstract org.apache.iceberg.SortOrder sortOrder();
 
+  /**
+   * Retrieves and returns the schema for the rewrite using the current table 
schema.
+   *
+   * <p>The schema with all columns required for correctly sorting the table. 
This may include
+   * additional computed columns which are not written to the table but are 
used for sorting.
+   */
+  protected Schema sortSchema() {
+    return table().schema();
+  }
+
   protected abstract Dataset<Row> sortedDF(
       Dataset<Row> df, Function<Dataset<Row>, Dataset<Row>> sortFunc);
 
@@ -122,6 +134,7 @@ abstract class SparkShufflingDataRewriter extends 
SparkSizeBasedDataRewriter {
         .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
         .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
         .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+        .option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId())
         .mode("append")
         .save(groupId);
   }
@@ -152,11 +165,12 @@ abstract class SparkShufflingDataRewriter extends 
SparkSizeBasedDataRewriter {
   }
 
   private org.apache.iceberg.SortOrder outputSortOrder(List<FileScanTask> 
group) {
-    boolean includePartitionColumns = 
!group.get(0).spec().equals(table().spec());
-    if (includePartitionColumns) {
+    PartitionSpec spec = outputSpec();
+    boolean requiresRepartitioning = !group.get(0).spec().equals(spec);
+    if (requiresRepartitioning) {
       // build in the requirement for partition sorting into our sort order
       // as the original spec for this group does not match the output spec
-      return SortOrderUtil.buildSortOrder(table(), sortOrder());
+      return SortOrderUtil.buildSortOrder(sortSchema(), spec, sortOrder());
     } else {
       return sortOrder();
     }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
index 9a618661fe..cc4fb78ebd 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.SortDirection;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkUtil;
@@ -108,6 +109,21 @@ class SparkZOrderDataRewriter extends 
SparkShufflingDataRewriter {
     return Z_SORT_ORDER;
   }
 
+  /**
+   * Overrides the sortSchema method to include columns from Z_SCHEMA.
+   *
+   * <p>This method generates a new Schema object which consists of columns 
from the original table
+   * schema and Z_SCHEMA.
+   */
+  @Override
+  protected Schema sortSchema() {
+    return new Schema(
+        new ImmutableList.Builder<Types.NestedField>()
+            .addAll(table().schema().columns())
+            .addAll(Z_SCHEMA.columns())
+            .build());
+  }
+
   @Override
   protected Dataset<Row> sortedDF(Dataset<Row> df, Function<Dataset<Row>, 
Dataset<Row>> sortFunc) {
     Dataset<Row> zValueDF = df.withColumn(Z_COLUMN, zValue(df));
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 82b32f2ce0..500092c044 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -55,6 +55,7 @@ import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.RowDelta;
@@ -108,6 +109,7 @@ import org.apache.iceberg.util.StructLikeMap;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.internal.SQLConf;
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -1463,6 +1465,130 @@ public class TestRewriteDataFilesAction extends 
TestBase {
     
assertThat(table.currentSnapshot().summary()).containsKeys(commitMetricsKeys);
   }
 
+  @Test
+  public void testBinPackRewriterWithSpecificUnparitionedOutputSpec() {
+    Table table = createTable(10);
+    shouldHaveFiles(table, 10);
+    int outputSpecId = table.spec().specId();
+    table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+
+    long dataSizeBefore = testDataSize(table);
+    long count = currentData().size();
+
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .option(RewriteDataFiles.OUTPUT_SPEC_ID, 
String.valueOf(outputSpecId))
+            .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+            .binPack()
+            .execute();
+
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+    assertThat(currentData().size()).isEqualTo(count);
+    shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
+  }
+
+  @Test
+  public void testBinPackRewriterWithSpecificOutputSpec() {
+    Table table = createTable(10);
+    shouldHaveFiles(table, 10);
+    table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+    int outputSpecId = table.spec().specId();
+    table.updateSpec().addField(Expressions.bucket("c3", 2)).commit();
+
+    long dataSizeBefore = testDataSize(table);
+    long count = currentData().size();
+
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .option(RewriteDataFiles.OUTPUT_SPEC_ID, 
String.valueOf(outputSpecId))
+            .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+            .binPack()
+            .execute();
+
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+    assertThat(currentData().size()).isEqualTo(count);
+    shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
+  }
+
+  @Test
+  public void testBinpackRewriteWithInvalidOutputSpecId() {
+    Table table = createTable(10);
+    shouldHaveFiles(table, 10);
+    Assertions.assertThatThrownBy(
+            () ->
+                actions()
+                    .rewriteDataFiles(table)
+                    .option(RewriteDataFiles.OUTPUT_SPEC_ID, 
String.valueOf(1234))
+                    .binPack()
+                    .execute())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(
+            "Cannot use output spec id 1234 because the table does not contain 
a reference to this spec-id.");
+  }
+
+  @Test
+  public void testSortRewriterWithSpecificOutputSpecId() {
+    Table table = createTable(10);
+    shouldHaveFiles(table, 10);
+    table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+    int outputSpecId = table.spec().specId();
+    table.updateSpec().addField(Expressions.bucket("c3", 2)).commit();
+
+    long dataSizeBefore = testDataSize(table);
+    long count = currentData().size();
+
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .option(RewriteDataFiles.OUTPUT_SPEC_ID, 
String.valueOf(outputSpecId))
+            .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+            
.sort(SortOrder.builderFor(table.schema()).asc("c2").asc("c3").build())
+            .execute();
+
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+    assertThat(currentData().size()).isEqualTo(count);
+    shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
+  }
+
+  @Test
+  public void testZOrderRewriteWithSpecificOutputSpecId() {
+    Table table = createTable(10);
+    shouldHaveFiles(table, 10);
+    table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+    int outputSpecId = table.spec().specId();
+    table.updateSpec().addField(Expressions.bucket("c3", 2)).commit();
+
+    long dataSizeBefore = testDataSize(table);
+    long count = currentData().size();
+
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .option(RewriteDataFiles.OUTPUT_SPEC_ID, 
String.valueOf(outputSpecId))
+            .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+            .zOrder("c2", "c3")
+            .execute();
+
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+    assertThat(currentData().size()).isEqualTo(count);
+    shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
+  }
+
+  protected void shouldRewriteDataFilesWithPartitionSpec(Table table, int 
outputSpecId) {
+    List<DataFile> rewrittenFiles = currentDataFiles(table);
+    assertThat(rewrittenFiles).allMatch(file -> file.specId() == outputSpecId);
+    assertThat(rewrittenFiles)
+        .allMatch(
+            file ->
+                ((PartitionData) file.partition())
+                    .getPartitionType()
+                    .equals(table.specs().get(outputSpecId).partitionType()));
+  }
+
+  protected List<DataFile> currentDataFiles(Table table) {
+    return Streams.stream(table.newScan().planFiles())
+        .map(FileScanTask::file)
+        .collect(Collectors.toList());
+  }
+
   private Stream<RewriteFileGroup> toGroupStream(Table table, 
RewriteDataFilesSparkAction rewrite) {
     rewrite.validateAndInitOptions();
     StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =

Reply via email to