This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new e1399fd Add unit tests for sequence numbers (#974)
e1399fd is described below
commit e1399fd4c78a4d06e7840afc9aa9a3d45ca548d5
Author: Chen, Junjie <[email protected]>
AuthorDate: Wed Jun 3 06:39:25 2020 +0800
Add unit tests for sequence numbers (#974)
---
.../org/apache/iceberg/BaseRewriteManifests.java | 2 +
.../main/java/org/apache/iceberg/FastAppend.java | 2 +
.../main/java/org/apache/iceberg/MergeAppend.java | 2 +
.../apache/iceberg/MergingSnapshotProducer.java | 5 +-
.../iceberg/TestSequenceNumberForV2Table.java | 451 +++++++++++++++++++++
5 files changed, 459 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 1993c79..d49b60a 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -138,6 +138,8 @@ public class BaseRewriteManifests extends
SnapshotProducer<RewriteManifests> imp
Preconditions.checkArgument(
manifest.snapshotId() == null || manifest.snapshotId() == -1,
"Snapshot id must be assigned during commit");
+ Preconditions.checkArgument(manifest.sequenceNumber() == -1,
+ "Sequence must be assigned during commit");
if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) {
addedManifests.add(manifest);
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java
b/core/src/main/java/org/apache/iceberg/FastAppend.java
index b370a18..8b88c21 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -97,6 +97,8 @@ class FastAppend extends SnapshotProducer<AppendFiles>
implements AppendFiles {
Preconditions.checkArgument(
manifest.snapshotId() == null || manifest.snapshotId() == -1,
"Snapshot id must be assigned during commit");
+ Preconditions.checkArgument(manifest.sequenceNumber() == -1,
+ "Sequence number must be assigned during commit");
if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) {
summaryBuilder.addedManifest(manifest);
diff --git a/core/src/main/java/org/apache/iceberg/MergeAppend.java
b/core/src/main/java/org/apache/iceberg/MergeAppend.java
index aa156fb..fb3a9a8 100644
--- a/core/src/main/java/org/apache/iceberg/MergeAppend.java
+++ b/core/src/main/java/org/apache/iceberg/MergeAppend.java
@@ -55,6 +55,8 @@ class MergeAppend extends
MergingSnapshotProducer<AppendFiles> implements Append
Preconditions.checkArgument(
manifest.snapshotId() == null || manifest.snapshotId() == -1,
"Snapshot id must be assigned during commit");
+ Preconditions.checkArgument(manifest.sequenceNumber() == -1,
+ "Sequence must be assigned during commit");
add(manifest);
return this;
}
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 7a6ab8f..8e905f0 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -275,7 +275,6 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
newManifests = Iterables.concat(appendManifests,
rewrittenAppendManifests);
}
- // TODO: add sequence numbers here
Iterable<ManifestFile> newManifestsWithMetadata = Iterables.transform(
newManifests,
manifest ->
GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
@@ -680,11 +679,11 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
// suppress deletes from previous snapshots. only files deleted
by this snapshot
// should be added to the new manifest
if (entry.snapshotId() == snapshotId()) {
- writer.addEntry(entry);
+ writer.delete(entry);
}
} else if (entry.status() == Status.ADDED && entry.snapshotId() ==
snapshotId()) {
// adds from this snapshot are still adds, otherwise they should
be existing
- writer.addEntry(entry);
+ writer.add(entry);
} else {
// add all files from the old manifest as existing files
writer.existing(entry);
diff --git
a/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java
b/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java
new file mode 100644
index 0000000..7be092a
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.junit.Test;
+
+public class TestSequenceNumberForV2Table extends TableTestBase {
+
+ public TestSequenceNumberForV2Table() {
+ super(2);
+ }
+
+ @Test
+ public void testMergeAppend() throws IOException {
+ table.newAppend().appendFile(FILE_A).commit();
+ Snapshot snap1 = table.currentSnapshot();
+ long commitId1 = snap1.snapshotId();
+ ManifestFile manifestFile = table.currentSnapshot().manifests().get(0);
+
+ validateSnapshot(null, snap1, 1, FILE_A);
+ validateManifest(manifestFile, seqs(1), ids(commitId1), files(FILE_A));
+ V2Assert.assertEquals("Snapshot sequence number should be 1", 1,
snap1.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 1", 1,
readMetadata().lastSequenceNumber());
+
+ table.newAppend().appendFile(FILE_B).commit();
+ Snapshot snap2 = table.currentSnapshot();
+ long commitId2 = snap2.snapshotId();
+ manifestFile = table.currentSnapshot().manifests().get(0);
+ validateSnapshot(snap1, snap2, 2, FILE_B);
+ validateManifest(manifestFile, seqs(2), ids(commitId2), files(FILE_B));
+ V2Assert.assertEquals("Snapshot sequence number should be 2", 2,
snap2.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 2", 2,
readMetadata().lastSequenceNumber());
+
+ table.newAppend()
+ .appendManifest(writeManifest("input-m0.avro",
+ manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C)))
+ .commit();
+ Snapshot snap3 = table.currentSnapshot();
+ long commitId3 = snap3.snapshotId();
+ manifestFile = table.currentSnapshot().manifests().get(0);
+ validateManifest(manifestFile, seqs(3), ids(commitId3), files(FILE_C));
+ validateSnapshot(snap2, snap3, 3, FILE_C);
+ V2Assert.assertEquals("Snapshot sequence number should be 3", 3,
snap3.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 3", 3,
readMetadata().lastSequenceNumber());
+
+ table.updateProperties()
+ .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1")
+ .commit();
+
+ table.newAppend()
+ .appendManifest(writeManifest("input-m1.avro",
+ manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D)))
+ .commit();
+
+ Snapshot snap4 = table.currentSnapshot();
+ long commitId4 = snap4.snapshotId();
+ manifestFile = snap4.manifests().stream()
+ .filter(manifest -> manifest.snapshotId() == commitId4)
+ .collect(Collectors.toList()).get(0);
+ validateManifest(manifestFile, seqs(4, 3, 2, 1), ids(commitId4, commitId3,
commitId2, commitId1),
+ files(FILE_D, FILE_C, FILE_B, FILE_A));
+ V2Assert.assertEquals("Snapshot sequence number should be 4", 4,
snap4.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 4", 4,
readMetadata().lastSequenceNumber());
+ }
+
+  // Entries copied by rewriteManifests must keep the sequence numbers (1, 2) of the appends that added them.
+  @Test
+  public void testRewrite() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot snap1 = table.currentSnapshot();
+    long commitId1 = snap1.snapshotId();
+    ManifestFile manifestFile = table.currentSnapshot().manifests().get(0);
+    validateSnapshot(null, snap1, 1, FILE_A);
+    validateManifest(manifestFile, seqs(1), ids(commitId1), files(FILE_A));
+    V2Assert.assertEquals("Snapshot sequence number should be 1", 1, snap1.sequenceNumber());
+    V2Assert.assertEquals("Last sequence number should be 1", 1, readMetadata().lastSequenceNumber());
+    // second fast append: the sequence number advances to 2
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot snap2 = table.currentSnapshot();
+    long commitId2 = snap2.snapshotId();
+    manifestFile = table.currentSnapshot().manifests().get(0);
+    validateSnapshot(snap1, snap2, 2, FILE_B);
+    validateManifest(manifestFile, seqs(2), ids(commitId2), files(FILE_B));
+    V2Assert.assertEquals("Snapshot sequence number should be 2", 2, snap2.sequenceNumber());
+    V2Assert.assertEquals("Last sequence number should be 2", 2, readMetadata().lastSequenceNumber());
+    // the rewrite commits a new snapshot of its own, which is expected to take sequence number 3
+    table.rewriteManifests().clusterBy(file -> "").commit();
+    Snapshot snap3 = table.currentSnapshot();
+    ManifestFile newManifest = snap3.manifests().stream()
+        .filter(manifest -> manifest.snapshotId() == snap3.snapshotId())
+        .collect(Collectors.toList()).get(0);
+
+    V2Assert.assertEquals("Snapshot sequence number should be 3", 3, snap3.sequenceNumber());
+    V2Assert.assertEquals("Last sequence number should be 3", 3, readMetadata().lastSequenceNumber());
+
+    // FILE_A and FILE_B may appear in either order in the rewritten manifest, so match entries by path
+    for (ManifestEntry entry : ManifestFiles.read(newManifest, FILE_IO).entries()) {
+      if (entry.file().path().equals(FILE_A.path())) {
+        V2Assert.assertEquals("FILE_A sequence number should be 1", 1, entry.sequenceNumber().longValue());
+      }
+
+      if (entry.file().path().equals(FILE_B.path())) {
+        V2Assert.assertEquals("FILE_B sequence number should be 2", 2, entry.sequenceNumber().longValue());
+      }
+    }
+  }
+
+ @Test
+ public void testCommitConflict() {
+ AppendFiles appendA = table.newFastAppend();
+ appendA.appendFile(FILE_A).apply();
+
+ table.updateProperties()
+ .set(TableProperties.COMMIT_NUM_RETRIES, "0")
+ .commit();
+
+ table.ops().failCommits(1);
+
+ AssertHelpers.assertThrows("Should reject commit",
+ CommitFailedException.class, "Injected failure",
+ () -> table.newFastAppend().appendFile(FILE_B).commit());
+
+ table.updateProperties()
+ .set(TableProperties.COMMIT_NUM_RETRIES, "5")
+ .commit();
+
+ appendA.commit();
+ Snapshot snap1 = table.currentSnapshot();
+ long commitId1 = snap1.snapshotId();
+ ManifestFile manifestFile = table.currentSnapshot().manifests().get(0);
+ validateSnapshot(null, snap1, 1, FILE_A);
+ validateManifest(manifestFile, seqs(1), ids(commitId1), files(FILE_A));
+ V2Assert.assertEquals("Snapshot sequence number should be 1", 1,
snap1.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 1", 1,
readMetadata().lastSequenceNumber());
+
+ AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_C);
+ appendFiles.apply();
+ table.newFastAppend().appendFile(FILE_D).commit();
+ Snapshot snap2 = table.currentSnapshot();
+ long commitId2 = snap2.snapshotId();
+ manifestFile = table.currentSnapshot().manifests().get(0);
+ validateSnapshot(snap1, snap2, 2, FILE_D);
+ validateManifest(manifestFile, seqs(2), ids(commitId2), files(FILE_D));
+ V2Assert.assertEquals("Snapshot sequence number should be 2", 2,
snap2.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 2", 2,
readMetadata().lastSequenceNumber());
+
+ appendFiles.commit();
+ Snapshot snap3 = table.currentSnapshot();
+ long commitId3 = snap3.snapshotId();
+ manifestFile = table.currentSnapshot().manifests().get(0);
+ validateManifest(manifestFile, seqs(3), ids(commitId3), files(FILE_C));
+ validateSnapshot(snap2, snap3, 3, FILE_C);
+ V2Assert.assertEquals("Snapshot sequence number should be 3", 3,
snap3.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 3", 3,
readMetadata().lastSequenceNumber());
+ }
+
+  @Test
+  public void testRollBack() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot snap1 = table.currentSnapshot();
+    long commitId1 = snap1.snapshotId();
+    ManifestFile manifestFile = table.currentSnapshot().manifests().get(0);
+    validateSnapshot(null, snap1, 1, FILE_A);
+    validateManifest(manifestFile, seqs(1), ids(commitId1), files(FILE_A));
+    V2Assert.assertEquals("Snapshot sequence number should be 1", 1, snap1.sequenceNumber());
+    V2Assert.assertEquals("Last sequence number should be 1", 1, readMetadata().lastSequenceNumber());
+    // a second append advances both the snapshot and the table's last sequence number to 2
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot snap2 = table.currentSnapshot();
+    long commitId2 = snap2.snapshotId();
+    manifestFile = table.currentSnapshot().manifests().get(0);
+    validateSnapshot(snap1, snap2, 2, FILE_B);
+    validateManifest(manifestFile, seqs(2), ids(commitId2), files(FILE_B));
+    V2Assert.assertEquals("Snapshot sequence number should be 2", 2, snap2.sequenceNumber());
+    V2Assert.assertEquals("Last sequence number should be 2", 2, readMetadata().lastSequenceNumber());
+    // after the rollback the current snapshot is back at sequence number 1, but the last sequence number stays 2
+    table.manageSnapshots().rollbackTo(commitId1).commit();
+    Snapshot snap3 = table.currentSnapshot();
+    V2Assert.assertEquals("Snapshot sequence number should be 1", 1, snap3.sequenceNumber());
+    V2Assert.assertEquals("Last sequence number should be 2", 2, readMetadata().lastSequenceNumber());
+    // the next commit continues from the last sequence number, so it gets 3 rather than reusing 2
+    table.newFastAppend().appendFile(FILE_C).commit();
+    Snapshot snap4 = table.currentSnapshot();
+    long commitId4 = snap4.snapshotId();
+    manifestFile = table.currentSnapshot().manifests().get(0);
+    validateSnapshot(snap3, snap4, 3, FILE_C);
+    validateManifest(manifestFile, seqs(3), ids(commitId4), files(FILE_C));
+    V2Assert.assertEquals("Snapshot sequence number should be 3", 3, snap4.sequenceNumber());
+    V2Assert.assertEquals("Last sequence number should be 3", 3, readMetadata().lastSequenceNumber());
+  }
+
+ @Test
+ public void testSingleTransaction() {
+ Transaction txn = table.newTransaction();
+ txn.newAppend().appendFile(FILE_A).commit();
+ txn.commitTransaction();
+ Snapshot snap = table.currentSnapshot();
+ long commitId = snap.snapshotId();
+ ManifestFile manifestFile = table.currentSnapshot().manifests().get(0);
+ validateSnapshot(null, snap, 1, FILE_A);
+ validateManifest(manifestFile, seqs(1), ids(commitId), files(FILE_A));
+ V2Assert.assertEquals("Snapshot sequence number should be 1", 1,
snap.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 1", 1,
readMetadata().lastSequenceNumber());
+ }
+
+ @Test
+ public void testConcurrentTransaction() {
+ Transaction txn1 = table.newTransaction();
+ Transaction txn2 = table.newTransaction();
+ Transaction txn3 = table.newTransaction();
+ Transaction txn4 = table.newTransaction();
+
+ txn1.newFastAppend().appendFile(FILE_A).commit();
+ txn3.newOverwrite().addFile(FILE_C).commit();
+ txn4.newDelete().deleteFile(FILE_A).commit();
+ txn2.newAppend().appendFile(FILE_B).commit();
+
+ txn1.commitTransaction();
+ Snapshot snap1 = table.currentSnapshot();
+ long commitId1 = snap1.snapshotId();
+ ManifestFile manifestFile1 = table.currentSnapshot().manifests().get(0);
+ validateSnapshot(null, snap1, 1, FILE_A);
+ validateManifest(manifestFile1, seqs(1), ids(commitId1), files(FILE_A));
+ V2Assert.assertEquals("Snapshot sequence number should be 1", 1,
snap1.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 1", 1,
readMetadata().lastSequenceNumber());
+
+ txn2.commitTransaction();
+ Snapshot snap2 = table.currentSnapshot();
+ long commitId2 = snap2.snapshotId();
+ ManifestFile manifestFile = table.currentSnapshot().manifests().get(0);
+ validateSnapshot(snap1, snap2, 2, FILE_B);
+ validateManifest(manifestFile, seqs(2), ids(commitId2), files(FILE_B));
+ V2Assert.assertEquals("Snapshot sequence number should be 2", 2,
snap2.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 2", 2,
readMetadata().lastSequenceNumber());
+
+ txn3.commitTransaction();
+ Snapshot snap3 = table.currentSnapshot();
+ long commitId3 = snap3.snapshotId();
+ manifestFile = table.currentSnapshot().manifests().stream()
+ .filter(manifest -> manifest.snapshotId() == commitId3)
+ .collect(Collectors.toList()).get(0);
+ validateManifest(manifestFile, seqs(3), ids(commitId3), files(FILE_C));
+ validateSnapshot(snap2, snap3, 3, FILE_C);
+ V2Assert.assertEquals("Snapshot sequence number should be 3", 3,
snap3.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 3", 3,
readMetadata().lastSequenceNumber());
+
+ txn4.commitTransaction();
+ Snapshot snap4 = table.currentSnapshot();
+ long commitId4 = snap4.snapshotId();
+ manifestFile = table.currentSnapshot().manifests().stream()
+ .filter(manifest -> manifest.snapshotId() == commitId4)
+ .collect(Collectors.toList()).get(0);
+ validateManifest(manifestFile, seqs(3, 2, 4), ids(commitId3, commitId2,
commitId4), files(FILE_C, FILE_B, FILE_A));
+ V2Assert.assertEquals("Snapshot sequence number should be 4", 4,
snap4.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 4", 4,
readMetadata().lastSequenceNumber());
+ }
+
+ @Test
+ public void testMultipleOperationsTransaction() {
+ Transaction txn = table.newTransaction();
+ txn.newFastAppend().appendFile(FILE_A).commit();
+ Snapshot snap1 = txn.table().currentSnapshot();
+ long commitId1 = snap1.snapshotId();
+ ManifestFile manifestFile = snap1.manifests().get(0);
+ validateSnapshot(null, snap1, 1, FILE_A);
+ validateManifest(manifestFile, seqs(1), ids(commitId1), files(FILE_A));
+ V2Assert.assertEquals("Snapshot sequence number should be 1", 1,
snap1.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 0", 0,
readMetadata().lastSequenceNumber());
+
+ Set<DataFile> toAddFiles = new HashSet<>();
+ Set<DataFile> toDeleteFiles = new HashSet<>();
+ toAddFiles.add(FILE_B);
+ toDeleteFiles.add(FILE_A);
+ txn.newRewrite().rewriteFiles(toDeleteFiles, toAddFiles).commit();
+ txn.commitTransaction();
+
+ Snapshot snap2 = table.currentSnapshot();
+ long commitId2 = snap2.snapshotId();
+ manifestFile = snap2.manifests().stream()
+ .filter(manifest -> manifest.snapshotId() == commitId2)
+ .collect(Collectors.toList()).get(0);
+
+ validateManifest(manifestFile, seqs(2), ids(commitId2), files(FILE_B));
+ V2Assert.assertEquals("Snapshot sequence number should be 2", 2,
snap2.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 2", 2,
readMetadata().lastSequenceNumber());
+ }
+
+ @Test
+ public void testExpirationInTransaction() {
+ table.newFastAppend().appendFile(FILE_A).commit();
+ Snapshot snap1 = table.currentSnapshot();
+ long commitId1 = snap1.snapshotId();
+ ManifestFile manifestFile = table.currentSnapshot().manifests().get(0);
+ validateSnapshot(null, snap1, 1, FILE_A);
+ validateManifest(manifestFile, seqs(1), ids(commitId1), files(FILE_A));
+ V2Assert.assertEquals("Snapshot sequence number should be 1", 1,
snap1.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 1", 1,
readMetadata().lastSequenceNumber());
+
+ table.newAppend().appendFile(FILE_B).commit();
+ Snapshot snap2 = table.currentSnapshot();
+ long commitId2 = snap2.snapshotId();
+ manifestFile = table.currentSnapshot().manifests().get(0);
+ validateSnapshot(snap1, snap2, 2, FILE_B);
+ validateManifest(manifestFile, seqs(2), ids(commitId2), files(FILE_B));
+ V2Assert.assertEquals("Snapshot sequence number should be 2", 2,
snap2.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 2", 2,
readMetadata().lastSequenceNumber());
+
+ Transaction txn = table.newTransaction();
+ txn.expireSnapshots().expireSnapshotId(commitId1).commit();
+ txn.commitTransaction();
+ V2Assert.assertEquals("Last sequence number should be 2", 2,
readMetadata().lastSequenceNumber());
+ }
+
+ @Test
+ public void testTransactionFailure() {
+ table.newAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .commit();
+ Snapshot snap1 = table.currentSnapshot();
+ long commitId1 = snap1.snapshotId();
+ ManifestFile manifestFile = table.currentSnapshot().manifests().get(0);
+ validateSnapshot(null, snap1, 1, FILE_A, FILE_B);
+ validateManifest(manifestFile, seqs(1, 1), ids(commitId1, commitId1),
files(FILE_A, FILE_B));
+ V2Assert.assertEquals("Snapshot sequence number should be 1", 1,
snap1.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 1", 1,
readMetadata().lastSequenceNumber());
+
+ table.updateProperties()
+ .set(TableProperties.COMMIT_NUM_RETRIES, "0")
+ .commit();
+
+ table.ops().failCommits(1);
+
+ Transaction txn = table.newTransaction();
+ txn.newAppend().appendFile(FILE_C).commit();
+
+ AssertHelpers.assertThrows("Transaction commit should fail",
+ CommitFailedException.class, "Injected failure",
txn::commitTransaction);
+
+ V2Assert.assertEquals("Last sequence number should be 1", 1,
readMetadata().lastSequenceNumber());
+ }
+
+ @Test
+ public void testCherryPicking() {
+ table.newAppend()
+ .appendFile(FILE_A)
+ .commit();
+ Snapshot snap1 = table.currentSnapshot();
+ long commitId1 = snap1.snapshotId();
+ ManifestFile manifestFile = snap1.manifests().get(0);
+ validateSnapshot(null, snap1, 1, FILE_A);
+ validateManifest(manifestFile, seqs(1), ids(commitId1), files(FILE_A));
+ V2Assert.assertEquals("Snapshot sequence number should be 1", 1,
snap1.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 1", 1,
readMetadata().lastSequenceNumber());
+
+ table.newAppend()
+ .appendFile(FILE_B)
+ .stageOnly()
+ .commit();
+
+ Snapshot snap2 = table.currentSnapshot();
+ V2Assert.assertEquals("Snapshot sequence number should be 1", 1,
snap2.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 2", 2,
readMetadata().lastSequenceNumber());
+
+ // pick the snapshot that's staged but not committed
+ Snapshot stagedSnapshot = readMetadata().snapshots().get(1);
+ V2Assert.assertEquals("Snapshot sequence number should be 2", 2,
stagedSnapshot.sequenceNumber());
+
+ // table has new commit
+ table.newAppend()
+ .appendFile(FILE_C)
+ .commit();
+
+ Snapshot snap3 = table.currentSnapshot();
+ long commitId3 = snap3.snapshotId();
+ manifestFile = snap3.manifests().get(0);
+ validateManifest(manifestFile, seqs(3), ids(commitId3), files(FILE_C));
+ validateSnapshot(snap2, snap3, 3, FILE_C);
+ V2Assert.assertEquals("Snapshot sequence number should be 3", 3,
snap3.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 3", 3,
readMetadata().lastSequenceNumber());
+
+ // cherry-pick snapshot
+ table.manageSnapshots().cherrypick(stagedSnapshot.snapshotId()).commit();
+ Snapshot snap4 = table.currentSnapshot();
+ long commitId4 = snap4.snapshotId();
+ manifestFile = table.currentSnapshot().manifests().get(0);
+ validateManifest(manifestFile, seqs(4), ids(commitId4), files(FILE_B));
+ validateSnapshot(snap3, snap4, 4, FILE_B);
+ V2Assert.assertEquals("Snapshot sequence number should be 4", 4,
snap4.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 4", 4,
readMetadata().lastSequenceNumber());
+ }
+
+ @Test
+ public void testCherryPickFastForward() {
+ table.newAppend()
+ .appendFile(FILE_A)
+ .commit();
+ Snapshot snap1 = table.currentSnapshot();
+ long commitId1 = snap1.snapshotId();
+ ManifestFile manifestFile = snap1.manifests().get(0);
+ validateSnapshot(null, snap1, 1, FILE_A);
+ validateManifest(manifestFile, seqs(1), ids(commitId1), files(FILE_A));
+ V2Assert.assertEquals("Snapshot sequence number should be 1", 1,
snap1.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 1", 1,
readMetadata().lastSequenceNumber());
+
+ table.newAppend()
+ .appendFile(FILE_B)
+ .stageOnly()
+ .commit();
+ Snapshot snap2 = table.currentSnapshot();
+ V2Assert.assertEquals("Snapshot sequence number should be 1", 1,
snap2.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 2", 2,
readMetadata().lastSequenceNumber());
+
+ // pick the snapshot that's staged but not committed
+ Snapshot stagedSnapshot = readMetadata().snapshots().get(1);
+ V2Assert.assertEquals("Snapshot sequence number should be 2", 2,
stagedSnapshot.sequenceNumber());
+
+ // cherry-pick snapshot, this will fast forward
+ table.manageSnapshots().cherrypick(stagedSnapshot.snapshotId()).commit();
+ Snapshot snap3 = table.currentSnapshot();
+ long commitId3 = snap3.snapshotId();
+ manifestFile = snap3.manifests().get(0);
+ validateManifest(manifestFile, seqs(2), ids(commitId3), files(FILE_B));
+ validateSnapshot(snap2, snap3, 2, FILE_B);
+ V2Assert.assertEquals("Snapshot sequence number should be 2", 2,
snap3.sequenceNumber());
+ V2Assert.assertEquals("Last sequence number should be 2", 2,
readMetadata().lastSequenceNumber());
+ }
+
+}