This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new bf7edc4 Spark: Support custom metadata in snapshot summary (#1241)
bf7edc4 is described below
commit bf7edc4b325df6dd80d86fea0149d2be0ad09468
Author: moulimukherjee <[email protected]>
AuthorDate: Fri Jul 24 15:21:15 2020 -0700
Spark: Support custom metadata in snapshot summary (#1241)
---
.../java/org/apache/iceberg/SnapshotSummary.java | 1 +
site/docs/configuration.md | 1 +
.../spark/source/TestDataSourceOptions.java | 24 ++++++++++++++++++++++
.../org/apache/iceberg/spark/source/Writer.java | 13 ++++++++++++
.../iceberg/spark/source/SparkBatchWrite.java | 13 ++++++++++++
5 files changed, 52 insertions(+)
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
index ec7b73f..4b7e480 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
@@ -47,6 +47,7 @@ public class SnapshotSummary {
public static final String PUBLISHED_WAP_ID_PROP = "published-wap-id";
public static final String SOURCE_SNAPSHOT_ID_PROP = "source-snapshot-id";
public static final String REPLACE_PARTITIONS_PROP = "replace-partitions";
+ public static final String EXTRA_METADATA_PREFIX = "snapshot-property.";
private SnapshotSummary() {
}
diff --git a/site/docs/configuration.md b/site/docs/configuration.md
index 27dd60b..2badc75 100644
--- a/site/docs/configuration.md
+++ b/site/docs/configuration.md
@@ -133,4 +133,5 @@ df.write
| write-format | Table write.format.default | File format to use for this write operation; parquet or avro |
| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
| check-nullability | true | Sets the nullable check on fields |
+| snapshot-property._custom-key_ | null | Adds an entry with custom-key and corresponding value in the snapshot summary |
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 5881ade..cf1b8c9 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -364,4 +364,28 @@ public abstract class TestDataSourceOptions {
int partitionNum = metadataDf.javaRDD().getNumPartitions();
Assert.assertEquals("Spark partitions should match", expectedSplits, partitionNum);
}
+
+ @Test
+ public void testExtraSnapshotMetadata() throws IOException {
+ String tableLocation = temp.newFolder("iceberg-table").toString();
+ HadoopTables tables = new HadoopTables(CONF);
+ tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+ List<SimpleRecord> expectedRecords = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b")
+ );
+ Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+ originalDf.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .option("snapshot-property.extra-key", "someValue")
+ .option("snapshot-property.another-key", "anotherValue")
+ .save(tableLocation);
+
+ Table table = tables.load(tableLocation);
+
+ Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
+ Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
+ }
}
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
index a24820b..9ce590c 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.broadcast.Broadcast;
@@ -79,6 +80,7 @@ class Writer implements DataSourceWriter {
private final long targetFileSize;
private final Schema writeSchema;
private final StructType dsSchema;
+ private final Map<String, String> extraSnapshotMetadata;
Writer(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
DataSourceOptions options, boolean replacePartitions, String applicationId, Schema writeSchema,
@@ -98,6 +100,13 @@ class Writer implements DataSourceWriter {
this.wapId = wapId;
this.writeSchema = writeSchema;
this.dsSchema = dsSchema;
+ this.extraSnapshotMetadata = Maps.newHashMap();
+
+ options.asMap().forEach((key, value) -> {
+ if (key.startsWith(SnapshotSummary.EXTRA_METADATA_PREFIX)) {
+ extraSnapshotMetadata.put(key.substring(SnapshotSummary.EXTRA_METADATA_PREFIX.length()), value);
+ }
+ });
long tableTargetFileSize = PropertyUtil.propertyAsLong(
table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
@@ -138,6 +147,10 @@ class Writer implements DataSourceWriter {
operation.set("spark.app.id", applicationId);
}
+ if (!extraSnapshotMetadata.isEmpty()) {
+ extraSnapshotMetadata.forEach((key, value) -> operation.set(key, value));
+ }
+
if (isWapTable() && wapId != null) {
// write-audit-publish is enabled for this table and job
// stage the changes without changing the current snapshot
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java
index 9cd80e9..6b27453 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.broadcast.Broadcast;
@@ -84,6 +85,7 @@ class SparkBatchWrite implements BatchWrite {
private final long targetFileSize;
private final Schema writeSchema;
private final StructType dsSchema;
+ private final Map<String, String> extraSnapshotMetadata;
SparkBatchWrite(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
CaseInsensitiveStringMap options, boolean overwriteDynamic, boolean overwriteByFilter,
@@ -100,6 +102,13 @@ class SparkBatchWrite implements BatchWrite {
this.wapId = wapId;
this.writeSchema = writeSchema;
this.dsSchema = dsSchema;
+ this.extraSnapshotMetadata = Maps.newHashMap();
+
+ options.forEach((key, value) -> {
+ if (key.startsWith(SnapshotSummary.EXTRA_METADATA_PREFIX)) {
+ extraSnapshotMetadata.put(key.substring(SnapshotSummary.EXTRA_METADATA_PREFIX.length()), value);
+ }
+ });
long tableTargetFileSize = PropertyUtil.propertyAsLong(
table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
@@ -142,6 +151,10 @@ class SparkBatchWrite implements BatchWrite {
operation.set("spark.app.id", applicationId);
}
+ if (!extraSnapshotMetadata.isEmpty()) {
+ extraSnapshotMetadata.forEach((key, value) -> operation.set(key, value));
+ }
+
if (isWapTable() && wapId != null) {
// write-audit-publish is enabled for this table and job
// stage the changes without changing the current snapshot