This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 008d1731cb Spark 3.5: Propagate snapshot properties in compaction (#9449)
008d1731cb is described below

commit 008d1731cb0d9c2dfbcab640633651dc920953ea
Author: advancedxy <[email protected]>
AuthorDate: Fri Jan 19 03:35:48 2024 +0800

    Spark 3.5: Propagate snapshot properties in compaction (#9449)
---
 .../actions/RewriteDataFilesCommitManager.java     | 14 +++++++++
 .../RewritePositionDeletesCommitManager.java       | 10 +++++++
 .../actions/BaseSnapshotUpdateSparkAction.java     |  5 ++++
 .../spark/actions/RewriteDataFilesSparkAction.java |  3 +-
 .../RewritePositionDeleteFilesSparkAction.java     |  2 +-
 .../spark/actions/TestRewriteDataFilesAction.java  | 18 ++++++++++++
 .../TestRewritePositionDeleteFilesAction.java      | 34 ++++++++++++++++++++++
 7 files changed, 84 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
index 265b5c5c27..7f89db467d 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
@@ -18,12 +18,14 @@
  */
 package org.apache.iceberg.actions;
 
+import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
@@ -36,6 +38,7 @@ public class RewriteDataFilesCommitManager {
   private final Table table;
   private final long startingSnapshotId;
   private final boolean useStartingSequenceNumber;
+  private final Map<String, String> snapshotProperties;
 
   // constructor used for testing
   public RewriteDataFilesCommitManager(Table table) {
@@ -48,9 +51,18 @@ public class RewriteDataFilesCommitManager {
 
   public RewriteDataFilesCommitManager(
       Table table, long startingSnapshotId, boolean useStartingSequenceNumber) {
+    this(table, startingSnapshotId, useStartingSequenceNumber, ImmutableMap.of());
+  }
+
+  public RewriteDataFilesCommitManager(
+      Table table,
+      long startingSnapshotId,
+      boolean useStartingSequenceNumber,
+      Map<String, String> snapshotProperties) {
     this.table = table;
     this.startingSnapshotId = startingSnapshotId;
     this.useStartingSequenceNumber = useStartingSequenceNumber;
+    this.snapshotProperties = snapshotProperties;
   }
 
   /**
@@ -75,6 +87,8 @@ public class RewriteDataFilesCommitManager {
       rewrite.rewriteFiles(rewrittenDataFiles, addedDataFiles);
     }
 
+    snapshotProperties.forEach(rewrite::set);
+
     rewrite.commit();
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
index c55532692e..01b2f7528e 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.actions;
 
+import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.DeleteFile;
@@ -25,6 +26,7 @@ import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,10 +41,16 @@ public class RewritePositionDeletesCommitManager {
 
   private final Table table;
   private final long startingSnapshotId;
+  private final Map<String, String> snapshotProperties;
 
   public RewritePositionDeletesCommitManager(Table table) {
+    this(table, ImmutableMap.of());
+  }
+
+  public RewritePositionDeletesCommitManager(Table table, Map<String, String> snapshotProperties) {
     this.table = table;
     this.startingSnapshotId = table.currentSnapshot().snapshotId();
+    this.snapshotProperties = snapshotProperties;
   }
 
   /**
@@ -64,6 +72,8 @@ public class RewritePositionDeletesCommitManager {
       }
     }
 
+    snapshotProperties.forEach(rewriteFiles::set);
+
     rewriteFiles.commit();
   }
 
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java
index 77debe1e58..b69b80a8d3 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.actions;
 
 import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.spark.sql.SparkSession;
 
@@ -39,4 +40,8 @@ abstract class BaseSnapshotUpdateSparkAction<ThisT> extends BaseSparkAction<This
     summary.forEach(update::set);
     update.commit();
   }
+
+  protected Map<String, String> commitSummary() {
+    return ImmutableMap.copyOf(summary);
+  }
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 6b5628a1f4..a2a585db78 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -257,7 +257,8 @@ public class RewriteDataFilesSparkAction
 
   @VisibleForTesting
   RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
-    return new RewriteDataFilesCommitManager(table, startingSnapshotId, useStartingSequenceNumber);
+    return new RewriteDataFilesCommitManager(
+        table, startingSnapshotId, useStartingSequenceNumber, commitSummary());
   }
 
   private Result doExecute(
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
index f3dfd2dcc3..539f6de920 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
@@ -215,7 +215,7 @@ public class RewritePositionDeleteFilesSparkAction
   }
 
   private RewritePositionDeletesCommitManager commitManager() {
-    return new RewritePositionDeletesCommitManager(table);
+    return new RewritePositionDeletesCommitManager(table, commitSummary());
   }
 
   private Result doExecute(
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index d0d22e46ff..82b32f2ce0 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -60,6 +60,7 @@ import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -1445,6 +1446,23 @@ public class TestRewriteDataFilesAction extends TestBase {
     assertThat(actual).as("Number of files order should not be ascending").isNotEqualTo(expected);
   }
 
+  @Test
+  public void testSnapshotProperty() {
+    Table table = createTable(4);
+    Result ignored = basicRewrite(table).snapshotProperty("key", "value").execute();
+    assertThat(table.currentSnapshot().summary())
+        .containsAllEntriesOf(ImmutableMap.of("key", "value"));
+    // make sure internal produced properties are not lost
+    String[] commitMetricsKeys =
+        new String[] {
+          SnapshotSummary.ADDED_FILES_PROP,
+          SnapshotSummary.DELETED_FILES_PROP,
+          SnapshotSummary.TOTAL_DATA_FILES_PROP,
+          SnapshotSummary.CHANGED_PARTITION_COUNT_PROP
+        };
+    assertThat(table.currentSnapshot().summary()).containsKeys(commitMetricsKeys);
+  }
+
   private Stream<RewriteFileGroup> toGroupStream(Table table, RewriteDataFilesSparkAction rewrite) {
     rewrite.validateAndInitOptions();
     StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index 7c55ff82df..89c44dbfcc 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.PositionDeletesScanTask;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -613,6 +614,39 @@ public class TestRewritePositionDeleteFilesAction extends CatalogTestBase {
     assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
+  @TestTemplate
+  public void testSnapshotProperty() throws Exception {
+    Table table = createTableUnpartitioned(2, SCALE);
+    List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+    writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
+    assertThat(dataFiles).hasSize(2);
+
+    List<DeleteFile> deleteFiles = deleteFiles(table);
+    assertThat(deleteFiles).hasSize(2);
+
+    Result ignored =
+        SparkActions.get(spark)
+            .rewritePositionDeletes(table)
+            .snapshotProperty("key", "value")
+            .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+            .execute();
+    assertThat(table.currentSnapshot().summary())
+        .containsAllEntriesOf(ImmutableMap.of("key", "value"));
+
+    // make sure internal produced properties are not lost
+    String[] commitMetricsKeys =
+        new String[] {
+          SnapshotSummary.ADDED_DELETE_FILES_PROP,
+          SnapshotSummary.ADDED_POS_DELETES_PROP,
+          SnapshotSummary.CHANGED_PARTITION_COUNT_PROP,
+          SnapshotSummary.REMOVED_DELETE_FILES_PROP,
+          SnapshotSummary.REMOVED_POS_DELETES_PROP,
+          SnapshotSummary.TOTAL_DATA_FILES_PROP,
+          SnapshotSummary.TOTAL_DELETE_FILES_PROP,
+        };
+    assertThat(table.currentSnapshot().summary()).containsKeys(commitMetricsKeys);
+  }
+
   private Table createTablePartitioned(int partitions, int files, int numRecords) {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
     Table table =

Reply via email to