This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new bf7edc4  Spark: Support custom metadata in snapshot summary (#1241)
bf7edc4 is described below

commit bf7edc4b325df6dd80d86fea0149d2be0ad09468
Author: moulimukherjee <[email protected]>
AuthorDate: Fri Jul 24 15:21:15 2020 -0700

    Spark: Support custom metadata in snapshot summary (#1241)
---
 .../java/org/apache/iceberg/SnapshotSummary.java   |  1 +
 site/docs/configuration.md                         |  1 +
 .../spark/source/TestDataSourceOptions.java        | 24 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/source/Writer.java    | 13 ++++++++++++
 .../iceberg/spark/source/SparkBatchWrite.java      | 13 ++++++++++++
 5 files changed, 52 insertions(+)

diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
index ec7b73f..4b7e480 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
@@ -47,6 +47,7 @@ public class SnapshotSummary {
   public static final String PUBLISHED_WAP_ID_PROP = "published-wap-id";
   public static final String SOURCE_SNAPSHOT_ID_PROP = "source-snapshot-id";
   public static final String REPLACE_PARTITIONS_PROP = "replace-partitions";
+  public static final String EXTRA_METADATA_PREFIX = "snapshot-property.";
 
   private SnapshotSummary() {
   }
diff --git a/site/docs/configuration.md b/site/docs/configuration.md
index 27dd60b..2badc75 100644
--- a/site/docs/configuration.md
+++ b/site/docs/configuration.md
@@ -133,4 +133,5 @@ df.write
 | write-format           | Table write.format.default | File format to use for this write operation; parquet or avro |
 | target-file-size-bytes | As per table property      | Overrides this table's write.target-file-size-bytes          |
 | check-nullability      | true                       | Sets the nullable check on fields                            |
+| snapshot-property._custom-key_    | null            | Adds an entry with custom-key and corresponding value in the snapshot summary  |
 
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 5881ade..cf1b8c9 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -364,4 +364,28 @@ public abstract class TestDataSourceOptions {
     int partitionNum = metadataDf.javaRDD().getNumPartitions();
     Assert.assertEquals("Spark partitions should match", expectedSplits, partitionNum);
   }
+
+  @Test
+  public void testExtraSnapshotMetadata() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+            .format("iceberg")
+            .mode("append")
+            .option("snapshot-property.extra-key", "someValue")
+            .option("snapshot-property.another-key", "anotherValue")
+            .save(tableLocation);
+
+    Table table = tables.load(tableLocation);
+
+    Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
+    Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
+  }
 }
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
index a24820b..9ce590c 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.broadcast.Broadcast;
@@ -79,6 +80,7 @@ class Writer implements DataSourceWriter {
   private final long targetFileSize;
   private final Schema writeSchema;
   private final StructType dsSchema;
+  private final Map<String, String> extraSnapshotMetadata;
 
   Writer(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
          DataSourceOptions options, boolean replacePartitions, String applicationId, Schema writeSchema,
@@ -98,6 +100,13 @@ class Writer implements DataSourceWriter {
     this.wapId = wapId;
     this.writeSchema = writeSchema;
     this.dsSchema = dsSchema;
+    this.extraSnapshotMetadata = Maps.newHashMap();
+
+    options.asMap().forEach((key, value) -> {
+      if (key.startsWith(SnapshotSummary.EXTRA_METADATA_PREFIX)) {
+        extraSnapshotMetadata.put(key.substring(SnapshotSummary.EXTRA_METADATA_PREFIX.length()), value);
+      }
+    });
 
     long tableTargetFileSize = PropertyUtil.propertyAsLong(
         table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
@@ -138,6 +147,10 @@ class Writer implements DataSourceWriter {
       operation.set("spark.app.id", applicationId);
     }
 
+    if (!extraSnapshotMetadata.isEmpty()) {
+      extraSnapshotMetadata.forEach((key, value) -> operation.set(key, value));
+    }
+
     if (isWapTable() && wapId != null) {
       // write-audit-publish is enabled for this table and job
       // stage the changes without changing the current snapshot
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java
index 9cd80e9..6b27453 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.broadcast.Broadcast;
@@ -84,6 +85,7 @@ class SparkBatchWrite implements BatchWrite {
   private final long targetFileSize;
   private final Schema writeSchema;
   private final StructType dsSchema;
+  private final Map<String, String> extraSnapshotMetadata;
 
   SparkBatchWrite(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
                   CaseInsensitiveStringMap options, boolean overwriteDynamic, boolean overwriteByFilter,
@@ -100,6 +102,13 @@ class SparkBatchWrite implements BatchWrite {
     this.wapId = wapId;
     this.writeSchema = writeSchema;
     this.dsSchema = dsSchema;
+    this.extraSnapshotMetadata = Maps.newHashMap();
+
+    options.forEach((key, value) -> {
+      if (key.startsWith(SnapshotSummary.EXTRA_METADATA_PREFIX)) {
+        extraSnapshotMetadata.put(key.substring(SnapshotSummary.EXTRA_METADATA_PREFIX.length()), value);
+      }
+    });
 
     long tableTargetFileSize = PropertyUtil.propertyAsLong(
         table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
@@ -142,6 +151,10 @@ class SparkBatchWrite implements BatchWrite {
       operation.set("spark.app.id", applicationId);
     }
 
+    if (!extraSnapshotMetadata.isEmpty()) {
+      extraSnapshotMetadata.forEach((key, value) -> operation.set(key, value));
+    }
+
     if (isWapTable() && wapId != null) {
       // write-audit-publish is enabled for this table and job
       // stage the changes without changing the current snapshot

Reply via email to