This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 81ae930  Add SnapshotUpdate interface with set method (#180)
81ae930 is described below

commit 81ae930c61f708191aa1f30a787e4a3d10def6f8
Author: Ryan Blue <[email protected]>
AuthorDate: Fri May 10 15:46:10 2019 -0700

    Add SnapshotUpdate interface with set method (#180)
---
 .../main/java/org/apache/iceberg/AppendFiles.java   |  2 +-
 .../main/java/org/apache/iceberg/DeleteFiles.java   |  2 +-
 .../java/org/apache/iceberg/OverwriteFiles.java     |  2 +-
 .../java/org/apache/iceberg/ReplacePartitions.java  |  2 +-
 .../main/java/org/apache/iceberg/RewriteFiles.java  |  2 +-
 .../{AppendFiles.java => SnapshotUpdate.java}       | 21 ++++++++++-----------
 .../main/java/org/apache/iceberg/FastAppend.java    |  8 +++++++-
 .../main/java/org/apache/iceberg/MergeAppend.java   |  7 ++++++-
 ...shotUpdate.java => MergingSnapshotProducer.java} | 16 ++++++++++++----
 .../main/java/org/apache/iceberg/OverwriteData.java |  7 ++++++-
 .../main/java/org/apache/iceberg/ReplaceFiles.java  |  7 ++++++-
 .../apache/iceberg/ReplacePartitionsOperation.java  |  8 +++++++-
 .../{SnapshotUpdate.java => SnapshotProducer.java}  |  6 +++---
 .../java/org/apache/iceberg/SnapshotSummary.java    |  9 +++++++++
 .../java/org/apache/iceberg/StreamingDelete.java    |  7 ++++++-
 15 files changed, 77 insertions(+), 29 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/AppendFiles.java b/api/src/main/java/org/apache/iceberg/AppendFiles.java
index 2a9192a..912b103 100644
--- a/api/src/main/java/org/apache/iceberg/AppendFiles.java
+++ b/api/src/main/java/org/apache/iceberg/AppendFiles.java
@@ -28,7 +28,7 @@ package org.apache.iceberg;
  * When committing, these changes will be applied to the latest table 
snapshot. Commit conflicts
  * will be resolved by applying the changes to the new latest snapshot and 
reattempting the commit.
  */
-public interface AppendFiles extends PendingUpdate<Snapshot> {
+public interface AppendFiles extends SnapshotUpdate<AppendFiles> {
   /**
    * Append a {@link DataFile} to the table.
    *
diff --git a/api/src/main/java/org/apache/iceberg/DeleteFiles.java b/api/src/main/java/org/apache/iceberg/DeleteFiles.java
index e93dfb4..7a0aa7d 100644
--- a/api/src/main/java/org/apache/iceberg/DeleteFiles.java
+++ b/api/src/main/java/org/apache/iceberg/DeleteFiles.java
@@ -32,7 +32,7 @@ import org.apache.iceberg.expressions.Projections;
  * When committing, these changes will be applied to the latest table 
snapshot. Commit conflicts
  * will be resolved by applying the changes to the new latest snapshot and 
reattempting the commit.
  */
-public interface DeleteFiles extends PendingUpdate<Snapshot> {
+public interface DeleteFiles extends SnapshotUpdate<DeleteFiles> {
   /**
    * Delete a file path from the underlying table.
    * <p>
diff --git a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java
index 8d8e38c..8af7974 100644
--- a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java
+++ b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java
@@ -37,7 +37,7 @@ import org.apache.iceberg.expressions.Projections;
  * This has no requirements for the latest snapshot and will not fail based on 
other snapshot
  * changes.
  */
-public interface OverwriteFiles extends PendingUpdate<Snapshot> {
+public interface OverwriteFiles extends SnapshotUpdate<OverwriteFiles> {
   /**
    * Delete files that match an {@link Expression} on data rows from the table.
    * <p>
diff --git a/api/src/main/java/org/apache/iceberg/ReplacePartitions.java b/api/src/main/java/org/apache/iceberg/ReplacePartitions.java
index 07108e3..2d87ebf 100644
--- a/api/src/main/java/org/apache/iceberg/ReplacePartitions.java
+++ b/api/src/main/java/org/apache/iceberg/ReplacePartitions.java
@@ -34,7 +34,7 @@ package org.apache.iceberg;
  * This has no requirements for the latest snapshot and will not fail based on 
other snapshot
  * changes.
  */
-public interface ReplacePartitions extends PendingUpdate<Snapshot> {
+public interface ReplacePartitions extends SnapshotUpdate<ReplacePartitions> {
   /**
    * Add a {@link DataFile} to the table.
    *
diff --git a/api/src/main/java/org/apache/iceberg/RewriteFiles.java b/api/src/main/java/org/apache/iceberg/RewriteFiles.java
index 3ee1374..49df7fc 100644
--- a/api/src/main/java/org/apache/iceberg/RewriteFiles.java
+++ b/api/src/main/java/org/apache/iceberg/RewriteFiles.java
@@ -33,7 +33,7 @@ import org.apache.iceberg.exceptions.ValidationException;
  * If any of the deleted files are no longer in the latest snapshot when 
reattempting, the commit
  * will throw a {@link ValidationException}.
  */
-public interface RewriteFiles extends PendingUpdate<Snapshot> {
+public interface RewriteFiles extends SnapshotUpdate<RewriteFiles> {
   /**
    * Add a rewrite that replaces one set of files with another set that 
contains the same data.
    *
diff --git a/api/src/main/java/org/apache/iceberg/AppendFiles.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
similarity index 61%
copy from api/src/main/java/org/apache/iceberg/AppendFiles.java
copy to api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
index 2a9192a..fcead03 100644
--- a/api/src/main/java/org/apache/iceberg/AppendFiles.java
+++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
@@ -20,20 +20,19 @@
 package org.apache.iceberg;
 
 /**
- * API for appending new files in a table.
- * <p>
- * This API accumulates file additions, produces a new {@link Snapshot} of the 
table, and commits
- * that snapshot as the current.
- * <p>
- * When committing, these changes will be applied to the latest table 
snapshot. Commit conflicts
- * will be resolved by applying the changes to the new latest snapshot and 
reattempting the commit.
+ * API for table changes that produce snapshots. This interface contains 
common methods for all
+ * updates that create a new table {@link Snapshot}.
+ *
+ * @param <ThisT> the child Java API class, returned by method chaining.
  */
-public interface AppendFiles extends PendingUpdate<Snapshot> {
+public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
   /**
-   * Append a {@link DataFile} to the table.
+   * Set a summary property in the snapshot produced by this update.
    *
-   * @param file a data file
+   * @param property a String property name
+   * @param value a String property value
    * @return this for method chaining
    */
-  AppendFiles appendFile(DataFile file);
+  ThisT set(String property, String value);
+
 }
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 2bb9038..6bf560c 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -33,7 +33,7 @@ import org.apache.iceberg.io.OutputFile;
  * <p>
  * This implementation will attempt to commit 5 times before throwing {@link 
CommitFailedException}.
  */
-class FastAppend extends SnapshotUpdate implements AppendFiles {
+class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
   private final PartitionSpec spec;
   private final SnapshotSummary.Builder summaryBuilder = 
SnapshotSummary.builder();
   private final List<DataFile> newFiles = Lists.newArrayList();
@@ -46,6 +46,12 @@ class FastAppend extends SnapshotUpdate implements AppendFiles {
   }
 
   @Override
+  public AppendFiles set(String property, String value) {
+    summaryBuilder.set(property, value);
+    return this;
+  }
+
+  @Override
   protected String operation() {
     return DataOperations.APPEND;
   }
diff --git a/core/src/main/java/org/apache/iceberg/MergeAppend.java b/core/src/main/java/org/apache/iceberg/MergeAppend.java
index 5d992bb..2646515 100644
--- a/core/src/main/java/org/apache/iceberg/MergeAppend.java
+++ b/core/src/main/java/org/apache/iceberg/MergeAppend.java
@@ -26,12 +26,17 @@ import org.apache.iceberg.exceptions.CommitFailedException;
  * <p>
  * This implementation will attempt to commit 5 times before throwing {@link 
CommitFailedException}.
  */
-class MergeAppend extends MergingSnapshotUpdate implements AppendFiles {
+class MergeAppend extends MergingSnapshotProducer<AppendFiles> implements 
AppendFiles {
   MergeAppend(TableOperations ops) {
     super(ops);
   }
 
   @Override
+  protected AppendFiles self() {
+    return this;
+  }
+
+  @Override
   protected String operation() {
     return DataOperations.APPEND;
   }
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
similarity index 98%
rename from core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
rename to core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 835b99f..aebded7 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -55,8 +55,8 @@ import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAUL
 import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES;
 import static 
org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT;
 
-abstract class MergingSnapshotUpdate extends SnapshotUpdate {
-  private static final Logger LOG = 
LoggerFactory.getLogger(MergingSnapshotUpdate.class);
+abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MergingSnapshotProducer.class);
 
   private static final Joiner COMMA = Joiner.on(",");
 
@@ -104,7 +104,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
 
   private boolean filterUpdated = false; // used to clear caches of filtered 
and merged manifests
 
-  MergingSnapshotUpdate(TableOperations ops) {
+  MergingSnapshotProducer(TableOperations ops) {
     super(ops);
     this.ops = ops;
     this.spec = ops.current().spec();
@@ -114,6 +114,14 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
         .propertyAsInt(MANIFEST_MIN_MERGE_COUNT, 
MANIFEST_MIN_MERGE_COUNT_DEFAULT);
   }
 
+  protected abstract ThisT self();
+
+  @Override
+  public ThisT set(String property, String value) {
+    summaryBuilder.set(property, value);
+    return self();
+  }
+
   protected PartitionSpec writeSpec() {
     // the spec is set when the write is started
     return spec;
@@ -181,7 +189,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
     summaryBuilder.clear();
 
     if (filterUpdated) {
-      cleanUncommittedFilters(SnapshotUpdate.EMPTY_SET);
+      cleanUncommittedFilters(SnapshotProducer.EMPTY_SET);
       this.filterUpdated = false;
     }
 
diff --git a/core/src/main/java/org/apache/iceberg/OverwriteData.java b/core/src/main/java/org/apache/iceberg/OverwriteData.java
index f6ba7f3..ba96180 100644
--- a/core/src/main/java/org/apache/iceberg/OverwriteData.java
+++ b/core/src/main/java/org/apache/iceberg/OverwriteData.java
@@ -26,7 +26,7 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.expressions.StrictMetricsEvaluator;
 
-public class OverwriteData extends MergingSnapshotUpdate implements 
OverwriteFiles {
+public class OverwriteData extends MergingSnapshotProducer<OverwriteFiles> 
implements OverwriteFiles {
   private boolean validateAddedFiles = false;
 
   protected OverwriteData(TableOperations ops) {
@@ -34,6 +34,11 @@ public class OverwriteData extends MergingSnapshotUpdate implements OverwriteFil
   }
 
   @Override
+  protected OverwriteFiles self() {
+    return this;
+  }
+
+  @Override
   protected String operation() {
     return DataOperations.OVERWRITE;
   }
diff --git a/core/src/main/java/org/apache/iceberg/ReplaceFiles.java b/core/src/main/java/org/apache/iceberg/ReplaceFiles.java
index 71779f2..d35f4e3 100644
--- a/core/src/main/java/org/apache/iceberg/ReplaceFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ReplaceFiles.java
@@ -22,7 +22,7 @@ package org.apache.iceberg;
 import com.google.common.base.Preconditions;
 import java.util.Set;
 
-class ReplaceFiles extends MergingSnapshotUpdate implements RewriteFiles {
+class ReplaceFiles extends MergingSnapshotProducer<RewriteFiles> implements 
RewriteFiles {
   ReplaceFiles(TableOperations ops) {
     super(ops);
 
@@ -31,6 +31,11 @@ class ReplaceFiles extends MergingSnapshotUpdate implements RewriteFiles {
   }
 
   @Override
+  protected RewriteFiles self() {
+    return this;
+  }
+
+  @Override
   protected String operation() {
     return DataOperations.REPLACE;
   }
diff --git a/core/src/main/java/org/apache/iceberg/ReplacePartitionsOperation.java b/core/src/main/java/org/apache/iceberg/ReplacePartitionsOperation.java
index fb21231..474b99c 100644
--- a/core/src/main/java/org/apache/iceberg/ReplacePartitionsOperation.java
+++ b/core/src/main/java/org/apache/iceberg/ReplacePartitionsOperation.java
@@ -23,12 +23,18 @@ import java.util.List;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expressions;
 
-public class ReplacePartitionsOperation extends MergingSnapshotUpdate 
implements ReplacePartitions {
+public class ReplacePartitionsOperation
+    extends MergingSnapshotProducer<ReplacePartitions> implements 
ReplacePartitions {
   ReplacePartitionsOperation(TableOperations ops) {
     super(ops);
   }
 
   @Override
+  protected ReplacePartitions self() {
+    return this;
+  }
+
+  @Override
   protected String operation() {
     return DataOperations.OVERWRITE;
   }
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
similarity index 98%
rename from core/src/main/java/org/apache/iceberg/SnapshotUpdate.java
rename to core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 90c15d7..ed5a508 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -52,8 +52,8 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFA
 import static org.apache.iceberg.TableProperties.MANIFEST_LISTS_ENABLED;
 import static 
org.apache.iceberg.TableProperties.MANIFEST_LISTS_ENABLED_DEFAULT;
 
-abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(SnapshotUpdate.class);
+abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SnapshotProducer.class);
   static final Set<ManifestFile> EMPTY_SET = Sets.newHashSet();
 
   /**
@@ -68,7 +68,7 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
   private Long snapshotId = null;
   private TableMetadata base = null;
 
-  protected SnapshotUpdate(TableOperations ops) {
+  protected SnapshotProducer(TableOperations ops) {
     this.ops = ops;
     this.base = ops.current();
     this.manifestsWithMetadata = Caffeine
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
index dee9f02..e33e004 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.util.Map;
 import java.util.Set;
@@ -49,6 +50,7 @@ public class SnapshotSummary {
     private long deletedDuplicateFiles = 0L;
     private long addedRecords = 0L;
     private long deletedRecords = 0L;
+    private Map<String, String> properties = Maps.newHashMap();
 
     public void clear() {
       changedPartitions.clear();
@@ -75,9 +77,16 @@ public class SnapshotSummary {
       this.addedRecords += file.recordCount();
     }
 
+    public void set(String property, String value) {
+      properties.put(property, value);
+    }
+
     public Map<String, String> build() {
       ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
 
+      // copy custom summary properties
+      builder.putAll(properties);
+
       setIf(addedFiles > 0, builder, ADDED_FILES_PROP, addedFiles);
       setIf(deletedFiles > 0, builder, DELETED_FILES_PROP, deletedFiles);
       setIf(deletedDuplicateFiles > 0, builder, DELETED_DUPLICATE_FILES, 
deletedDuplicateFiles);
diff --git a/core/src/main/java/org/apache/iceberg/StreamingDelete.java b/core/src/main/java/org/apache/iceberg/StreamingDelete.java
index 3237d7b..f711a82 100644
--- a/core/src/main/java/org/apache/iceberg/StreamingDelete.java
+++ b/core/src/main/java/org/apache/iceberg/StreamingDelete.java
@@ -27,12 +27,17 @@ import org.apache.iceberg.expressions.Expression;
  * <p>
  * This implementation will attempt to commit 5 times before throwing {@link 
CommitFailedException}.
  */
-class StreamingDelete extends MergingSnapshotUpdate implements DeleteFiles {
+class StreamingDelete extends MergingSnapshotProducer<DeleteFiles> implements 
DeleteFiles {
   StreamingDelete(TableOperations ops) {
     super(ops);
   }
 
   @Override
+  protected DeleteFiles self() {
+    return this;
+  }
+
+  @Override
   protected String operation() {
     return DataOperations.DELETE;
   }

Reply via email to