This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 81ae930 Add SnapshotUpdate interface with set method (#180)
81ae930 is described below
commit 81ae930c61f708191aa1f30a787e4a3d10def6f8
Author: Ryan Blue <[email protected]>
AuthorDate: Fri May 10 15:46:10 2019 -0700
Add SnapshotUpdate interface with set method (#180)
---
.../main/java/org/apache/iceberg/AppendFiles.java | 2 +-
.../main/java/org/apache/iceberg/DeleteFiles.java | 2 +-
.../java/org/apache/iceberg/OverwriteFiles.java | 2 +-
.../java/org/apache/iceberg/ReplacePartitions.java | 2 +-
.../main/java/org/apache/iceberg/RewriteFiles.java | 2 +-
.../{AppendFiles.java => SnapshotUpdate.java} | 21 ++++++++++-----------
.../main/java/org/apache/iceberg/FastAppend.java | 8 +++++++-
.../main/java/org/apache/iceberg/MergeAppend.java | 7 ++++++-
...shotUpdate.java => MergingSnapshotProducer.java} | 16 ++++++++++++----
.../main/java/org/apache/iceberg/OverwriteData.java | 7 ++++++-
.../main/java/org/apache/iceberg/ReplaceFiles.java | 7 ++++++-
.../apache/iceberg/ReplacePartitionsOperation.java | 8 +++++++-
.../{SnapshotUpdate.java => SnapshotProducer.java} | 6 +++---
.../java/org/apache/iceberg/SnapshotSummary.java | 9 +++++++++
.../java/org/apache/iceberg/StreamingDelete.java | 7 ++++++-
15 files changed, 77 insertions(+), 29 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/AppendFiles.java
b/api/src/main/java/org/apache/iceberg/AppendFiles.java
index 2a9192a..912b103 100644
--- a/api/src/main/java/org/apache/iceberg/AppendFiles.java
+++ b/api/src/main/java/org/apache/iceberg/AppendFiles.java
@@ -28,7 +28,7 @@ package org.apache.iceberg;
* When committing, these changes will be applied to the latest table
snapshot. Commit conflicts
* will be resolved by applying the changes to the new latest snapshot and
reattempting the commit.
*/
-public interface AppendFiles extends PendingUpdate<Snapshot> {
+public interface AppendFiles extends SnapshotUpdate<AppendFiles> {
/**
* Append a {@link DataFile} to the table.
*
diff --git a/api/src/main/java/org/apache/iceberg/DeleteFiles.java
b/api/src/main/java/org/apache/iceberg/DeleteFiles.java
index e93dfb4..7a0aa7d 100644
--- a/api/src/main/java/org/apache/iceberg/DeleteFiles.java
+++ b/api/src/main/java/org/apache/iceberg/DeleteFiles.java
@@ -32,7 +32,7 @@ import org.apache.iceberg.expressions.Projections;
* When committing, these changes will be applied to the latest table
snapshot. Commit conflicts
* will be resolved by applying the changes to the new latest snapshot and
reattempting the commit.
*/
-public interface DeleteFiles extends PendingUpdate<Snapshot> {
+public interface DeleteFiles extends SnapshotUpdate<DeleteFiles> {
/**
* Delete a file path from the underlying table.
* <p>
diff --git a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java
b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java
index 8d8e38c..8af7974 100644
--- a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java
+++ b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java
@@ -37,7 +37,7 @@ import org.apache.iceberg.expressions.Projections;
* This has no requirements for the latest snapshot and will not fail based on
other snapshot
* changes.
*/
-public interface OverwriteFiles extends PendingUpdate<Snapshot> {
+public interface OverwriteFiles extends SnapshotUpdate<OverwriteFiles> {
/**
* Delete files that match an {@link Expression} on data rows from the table.
* <p>
diff --git a/api/src/main/java/org/apache/iceberg/ReplacePartitions.java
b/api/src/main/java/org/apache/iceberg/ReplacePartitions.java
index 07108e3..2d87ebf 100644
--- a/api/src/main/java/org/apache/iceberg/ReplacePartitions.java
+++ b/api/src/main/java/org/apache/iceberg/ReplacePartitions.java
@@ -34,7 +34,7 @@ package org.apache.iceberg;
* This has no requirements for the latest snapshot and will not fail based on
other snapshot
* changes.
*/
-public interface ReplacePartitions extends PendingUpdate<Snapshot> {
+public interface ReplacePartitions extends SnapshotUpdate<ReplacePartitions> {
/**
* Add a {@link DataFile} to the table.
*
diff --git a/api/src/main/java/org/apache/iceberg/RewriteFiles.java
b/api/src/main/java/org/apache/iceberg/RewriteFiles.java
index 3ee1374..49df7fc 100644
--- a/api/src/main/java/org/apache/iceberg/RewriteFiles.java
+++ b/api/src/main/java/org/apache/iceberg/RewriteFiles.java
@@ -33,7 +33,7 @@ import org.apache.iceberg.exceptions.ValidationException;
* If any of the deleted files are no longer in the latest snapshot when
reattempting, the commit
* will throw a {@link ValidationException}.
*/
-public interface RewriteFiles extends PendingUpdate<Snapshot> {
+public interface RewriteFiles extends SnapshotUpdate<RewriteFiles> {
/**
* Add a rewrite that replaces one set of files with another set that
contains the same data.
*
diff --git a/api/src/main/java/org/apache/iceberg/AppendFiles.java
b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
similarity index 61%
copy from api/src/main/java/org/apache/iceberg/AppendFiles.java
copy to api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
index 2a9192a..fcead03 100644
--- a/api/src/main/java/org/apache/iceberg/AppendFiles.java
+++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
@@ -20,20 +20,19 @@
package org.apache.iceberg;
/**
- * API for appending new files in a table.
- * <p>
- * This API accumulates file additions, produces a new {@link Snapshot} of the
table, and commits
- * that snapshot as the current.
- * <p>
- * When committing, these changes will be applied to the latest table
snapshot. Commit conflicts
- * will be resolved by applying the changes to the new latest snapshot and
reattempting the commit.
+ * API for table changes that produce snapshots. This interface contains common methods for all
+ * updates that create a new table {@link Snapshot}.
+ *
+ * @param <ThisT> the child Java API class, returned by method chaining.
*/
-public interface AppendFiles extends PendingUpdate<Snapshot> {
+public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
/**
- * Append a {@link DataFile} to the table.
+ * Set a summary property in the snapshot produced by this update.
*
- * @param file a data file
+ * @param property a String property name
+ * @param value a String property value
* @return this for method chaining
*/
- AppendFiles appendFile(DataFile file);
+ ThisT set(String property, String value);
+
}
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java
b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 2bb9038..6bf560c 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -33,7 +33,7 @@ import org.apache.iceberg.io.OutputFile;
* <p>
* This implementation will attempt to commit 5 times before throwing {@link
CommitFailedException}.
*/
-class FastAppend extends SnapshotUpdate implements AppendFiles {
+class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final PartitionSpec spec;
private final SnapshotSummary.Builder summaryBuilder =
SnapshotSummary.builder();
private final List<DataFile> newFiles = Lists.newArrayList();
@@ -46,6 +46,12 @@ class FastAppend extends SnapshotUpdate implements
AppendFiles {
}
@Override
+ public AppendFiles set(String property, String value) {
+ summaryBuilder.set(property, value);
+ return this;
+ }
+
+ @Override
protected String operation() {
return DataOperations.APPEND;
}
diff --git a/core/src/main/java/org/apache/iceberg/MergeAppend.java
b/core/src/main/java/org/apache/iceberg/MergeAppend.java
index 5d992bb..2646515 100644
--- a/core/src/main/java/org/apache/iceberg/MergeAppend.java
+++ b/core/src/main/java/org/apache/iceberg/MergeAppend.java
@@ -26,12 +26,17 @@ import org.apache.iceberg.exceptions.CommitFailedException;
* <p>
* This implementation will attempt to commit 5 times before throwing {@link
CommitFailedException}.
*/
-class MergeAppend extends MergingSnapshotUpdate implements AppendFiles {
+class MergeAppend extends MergingSnapshotProducer<AppendFiles> implements AppendFiles {
MergeAppend(TableOperations ops) {
super(ops);
}
@Override
+ protected AppendFiles self() {
+ return this;
+ }
+
+ @Override
protected String operation() {
return DataOperations.APPEND;
}
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
similarity index 98%
rename from core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
rename to core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 835b99f..aebded7 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -55,8 +55,8 @@ import static
org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAUL
import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES;
import static
org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT;
-abstract class MergingSnapshotUpdate extends SnapshotUpdate {
- private static final Logger LOG =
LoggerFactory.getLogger(MergingSnapshotUpdate.class);
+abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
+ private static final Logger LOG = LoggerFactory.getLogger(MergingSnapshotProducer.class);
private static final Joiner COMMA = Joiner.on(",");
@@ -104,7 +104,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate
{
private boolean filterUpdated = false; // used to clear caches of filtered
and merged manifests
- MergingSnapshotUpdate(TableOperations ops) {
+ MergingSnapshotProducer(TableOperations ops) {
super(ops);
this.ops = ops;
this.spec = ops.current().spec();
@@ -114,6 +114,14 @@ abstract class MergingSnapshotUpdate extends
SnapshotUpdate {
.propertyAsInt(MANIFEST_MIN_MERGE_COUNT,
MANIFEST_MIN_MERGE_COUNT_DEFAULT);
}
+ protected abstract ThisT self();
+
+ @Override
+ public ThisT set(String property, String value) {
+ summaryBuilder.set(property, value);
+ return self();
+ }
+
protected PartitionSpec writeSpec() {
// the spec is set when the write is started
return spec;
@@ -181,7 +189,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate
{
summaryBuilder.clear();
if (filterUpdated) {
- cleanUncommittedFilters(SnapshotUpdate.EMPTY_SET);
+ cleanUncommittedFilters(SnapshotProducer.EMPTY_SET);
this.filterUpdated = false;
}
diff --git a/core/src/main/java/org/apache/iceberg/OverwriteData.java
b/core/src/main/java/org/apache/iceberg/OverwriteData.java
index f6ba7f3..ba96180 100644
--- a/core/src/main/java/org/apache/iceberg/OverwriteData.java
+++ b/core/src/main/java/org/apache/iceberg/OverwriteData.java
@@ -26,7 +26,7 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
-public class OverwriteData extends MergingSnapshotUpdate implements
OverwriteFiles {
+public class OverwriteData extends MergingSnapshotProducer<OverwriteFiles> implements OverwriteFiles {
private boolean validateAddedFiles = false;
protected OverwriteData(TableOperations ops) {
@@ -34,6 +34,11 @@ public class OverwriteData extends MergingSnapshotUpdate
implements OverwriteFil
}
@Override
+ protected OverwriteFiles self() {
+ return this;
+ }
+
+ @Override
protected String operation() {
return DataOperations.OVERWRITE;
}
diff --git a/core/src/main/java/org/apache/iceberg/ReplaceFiles.java
b/core/src/main/java/org/apache/iceberg/ReplaceFiles.java
index 71779f2..d35f4e3 100644
--- a/core/src/main/java/org/apache/iceberg/ReplaceFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ReplaceFiles.java
@@ -22,7 +22,7 @@ package org.apache.iceberg;
import com.google.common.base.Preconditions;
import java.util.Set;
-class ReplaceFiles extends MergingSnapshotUpdate implements RewriteFiles {
+class ReplaceFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
ReplaceFiles(TableOperations ops) {
super(ops);
@@ -31,6 +31,11 @@ class ReplaceFiles extends MergingSnapshotUpdate implements
RewriteFiles {
}
@Override
+ protected RewriteFiles self() {
+ return this;
+ }
+
+ @Override
protected String operation() {
return DataOperations.REPLACE;
}
diff --git a/core/src/main/java/org/apache/iceberg/ReplacePartitionsOperation.java b/core/src/main/java/org/apache/iceberg/ReplacePartitionsOperation.java
index fb21231..474b99c 100644
--- a/core/src/main/java/org/apache/iceberg/ReplacePartitionsOperation.java
+++ b/core/src/main/java/org/apache/iceberg/ReplacePartitionsOperation.java
@@ -23,12 +23,18 @@ import java.util.List;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
-public class ReplacePartitionsOperation extends MergingSnapshotUpdate
implements ReplacePartitions {
+public class ReplacePartitionsOperation
+ extends MergingSnapshotProducer<ReplacePartitions> implements ReplacePartitions {
ReplacePartitionsOperation(TableOperations ops) {
super(ops);
}
@Override
+ protected ReplacePartitions self() {
+ return this;
+ }
+
+ @Override
protected String operation() {
return DataOperations.OVERWRITE;
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotUpdate.java
b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
similarity index 98%
rename from core/src/main/java/org/apache/iceberg/SnapshotUpdate.java
rename to core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 90c15d7..ed5a508 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -52,8 +52,8 @@ import static
org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFA
import static org.apache.iceberg.TableProperties.MANIFEST_LISTS_ENABLED;
import static
org.apache.iceberg.TableProperties.MANIFEST_LISTS_ENABLED_DEFAULT;
-abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
- private static final Logger LOG =
LoggerFactory.getLogger(SnapshotUpdate.class);
+abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
+ private static final Logger LOG = LoggerFactory.getLogger(SnapshotProducer.class);
static final Set<ManifestFile> EMPTY_SET = Sets.newHashSet();
/**
@@ -68,7 +68,7 @@ abstract class SnapshotUpdate implements
PendingUpdate<Snapshot> {
private Long snapshotId = null;
private TableMetadata base = null;
- protected SnapshotUpdate(TableOperations ops) {
+ protected SnapshotProducer(TableOperations ops) {
this.ops = ops;
this.base = ops.current();
this.manifestsWithMetadata = Caffeine
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
index dee9f02..e33e004 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
@@ -20,6 +20,7 @@
package org.apache.iceberg;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Map;
import java.util.Set;
@@ -49,6 +50,7 @@ public class SnapshotSummary {
private long deletedDuplicateFiles = 0L;
private long addedRecords = 0L;
private long deletedRecords = 0L;
+ private Map<String, String> properties = Maps.newHashMap();
public void clear() {
changedPartitions.clear();
@@ -75,9 +77,16 @@ public class SnapshotSummary {
this.addedRecords += file.recordCount();
}
+ public void set(String property, String value) {
+ properties.put(property, value);
+ }
+
public Map<String, String> build() {
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ // copy custom summary properties
+ builder.putAll(properties);
+
setIf(addedFiles > 0, builder, ADDED_FILES_PROP, addedFiles);
setIf(deletedFiles > 0, builder, DELETED_FILES_PROP, deletedFiles);
setIf(deletedDuplicateFiles > 0, builder, DELETED_DUPLICATE_FILES,
deletedDuplicateFiles);
diff --git a/core/src/main/java/org/apache/iceberg/StreamingDelete.java
b/core/src/main/java/org/apache/iceberg/StreamingDelete.java
index 3237d7b..f711a82 100644
--- a/core/src/main/java/org/apache/iceberg/StreamingDelete.java
+++ b/core/src/main/java/org/apache/iceberg/StreamingDelete.java
@@ -27,12 +27,17 @@ import org.apache.iceberg.expressions.Expression;
* <p>
* This implementation will attempt to commit 5 times before throwing {@link
CommitFailedException}.
*/
-class StreamingDelete extends MergingSnapshotUpdate implements DeleteFiles {
+class StreamingDelete extends MergingSnapshotProducer<DeleteFiles> implements DeleteFiles {
StreamingDelete(TableOperations ops) {
super(ops);
}
@Override
+ protected DeleteFiles self() {
+ return this;
+ }
+
+ @Override
protected String operation() {
return DataOperations.DELETE;
}