rdblue commented on a change in pull request #974: URL: https://github.com/apache/incubator-iceberg/pull/974#discussion_r430026545
########## File path: core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java ########## @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.junit.Assert; +import org.junit.Test; + +public class TestSequenceNumberForV2Table extends TableTestBase { + + public TestSequenceNumberForV2Table() { + super(2); + } + + @Test + public void testFastAppend() throws IOException { Review comment: The motivation for #1038 was to cover more cases for each action. Instead of having one test case for `FastAppend`, we should add the necessary assertions to all `FastAppend` tests. That keeps the tests in one place and increases coverage of situations that validate sequence numbers. Could you make sure that the cases that you're testing here are covered by assertions in the `FastAppend` tests so we don't need this case? 
Ideally, I'd prefer to add assertions to all of the existing operation tests instead of keeping this separate suite, but for now I think it would be reasonable to update the existing `FastAppend` tests and add the remaining cases of this suite here. Over a few more PRs, we can replace the test cases here with assertions in the appropriate operation test suites. ########## File path: core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import org.junit.Assert; +import org.junit.Test; + +public class TestSequenceNumberForV2Table extends TableTestBase { + + public TestSequenceNumberForV2Table() { + super(2); + } + + @Test + public void testSequenceNumberForFastAppend() throws IOException { + table.newFastAppend().appendFile(FILE_A).commit(); + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber()); + ManifestFile manifestFile = table.currentSnapshot().manifests().get(0); + Assert.assertEquals(1, manifestFile.sequenceNumber()); + + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber()); + manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + Assert.assertEquals(2, manifestFile.sequenceNumber()); + + manifestFile = writeManifest(FILE_C, FILE_D); + table.newFastAppend().appendManifest(manifestFile).commit(); + Assert.assertEquals(3, table.currentSnapshot().sequenceNumber()); + + manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + Assert.assertEquals(3, manifestFile.sequenceNumber()); + + for (ManifestEntry entry : ManifestFiles.read(manifestFile, + table.io(), table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_C.path()) || entry.file().path().equals(FILE_D.path())) { + Assert.assertEquals(3, entry.sequenceNumber().longValue()); + } + } + } + + @Test + public void testSequenceNumberForMergeAppend() throws 
IOException { + table.updateProperties() + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1") + .commit(); + table.newAppend().appendFile(FILE_A).commit(); + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber()); + + table.newAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber()); + + ManifestFile manifestFile = writeManifest(FILE_C, FILE_D); + table.newAppend().appendManifest(manifestFile).commit(); + Assert.assertEquals(3, table.currentSnapshot().sequenceNumber()); + + manifestFile = table.currentSnapshot().manifests().get(0); + + Assert.assertEquals("the sequence number of manifest should be 3", 3, manifestFile.sequenceNumber()); + + for (ManifestEntry entry : ManifestFiles.read(manifestFile, + table.io(), table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_A.path())) { + Assert.assertEquals("the sequence number of data file should be 1", 1, entry.sequenceNumber().longValue()); + } + + if (entry.file().path().equals(FILE_B.path())) { + Assert.assertEquals("the sequence number of data file should be 2", 2, entry.sequenceNumber().longValue()); + } + + if (entry.file().path().equals(FILE_C.path()) || entry.file().path().equals(FILE_D.path())) { + Assert.assertEquals("the sequence number of data file should be 3", 3, entry.sequenceNumber().longValue()); + } + } + } + + @Test + public void testSequenceNumberForRewrite() { + table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber()); + + table.rewriteManifests().clusterBy(file -> "").commit(); + Assert.assertEquals("the sequence number of snapshot should be 3", + 3, table.currentSnapshot().sequenceNumber()); + + ManifestFile newManifest = table.currentSnapshot().manifests().get(0); + Assert.assertEquals("the sequence number of manifest should be 3", + 3, newManifest.sequenceNumber()); + + for (ManifestEntry entry 
: ManifestFiles.read(newManifest, + table.io(), table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_A.path())) { + Assert.assertEquals("the sequence number of data file should be 1", 1, entry.sequenceNumber().longValue()); + } + + if (entry.file().path().equals(FILE_B.path())) { + Assert.assertEquals("the sequence number of data file should be 1", 2, entry.sequenceNumber().longValue()); + } + } + } + + @Test + public void testCommitConflict() { + Transaction txn = table.newTransaction(); + + txn.newFastAppend().appendFile(FILE_A).apply(); + table.newFastAppend().appendFile(FILE_B).commit(); + + AssertHelpers.assertThrows("Should failed due to conflict", + IllegalStateException.class, "last operation has not committed", txn::commitTransaction); + + Assert.assertEquals(1, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + + AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_C); + appendFiles.apply(); + table.newFastAppend().appendFile(FILE_D).commit(); + appendFiles.commit(); + + ManifestFile manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + + for (ManifestEntry entry : ManifestFiles.read(manifestFile, table.io(), + table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_C.path())) { + Assert.assertEquals(table.currentSnapshot().sequenceNumber(), entry.sequenceNumber().longValue()); + } + } + } + + @Test + public void testConcurrentCommit() throws InterruptedException { + ExecutorService threadPool = Executors.newFixedThreadPool(4); + List<Callable<Void>> tasks = new ArrayList<>(); + + Callable<Void> write1 = () -> { + Transaction txn = table.newTransaction(); + txn.newFastAppend().appendFile(FILE_A).commit(); + txn.commitTransaction(); + return null; + }; + + Callable<Void> write2 = () -> { + Transaction txn = table.newTransaction(); 
+ txn.newAppend().appendFile(FILE_B).commit(); + txn.commitTransaction(); + return null; + }; + + Callable<Void> write3 = () -> { + Transaction txn = table.newTransaction(); + txn.newDelete().deleteFile(FILE_A).commit(); + txn.commitTransaction(); + return null; + }; + + Callable<Void> write4 = () -> { + Transaction txn = table.newTransaction(); + txn.newOverwrite().addFile(FILE_D).commit(); + txn.commitTransaction(); + return null; + }; + + tasks.add(write1); + tasks.add(write2); + tasks.add(write3); + tasks.add(write4); + threadPool.invokeAll(tasks); + threadPool.shutdown(); + + Assert.assertEquals(4, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + } + + @Test + public void testRollBack() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + table.newFastAppend().appendFile(FILE_B).commit(); + + Assert.assertEquals(2, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + + table.manageSnapshots().rollbackTo(snapshotId).commit(); + + Assert.assertEquals(1, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + } + + @Test + public void testMultipleTxnOperations() { + Snapshot snapshot; + Transaction txn = table.newTransaction(); + txn.newOverwrite().addFile(FILE_A).commit(); + txn.commitTransaction(); + Assert.assertEquals(1, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + + txn = table.newTransaction(); + Set<DataFile> toAddFiles = new HashSet<>(); + Set<DataFile> toDeleteFiles = new HashSet<>(); + toAddFiles.add(FILE_B); + toDeleteFiles.add(FILE_A); + txn.newRewrite().rewriteFiles(toDeleteFiles, toAddFiles).commit(); + txn.commitTransaction(); + Assert.assertEquals(2, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + + txn = table.newTransaction(); + txn.newReplacePartitions().addFile(FILE_C).commit(); + txn.commitTransaction(); + Assert.assertEquals(3, TestTables.load(tableDir, 
"test").currentSnapshot().sequenceNumber()); + + txn = table.newTransaction(); + txn.newDelete().deleteFile(FILE_C).commit(); + txn.commitTransaction(); + + Assert.assertEquals(4, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + + txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_C).commit(); + txn.commitTransaction(); + Assert.assertEquals(5, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + + snapshot = table.currentSnapshot(); + txn = table.newTransaction(); + txn.newOverwrite().addFile(FILE_D).deleteFile(FILE_C).commit(); + txn.commitTransaction(); + Assert.assertEquals(6, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + + txn = table.newTransaction(); + txn.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit(); + txn.commitTransaction(); + Assert.assertEquals(6, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + } + + @Test + public void testSequenceNumberForCherryPicking() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + // WAP commit + table.newAppend() + .appendFile(FILE_B) + .set("wap.id", "123456789") + .stageOnly() + .commit(); + + Assert.assertEquals("the snapshot sequence number should be 1", 1, + table.currentSnapshot().sequenceNumber()); + + // pick the snapshot that's staged but not committed + Snapshot wapSnapshot = readMetadata().snapshots().get(1); + + Assert.assertEquals("the snapshot sequence number should be 2", 2, + wapSnapshot.sequenceNumber()); + + // table has new commit + table.newAppend() + .appendFile(FILE_C) + .commit(); + + Assert.assertEquals("the snapshot sequence number should be 3", + 3, table.currentSnapshot().sequenceNumber()); + + // cherry-pick snapshot + table.manageSnapshots().cherrypick(wapSnapshot.snapshotId()).commit(); + + Assert.assertEquals("the snapshot sequence number should be 4", + 4, table.currentSnapshot().sequenceNumber()); Review comment: I think this still needs to be updated. 
########## File path: core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import org.junit.Assert; +import org.junit.Test; + +public class TestSequenceNumberForV2Table extends TableTestBase { + + public TestSequenceNumberForV2Table() { + super(2); + } + + @Test + public void testSequenceNumberForFastAppend() throws IOException { + table.newFastAppend().appendFile(FILE_A).commit(); + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber()); + ManifestFile manifestFile = table.currentSnapshot().manifests().get(0); + Assert.assertEquals(1, manifestFile.sequenceNumber()); + + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber()); + manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> 
manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + Assert.assertEquals(2, manifestFile.sequenceNumber()); + + manifestFile = writeManifest(FILE_C, FILE_D); + table.newFastAppend().appendManifest(manifestFile).commit(); + Assert.assertEquals(3, table.currentSnapshot().sequenceNumber()); + + manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + Assert.assertEquals(3, manifestFile.sequenceNumber()); + + for (ManifestEntry entry : ManifestFiles.read(manifestFile, + table.io(), table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_C.path()) || entry.file().path().equals(FILE_D.path())) { + Assert.assertEquals(3, entry.sequenceNumber().longValue()); + } + } + } + + @Test + public void testSequenceNumberForMergeAppend() throws IOException { + table.updateProperties() + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1") + .commit(); + table.newAppend().appendFile(FILE_A).commit(); + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber()); + + table.newAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber()); + + ManifestFile manifestFile = writeManifest(FILE_C, FILE_D); + table.newAppend().appendManifest(manifestFile).commit(); + Assert.assertEquals(3, table.currentSnapshot().sequenceNumber()); + + manifestFile = table.currentSnapshot().manifests().get(0); + + Assert.assertEquals("the sequence number of manifest should be 3", 3, manifestFile.sequenceNumber()); + + for (ManifestEntry entry : ManifestFiles.read(manifestFile, + table.io(), table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_A.path())) { + Assert.assertEquals("the sequence number of data file should be 1", 1, entry.sequenceNumber().longValue()); + } + + if (entry.file().path().equals(FILE_B.path())) { + 
Assert.assertEquals("the sequence number of data file should be 2", 2, entry.sequenceNumber().longValue()); + } + + if (entry.file().path().equals(FILE_C.path()) || entry.file().path().equals(FILE_D.path())) { + Assert.assertEquals("the sequence number of data file should be 3", 3, entry.sequenceNumber().longValue()); + } + } + } + + @Test + public void testSequenceNumberForRewrite() { + table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber()); + + table.rewriteManifests().clusterBy(file -> "").commit(); + Assert.assertEquals("the sequence number of snapshot should be 3", + 3, table.currentSnapshot().sequenceNumber()); + + ManifestFile newManifest = table.currentSnapshot().manifests().get(0); + Assert.assertEquals("the sequence number of manifest should be 3", + 3, newManifest.sequenceNumber()); + + for (ManifestEntry entry : ManifestFiles.read(newManifest, + table.io(), table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_A.path())) { + Assert.assertEquals("the sequence number of data file should be 1", 1, entry.sequenceNumber().longValue()); + } + + if (entry.file().path().equals(FILE_B.path())) { + Assert.assertEquals("the sequence number of data file should be 1", 2, entry.sequenceNumber().longValue()); + } + } + } + + @Test + public void testCommitConflict() { + Transaction txn = table.newTransaction(); + + txn.newFastAppend().appendFile(FILE_A).apply(); + table.newFastAppend().appendFile(FILE_B).commit(); + + AssertHelpers.assertThrows("Should failed due to conflict", + IllegalStateException.class, "last operation has not committed", txn::commitTransaction); + + Assert.assertEquals(1, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + + AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_C); + appendFiles.apply(); + table.newFastAppend().appendFile(FILE_D).commit(); + 
appendFiles.commit(); + + ManifestFile manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + + for (ManifestEntry entry : ManifestFiles.read(manifestFile, table.io(), + table.ops().current().specsById()).entries()) { + if (entry.file().path().equals(FILE_C.path())) { + Assert.assertEquals(table.currentSnapshot().sequenceNumber(), entry.sequenceNumber().longValue()); + } + } + } + + @Test + public void testConcurrentCommit() throws InterruptedException { + ExecutorService threadPool = Executors.newFixedThreadPool(4); + List<Callable<Void>> tasks = new ArrayList<>(); + + Callable<Void> write1 = () -> { + Transaction txn = table.newTransaction(); + txn.newFastAppend().appendFile(FILE_A).commit(); + txn.commitTransaction(); + return null; + }; + + Callable<Void> write2 = () -> { + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_B).commit(); + txn.commitTransaction(); + return null; + }; + + Callable<Void> write3 = () -> { + Transaction txn = table.newTransaction(); + txn.newDelete().deleteFile(FILE_A).commit(); + txn.commitTransaction(); + return null; + }; + + Callable<Void> write4 = () -> { + Transaction txn = table.newTransaction(); + txn.newOverwrite().addFile(FILE_D).commit(); + txn.commitTransaction(); + return null; + }; + + tasks.add(write1); + tasks.add(write2); + tasks.add(write3); + tasks.add(write4); + threadPool.invokeAll(tasks); + threadPool.shutdown(); + + Assert.assertEquals(4, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + } + + @Test + public void testRollBack() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + table.newFastAppend().appendFile(FILE_B).commit(); + + Assert.assertEquals(2, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + + 
table.manageSnapshots().rollbackTo(snapshotId).commit(); + + Assert.assertEquals(1, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + } + + @Test + public void testMultipleTxnOperations() { + Snapshot snapshot; + Transaction txn = table.newTransaction(); + txn.newOverwrite().addFile(FILE_A).commit(); + txn.commitTransaction(); + Assert.assertEquals(1, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + + txn = table.newTransaction(); + Set<DataFile> toAddFiles = new HashSet<>(); + Set<DataFile> toDeleteFiles = new HashSet<>(); + toAddFiles.add(FILE_B); + toDeleteFiles.add(FILE_A); + txn.newRewrite().rewriteFiles(toDeleteFiles, toAddFiles).commit(); + txn.commitTransaction(); + Assert.assertEquals(2, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + + txn = table.newTransaction(); + txn.newReplacePartitions().addFile(FILE_C).commit(); + txn.commitTransaction(); + Assert.assertEquals(3, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + + txn = table.newTransaction(); + txn.newDelete().deleteFile(FILE_C).commit(); + txn.commitTransaction(); + + Assert.assertEquals(4, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + + txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_C).commit(); + txn.commitTransaction(); + Assert.assertEquals(5, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + + snapshot = table.currentSnapshot(); + txn = table.newTransaction(); + txn.newOverwrite().addFile(FILE_D).deleteFile(FILE_C).commit(); + txn.commitTransaction(); + Assert.assertEquals(6, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + + txn = table.newTransaction(); + txn.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit(); + txn.commitTransaction(); + Assert.assertEquals(6, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber()); + } + + @Test + public void 
testSequenceNumberForCherryPicking() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + // WAP commit + table.newAppend() + .appendFile(FILE_B) + .set("wap.id", "123456789") + .stageOnly() + .commit(); + + Assert.assertEquals("the snapshot sequence number should be 1", 1, + table.currentSnapshot().sequenceNumber()); + + // pick the snapshot that's staged but not committed + Snapshot wapSnapshot = readMetadata().snapshots().get(1); + + Assert.assertEquals("the snapshot sequence number should be 2", 2, + wapSnapshot.sequenceNumber()); + + // table has new commit + table.newAppend() + .appendFile(FILE_C) + .commit(); + + Assert.assertEquals("the snapshot sequence number should be 3", + 3, table.currentSnapshot().sequenceNumber()); + + // cherry-pick snapshot + table.manageSnapshots().cherrypick(wapSnapshot.snapshotId()).commit(); + + Assert.assertEquals("the snapshot sequence number should be 4", + 4, table.currentSnapshot().sequenceNumber()); Review comment: Yeah, sorry I missed that. ########## File path: core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java ########## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.junit.Assert; +import org.junit.Test; + +public class TestSequenceNumberForV2Table extends TableTestBase { + + public TestSequenceNumberForV2Table() { + super(2); + } + + @Test + public void testMergeAppend() throws IOException { + table.newAppend().appendFile(FILE_A).commit(); + ManifestFile manifestFile = table.currentSnapshot().manifests().get(0); + validateManifestEntries(manifestFile, 1, files(FILE_A), seqs(1)); + table.newAppend().appendFile(FILE_B).commit(); + manifestFile = table.currentSnapshot().manifests().get(0); + validateManifestEntries(manifestFile, 2, files(FILE_B), seqs(2)); + + table.newAppend() + .appendManifest(writeManifest("input-m0.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C))) + .commit(); + + validateDataFiles(files(FILE_A, FILE_B, FILE_C), seqs(1, 2, 3)); + + table.updateProperties() + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1") + .commit(); + + table.newAppend() + .appendManifest(writeManifest("input-m1.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D))) + .commit(); + + manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + validateManifestEntries(manifestFile, 4, files(FILE_A, FILE_B, FILE_C, FILE_D), seqs(1, 2, 3, 4)); + validateDataFiles(files(FILE_A, FILE_B, FILE_C, FILE_D), seqs(1, 2, 3, 4)); + } + + @Test + public void testRewrite() { + table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber()); + + table.rewriteManifests().clusterBy(file -> 
"").commit(); + Assert.assertEquals("Snapshot sequence number should be 3", + 3, table.currentSnapshot().sequenceNumber()); + + ManifestFile newManifest = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + + validateManifestEntries(newManifest, 3, files(FILE_A, FILE_B), seqs(1, 2)); + } + + @Test + public void testCommitConflict() { + AppendFiles appendA = table.newFastAppend(); + appendA.appendFile(FILE_A).apply(); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "0") + .commit(); + + table.ops().failCommits(1); + + AssertHelpers.assertThrows("Should reject commit", + CommitFailedException.class, "Injected failure", + () -> table.newFastAppend().appendFile(FILE_B).commit()); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "5") + .commit(); + + appendA.commit(); + + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber()); + + AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_C); + appendFiles.apply(); + table.newFastAppend().appendFile(FILE_D).commit(); + appendFiles.commit(); + + validateDataFiles(files(FILE_A, FILE_D, FILE_C), seqs(1, 2, 3)); + } + + @Test + public void testRollBack() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals("Snapshot sequence number should match expected", + 2, table.currentSnapshot().sequenceNumber()); + + table.manageSnapshots().rollbackTo(snapshotId).commit(); + Assert.assertEquals("Snapshot sequence number should match expected", + 1, table.currentSnapshot().sequenceNumber()); + + Assert.assertEquals("Table last sequence number should be 2", + 2, table.operations().current().lastSequenceNumber()); + + table.newFastAppend().appendFile(FILE_C).commit(); + Assert.assertEquals("Snapshot sequence number should 
match expected", + 3, table.currentSnapshot().sequenceNumber()); + } + + @Test + public void testSingleTransaction() { + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_A).commit(); + txn.commitTransaction(); + Assert.assertEquals("Snapshot sequence number should match expected", + 1, table.currentSnapshot().sequenceNumber()); + } + + @Test + public void testConcurrentTransaction() { + Transaction txn1 = table.newTransaction(); + Transaction txn2 = table.newTransaction(); + Transaction txn3 = table.newTransaction(); + Transaction txn4 = table.newTransaction(); + + txn1.newFastAppend().appendFile(FILE_A).commit(); + txn3.newOverwrite().addFile(FILE_C).commit(); + txn4.newDelete().deleteFile(FILE_A).commit(); + txn2.newAppend().appendFile(FILE_B).commit(); + + txn1.commitTransaction(); + txn2.commitTransaction(); + txn3.commitTransaction(); + txn4.commitTransaction(); + + Assert.assertEquals("Snapshot sequence number should match expected", + 4, table.currentSnapshot().sequenceNumber()); + validateDataFiles(files(FILE_A, FILE_B, FILE_C), seqs(4, 2, 3)); + } + + @Test + public void testMultipleOperationsTransaction() { + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_A).commit(); + Set<DataFile> toAddFiles = new HashSet<>(); + Set<DataFile> toDeleteFiles = new HashSet<>(); + toAddFiles.add(FILE_B); + toDeleteFiles.add(FILE_A); + txn.newRewrite().rewriteFiles(toDeleteFiles, toAddFiles).commit(); + txn.commitTransaction(); + + Assert.assertEquals("Snapshot sequence number should match expected", + 2, table.currentSnapshot().sequenceNumber()); + validateDataFiles(files(FILE_A, FILE_B), seqs(2, 2)); + } + + @Test + public void testExpirationInTransaction() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + table.newAppend().appendFile(FILE_B).commit(); + + Transaction txn = table.newTransaction(); + 
txn.expireSnapshots().expireSnapshotId(snapshotId).commit(); + txn.commitTransaction(); + + Assert.assertEquals("Snapshot sequence number should match expected", + 2, table.currentSnapshot().sequenceNumber()); + validateDataFiles(files(FILE_B), seqs(2)); + } + + @Test + public void testTransactionFailure() { + table.newAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .commit(); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "0") + .commit(); + + table.ops().failCommits(1); + + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_C).commit(); + + AssertHelpers.assertThrows("Transaction commit should fail", + CommitFailedException.class, "Injected failure", txn::commitTransaction); + + Assert.assertEquals("Snapshot sequence number should match expected", + 1, table.currentSnapshot().sequenceNumber()); + } + + @Test + public void testCherryPicking() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + table.newAppend() + .appendFile(FILE_B) + .stageOnly() + .commit(); + + Assert.assertEquals("Snapshot sequence number should be 1", 1, + table.currentSnapshot().sequenceNumber()); + + // pick the snapshot that's staged but not committed + Snapshot stagedSnapshot = readMetadata().snapshots().get(1); + + Assert.assertEquals("Snapshot sequence number should be 2", 2, + stagedSnapshot.sequenceNumber()); + + // table has new commit + table.newAppend() + .appendFile(FILE_C) + .commit(); + + Assert.assertEquals("Snapshot sequence number should be 3", + 3, table.currentSnapshot().sequenceNumber()); + + // cherry-pick snapshot + table.manageSnapshots().cherrypick(stagedSnapshot.snapshotId()).commit(); + + Assert.assertEquals("Snapshot sequence number should be 4", + 4, table.currentSnapshot().sequenceNumber()); + + + validateDataFiles(files(FILE_A, FILE_B, FILE_C), seqs(1, 4, 3)); + } + + @Test + public void testCherryPickFastForward() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + table.newAppend() + 
.appendFile(FILE_B) + .stageOnly() + .commit(); + + Assert.assertEquals("Snapshot sequence number should be 1", 1, + table.currentSnapshot().sequenceNumber()); + + // pick the snapshot that's staged but not committed + Snapshot stagedSnapshot = readMetadata().snapshots().get(1); + + Assert.assertEquals("Snapshot sequence number should be 2", 2, + stagedSnapshot.sequenceNumber()); + + // cherry-pick snapshot, this will fast forward + table.manageSnapshots().cherrypick(stagedSnapshot.snapshotId()).commit(); + Assert.assertEquals("Snapshot sequence number should be 2", + 2, table.currentSnapshot().sequenceNumber()); + + validateDataFiles(files(FILE_A, FILE_B), seqs(1, 2)); + } + + void validateDataFiles(Iterator<DataFile> files, Iterator<Long> expectedSeqs) { Review comment: These tests should use the validation methods from TableTestBase instead of adding their own. This method makes the test cases in this PR confusing because sometimes the sequence number being checked belongs to a delete entry rather than an added file. Instead, the test cases need to be very specific about metadata. All the metadata for every change should be validated: * Each operation that produces a sequence number should assert the number of changed manifests * Each changed manifest should have its entries validated, including file status, snapshot id, and sequence number * Operations that do not produce a sequence number should validate that the sequence number did not change using the last sequence number in table metadata. ########## File path: core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java ########## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.junit.Assert; +import org.junit.Test; + +public class TestSequenceNumberForV2Table extends TableTestBase { + + public TestSequenceNumberForV2Table() { + super(2); + } + + @Test + public void testMergeAppend() throws IOException { + table.newAppend().appendFile(FILE_A).commit(); + ManifestFile manifestFile = table.currentSnapshot().manifests().get(0); + validateManifestEntries(manifestFile, 1, files(FILE_A), seqs(1)); + table.newAppend().appendFile(FILE_B).commit(); + manifestFile = table.currentSnapshot().manifests().get(0); + validateManifestEntries(manifestFile, 2, files(FILE_B), seqs(2)); + + table.newAppend() + .appendManifest(writeManifest("input-m0.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C))) + .commit(); + + validateDataFiles(files(FILE_A, FILE_B, FILE_C), seqs(1, 2, 3)); + + table.updateProperties() + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1") + .commit(); + + table.newAppend() + .appendManifest(writeManifest("input-m1.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D))) + .commit(); + + manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + 
validateManifestEntries(manifestFile, 4, files(FILE_A, FILE_B, FILE_C, FILE_D), seqs(1, 2, 3, 4)); + validateDataFiles(files(FILE_A, FILE_B, FILE_C, FILE_D), seqs(1, 2, 3, 4)); + } + + @Test + public void testRewrite() { + table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber()); + + table.rewriteManifests().clusterBy(file -> "").commit(); + Assert.assertEquals("Snapshot sequence number should be 3", + 3, table.currentSnapshot().sequenceNumber()); + + ManifestFile newManifest = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + + validateManifestEntries(newManifest, 3, files(FILE_A, FILE_B), seqs(1, 2)); + } + + @Test + public void testCommitConflict() { + AppendFiles appendA = table.newFastAppend(); + appendA.appendFile(FILE_A).apply(); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "0") + .commit(); + + table.ops().failCommits(1); + + AssertHelpers.assertThrows("Should reject commit", + CommitFailedException.class, "Injected failure", + () -> table.newFastAppend().appendFile(FILE_B).commit()); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "5") + .commit(); + + appendA.commit(); + + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber()); + + AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_C); + appendFiles.apply(); + table.newFastAppend().appendFile(FILE_D).commit(); + appendFiles.commit(); + + validateDataFiles(files(FILE_A, FILE_D, FILE_C), seqs(1, 2, 3)); + } + + @Test + public void testRollBack() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals("Snapshot sequence number should match expected", + 2, 
table.currentSnapshot().sequenceNumber()); + + table.manageSnapshots().rollbackTo(snapshotId).commit(); + Assert.assertEquals("Snapshot sequence number should match expected", + 1, table.currentSnapshot().sequenceNumber()); + + Assert.assertEquals("Table last sequence number should be 2", + 2, table.operations().current().lastSequenceNumber()); + + table.newFastAppend().appendFile(FILE_C).commit(); + Assert.assertEquals("Snapshot sequence number should match expected", + 3, table.currentSnapshot().sequenceNumber()); + } + + @Test + public void testSingleTransaction() { + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_A).commit(); + txn.commitTransaction(); + Assert.assertEquals("Snapshot sequence number should match expected", + 1, table.currentSnapshot().sequenceNumber()); + } + + @Test + public void testConcurrentTransaction() { + Transaction txn1 = table.newTransaction(); + Transaction txn2 = table.newTransaction(); + Transaction txn3 = table.newTransaction(); + Transaction txn4 = table.newTransaction(); + + txn1.newFastAppend().appendFile(FILE_A).commit(); + txn3.newOverwrite().addFile(FILE_C).commit(); + txn4.newDelete().deleteFile(FILE_A).commit(); + txn2.newAppend().appendFile(FILE_B).commit(); + + txn1.commitTransaction(); + txn2.commitTransaction(); + txn3.commitTransaction(); + txn4.commitTransaction(); + + Assert.assertEquals("Snapshot sequence number should match expected", + 4, table.currentSnapshot().sequenceNumber()); + validateDataFiles(files(FILE_A, FILE_B, FILE_C), seqs(4, 2, 3)); + } + + @Test + public void testMultipleOperationsTransaction() { + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_A).commit(); + Set<DataFile> toAddFiles = new HashSet<>(); + Set<DataFile> toDeleteFiles = new HashSet<>(); + toAddFiles.add(FILE_B); + toDeleteFiles.add(FILE_A); + txn.newRewrite().rewriteFiles(toDeleteFiles, toAddFiles).commit(); + txn.commitTransaction(); + + Assert.assertEquals("Snapshot sequence 
number should match expected", + 2, table.currentSnapshot().sequenceNumber()); + validateDataFiles(files(FILE_A, FILE_B), seqs(2, 2)); + } + + @Test + public void testExpirationInTransaction() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + table.newAppend().appendFile(FILE_B).commit(); + + Transaction txn = table.newTransaction(); + txn.expireSnapshots().expireSnapshotId(snapshotId).commit(); + txn.commitTransaction(); + + Assert.assertEquals("Snapshot sequence number should match expected", + 2, table.currentSnapshot().sequenceNumber()); + validateDataFiles(files(FILE_B), seqs(2)); + } + + @Test + public void testTransactionFailure() { + table.newAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .commit(); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "0") + .commit(); + + table.ops().failCommits(1); + + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_C).commit(); + + AssertHelpers.assertThrows("Transaction commit should fail", + CommitFailedException.class, "Injected failure", txn::commitTransaction); + + Assert.assertEquals("Snapshot sequence number should match expected", + 1, table.currentSnapshot().sequenceNumber()); Review comment: This assertion validates that the table's current snapshot didn't change, not that the table did not assign a sequence number to the failed transaction. This should check `TableMetadata.lastSequenceNumber` instead. ########## File path: core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java ########## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.junit.Assert; +import org.junit.Test; + +public class TestSequenceNumberForV2Table extends TableTestBase { + + public TestSequenceNumberForV2Table() { + super(2); + } + + @Test + public void testMergeAppend() throws IOException { + table.newAppend().appendFile(FILE_A).commit(); + ManifestFile manifestFile = table.currentSnapshot().manifests().get(0); + validateManifestEntries(manifestFile, 1, files(FILE_A), seqs(1)); + table.newAppend().appendFile(FILE_B).commit(); + manifestFile = table.currentSnapshot().manifests().get(0); + validateManifestEntries(manifestFile, 2, files(FILE_B), seqs(2)); + + table.newAppend() + .appendManifest(writeManifest("input-m0.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C))) + .commit(); + + validateDataFiles(files(FILE_A, FILE_B, FILE_C), seqs(1, 2, 3)); + + table.updateProperties() + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1") + .commit(); + + table.newAppend() + .appendManifest(writeManifest("input-m1.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D))) + .commit(); + + manifestFile = 
table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + validateManifestEntries(manifestFile, 4, files(FILE_A, FILE_B, FILE_C, FILE_D), seqs(1, 2, 3, 4)); + validateDataFiles(files(FILE_A, FILE_B, FILE_C, FILE_D), seqs(1, 2, 3, 4)); + } + + @Test + public void testRewrite() { + table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber()); + + table.rewriteManifests().clusterBy(file -> "").commit(); + Assert.assertEquals("Snapshot sequence number should be 3", + 3, table.currentSnapshot().sequenceNumber()); + + ManifestFile newManifest = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + + validateManifestEntries(newManifest, 3, files(FILE_A, FILE_B), seqs(1, 2)); + } + + @Test + public void testCommitConflict() { + AppendFiles appendA = table.newFastAppend(); + appendA.appendFile(FILE_A).apply(); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "0") + .commit(); + + table.ops().failCommits(1); + + AssertHelpers.assertThrows("Should reject commit", + CommitFailedException.class, "Injected failure", + () -> table.newFastAppend().appendFile(FILE_B).commit()); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "5") + .commit(); + + appendA.commit(); + + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber()); + + AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_C); + appendFiles.apply(); + table.newFastAppend().appendFile(FILE_D).commit(); + appendFiles.commit(); + + validateDataFiles(files(FILE_A, FILE_D, FILE_C), seqs(1, 2, 3)); + } + + @Test + public void testRollBack() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = 
table.currentSnapshot().snapshotId(); + + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals("Snapshot sequence number should match expected", + 2, table.currentSnapshot().sequenceNumber()); + + table.manageSnapshots().rollbackTo(snapshotId).commit(); + Assert.assertEquals("Snapshot sequence number should match expected", + 1, table.currentSnapshot().sequenceNumber()); + + Assert.assertEquals("Table last sequence number should be 2", + 2, table.operations().current().lastSequenceNumber()); + + table.newFastAppend().appendFile(FILE_C).commit(); + Assert.assertEquals("Snapshot sequence number should match expected", + 3, table.currentSnapshot().sequenceNumber()); + } + + @Test + public void testSingleTransaction() { + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_A).commit(); + txn.commitTransaction(); + Assert.assertEquals("Snapshot sequence number should match expected", + 1, table.currentSnapshot().sequenceNumber()); + } + + @Test + public void testConcurrentTransaction() { + Transaction txn1 = table.newTransaction(); + Transaction txn2 = table.newTransaction(); + Transaction txn3 = table.newTransaction(); + Transaction txn4 = table.newTransaction(); + + txn1.newFastAppend().appendFile(FILE_A).commit(); + txn3.newOverwrite().addFile(FILE_C).commit(); + txn4.newDelete().deleteFile(FILE_A).commit(); + txn2.newAppend().appendFile(FILE_B).commit(); + + txn1.commitTransaction(); + txn2.commitTransaction(); + txn3.commitTransaction(); + txn4.commitTransaction(); + + Assert.assertEquals("Snapshot sequence number should match expected", + 4, table.currentSnapshot().sequenceNumber()); + validateDataFiles(files(FILE_A, FILE_B, FILE_C), seqs(4, 2, 3)); + } + + @Test + public void testMultipleOperationsTransaction() { + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_A).commit(); Review comment: The sequence number of FILE_A needs to be validated, along with the rest of the metadata for this 
commit. Transactions will still produce a snapshot for every operation; committing the transaction just swaps the existing table state for the final state. ########## File path: core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java ########## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.junit.Assert; +import org.junit.Test; + +public class TestSequenceNumberForV2Table extends TableTestBase { + + public TestSequenceNumberForV2Table() { + super(2); + } + + @Test + public void testMergeAppend() throws IOException { + table.newAppend().appendFile(FILE_A).commit(); + ManifestFile manifestFile = table.currentSnapshot().manifests().get(0); + validateManifestEntries(manifestFile, 1, files(FILE_A), seqs(1)); + table.newAppend().appendFile(FILE_B).commit(); + manifestFile = table.currentSnapshot().manifests().get(0); + validateManifestEntries(manifestFile, 2, files(FILE_B), seqs(2)); + + table.newAppend() + .appendManifest(writeManifest("input-m0.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C))) + .commit(); + + validateDataFiles(files(FILE_A, FILE_B, FILE_C), seqs(1, 2, 3)); + + table.updateProperties() + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1") + .commit(); + + table.newAppend() + .appendManifest(writeManifest("input-m1.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D))) + .commit(); + + manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + validateManifestEntries(manifestFile, 4, files(FILE_A, FILE_B, FILE_C, FILE_D), seqs(1, 2, 3, 4)); + validateDataFiles(files(FILE_A, FILE_B, FILE_C, FILE_D), seqs(1, 2, 3, 4)); + } + + @Test + public void testRewrite() { + table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber()); + + table.rewriteManifests().clusterBy(file -> 
"").commit(); + Assert.assertEquals("Snapshot sequence number should be 3", + 3, table.currentSnapshot().sequenceNumber()); + + ManifestFile newManifest = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + + validateManifestEntries(newManifest, 3, files(FILE_A, FILE_B), seqs(1, 2)); + } + + @Test + public void testCommitConflict() { + AppendFiles appendA = table.newFastAppend(); + appendA.appendFile(FILE_A).apply(); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "0") + .commit(); + + table.ops().failCommits(1); + + AssertHelpers.assertThrows("Should reject commit", + CommitFailedException.class, "Injected failure", + () -> table.newFastAppend().appendFile(FILE_B).commit()); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "5") + .commit(); + + appendA.commit(); + + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber()); + + AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_C); + appendFiles.apply(); + table.newFastAppend().appendFile(FILE_D).commit(); + appendFiles.commit(); + + validateDataFiles(files(FILE_A, FILE_D, FILE_C), seqs(1, 2, 3)); + } + + @Test + public void testRollBack() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals("Snapshot sequence number should match expected", + 2, table.currentSnapshot().sequenceNumber()); + + table.manageSnapshots().rollbackTo(snapshotId).commit(); + Assert.assertEquals("Snapshot sequence number should match expected", + 1, table.currentSnapshot().sequenceNumber()); + + Assert.assertEquals("Table last sequence number should be 2", + 2, table.operations().current().lastSequenceNumber()); + + table.newFastAppend().appendFile(FILE_C).commit(); + Assert.assertEquals("Snapshot sequence number should 
match expected", + 3, table.currentSnapshot().sequenceNumber()); + } + + @Test + public void testSingleTransaction() { + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_A).commit(); + txn.commitTransaction(); + Assert.assertEquals("Snapshot sequence number should match expected", + 1, table.currentSnapshot().sequenceNumber()); + } + + @Test + public void testConcurrentTransaction() { + Transaction txn1 = table.newTransaction(); + Transaction txn2 = table.newTransaction(); + Transaction txn3 = table.newTransaction(); + Transaction txn4 = table.newTransaction(); + + txn1.newFastAppend().appendFile(FILE_A).commit(); + txn3.newOverwrite().addFile(FILE_C).commit(); + txn4.newDelete().deleteFile(FILE_A).commit(); + txn2.newAppend().appendFile(FILE_B).commit(); + + txn1.commitTransaction(); + txn2.commitTransaction(); + txn3.commitTransaction(); + txn4.commitTransaction(); + + Assert.assertEquals("Snapshot sequence number should match expected", + 4, table.currentSnapshot().sequenceNumber()); + validateDataFiles(files(FILE_A, FILE_B, FILE_C), seqs(4, 2, 3)); + } + + @Test + public void testMultipleOperationsTransaction() { + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_A).commit(); + Set<DataFile> toAddFiles = new HashSet<>(); + Set<DataFile> toDeleteFiles = new HashSet<>(); + toAddFiles.add(FILE_B); + toDeleteFiles.add(FILE_A); + txn.newRewrite().rewriteFiles(toDeleteFiles, toAddFiles).commit(); + txn.commitTransaction(); + + Assert.assertEquals("Snapshot sequence number should match expected", + 2, table.currentSnapshot().sequenceNumber()); + validateDataFiles(files(FILE_A, FILE_B), seqs(2, 2)); + } + + @Test + public void testExpirationInTransaction() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + table.newAppend().appendFile(FILE_B).commit(); + + Transaction txn = table.newTransaction(); + 
txn.expireSnapshots().expireSnapshotId(snapshotId).commit(); + txn.commitTransaction(); + + Assert.assertEquals("Snapshot sequence number should match expected", + 2, table.currentSnapshot().sequenceNumber()); Review comment: This needs to validate the table's last sequence number. ########## File path: core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java ########## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.junit.Assert; +import org.junit.Test; + +public class TestSequenceNumberForV2Table extends TableTestBase { + + public TestSequenceNumberForV2Table() { + super(2); + } + + @Test + public void testMergeAppend() throws IOException { + table.newAppend().appendFile(FILE_A).commit(); + ManifestFile manifestFile = table.currentSnapshot().manifests().get(0); + validateManifestEntries(manifestFile, 1, files(FILE_A), seqs(1)); + table.newAppend().appendFile(FILE_B).commit(); + manifestFile = table.currentSnapshot().manifests().get(0); + validateManifestEntries(manifestFile, 2, files(FILE_B), seqs(2)); + + table.newAppend() + .appendManifest(writeManifest("input-m0.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C))) + .commit(); + + validateDataFiles(files(FILE_A, FILE_B, FILE_C), seqs(1, 2, 3)); + + table.updateProperties() + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1") + .commit(); + + table.newAppend() + .appendManifest(writeManifest("input-m1.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D))) + .commit(); + + manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + validateManifestEntries(manifestFile, 4, files(FILE_A, FILE_B, FILE_C, FILE_D), seqs(1, 2, 3, 4)); + validateDataFiles(files(FILE_A, FILE_B, FILE_C, FILE_D), seqs(1, 2, 3, 4)); + } + + @Test + public void testRewrite() { + table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber()); + + table.rewriteManifests().clusterBy(file -> 
"").commit(); + Assert.assertEquals("Snapshot sequence number should be 3", + 3, table.currentSnapshot().sequenceNumber()); + + ManifestFile newManifest = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + + validateManifestEntries(newManifest, 3, files(FILE_A, FILE_B), seqs(1, 2)); + } + + @Test + public void testCommitConflict() { + AppendFiles appendA = table.newFastAppend(); + appendA.appendFile(FILE_A).apply(); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "0") + .commit(); + + table.ops().failCommits(1); + + AssertHelpers.assertThrows("Should reject commit", + CommitFailedException.class, "Injected failure", + () -> table.newFastAppend().appendFile(FILE_B).commit()); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "5") + .commit(); + + appendA.commit(); + + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber()); + + AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_C); + appendFiles.apply(); + table.newFastAppend().appendFile(FILE_D).commit(); + appendFiles.commit(); + + validateDataFiles(files(FILE_A, FILE_D, FILE_C), seqs(1, 2, 3)); + } + + @Test + public void testRollBack() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals("Snapshot sequence number should match expected", + 2, table.currentSnapshot().sequenceNumber()); + + table.manageSnapshots().rollbackTo(snapshotId).commit(); + Assert.assertEquals("Snapshot sequence number should match expected", + 1, table.currentSnapshot().sequenceNumber()); + + Assert.assertEquals("Table last sequence number should be 2", + 2, table.operations().current().lastSequenceNumber()); + + table.newFastAppend().appendFile(FILE_C).commit(); + Assert.assertEquals("Snapshot sequence number should 
match expected", + 3, table.currentSnapshot().sequenceNumber()); + } + + @Test + public void testSingleTransaction() { + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_A).commit(); + txn.commitTransaction(); + Assert.assertEquals("Snapshot sequence number should match expected", + 1, table.currentSnapshot().sequenceNumber()); + } + + @Test + public void testConcurrentTransaction() { + Transaction txn1 = table.newTransaction(); + Transaction txn2 = table.newTransaction(); + Transaction txn3 = table.newTransaction(); + Transaction txn4 = table.newTransaction(); + + txn1.newFastAppend().appendFile(FILE_A).commit(); + txn3.newOverwrite().addFile(FILE_C).commit(); + txn4.newDelete().deleteFile(FILE_A).commit(); + txn2.newAppend().appendFile(FILE_B).commit(); + + txn1.commitTransaction(); + txn2.commitTransaction(); + txn3.commitTransaction(); + txn4.commitTransaction(); Review comment: The table state resulting from each transaction needs to be validated, not just the final state. ########## File path: core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java ########## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.junit.Assert; +import org.junit.Test; + +public class TestSequenceNumberForV2Table extends TableTestBase { + + public TestSequenceNumberForV2Table() { + super(2); + } + + @Test + public void testMergeAppend() throws IOException { + table.newAppend().appendFile(FILE_A).commit(); + ManifestFile manifestFile = table.currentSnapshot().manifests().get(0); + validateManifestEntries(manifestFile, 1, files(FILE_A), seqs(1)); + table.newAppend().appendFile(FILE_B).commit(); + manifestFile = table.currentSnapshot().manifests().get(0); + validateManifestEntries(manifestFile, 2, files(FILE_B), seqs(2)); + + table.newAppend() + .appendManifest(writeManifest("input-m0.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C))) + .commit(); + + validateDataFiles(files(FILE_A, FILE_B, FILE_C), seqs(1, 2, 3)); + + table.updateProperties() + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1") + .commit(); + + table.newAppend() + .appendManifest(writeManifest("input-m1.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D))) + .commit(); + + manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + validateManifestEntries(manifestFile, 4, files(FILE_A, FILE_B, FILE_C, FILE_D), seqs(1, 2, 3, 4)); + validateDataFiles(files(FILE_A, FILE_B, FILE_C, FILE_D), seqs(1, 2, 3, 4)); + } + + @Test + public void testRewrite() { + table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber()); + + table.rewriteManifests().clusterBy(file -> 
"").commit(); + Assert.assertEquals("Snapshot sequence number should be 3", + 3, table.currentSnapshot().sequenceNumber()); + + ManifestFile newManifest = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + + validateManifestEntries(newManifest, 3, files(FILE_A, FILE_B), seqs(1, 2)); + } + + @Test + public void testCommitConflict() { + AppendFiles appendA = table.newFastAppend(); + appendA.appendFile(FILE_A).apply(); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "0") + .commit(); + + table.ops().failCommits(1); + + AssertHelpers.assertThrows("Should reject commit", + CommitFailedException.class, "Injected failure", + () -> table.newFastAppend().appendFile(FILE_B).commit()); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "5") + .commit(); + + appendA.commit(); + + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber()); + + AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_C); + appendFiles.apply(); + table.newFastAppend().appendFile(FILE_D).commit(); + appendFiles.commit(); + + validateDataFiles(files(FILE_A, FILE_D, FILE_C), seqs(1, 2, 3)); + } + + @Test + public void testRollBack() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals("Snapshot sequence number should match expected", + 2, table.currentSnapshot().sequenceNumber()); + + table.manageSnapshots().rollbackTo(snapshotId).commit(); + Assert.assertEquals("Snapshot sequence number should match expected", + 1, table.currentSnapshot().sequenceNumber()); + + Assert.assertEquals("Table last sequence number should be 2", + 2, table.operations().current().lastSequenceNumber()); + + table.newFastAppend().appendFile(FILE_C).commit(); + Assert.assertEquals("Snapshot sequence number should 
match expected", + 3, table.currentSnapshot().sequenceNumber()); Review comment: Overall, this is a good test. But you do need to add validations for the rest of the metadata. ########## File path: core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java ########## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.junit.Assert; +import org.junit.Test; + +public class TestSequenceNumberForV2Table extends TableTestBase { + + public TestSequenceNumberForV2Table() { + super(2); + } + + @Test + public void testMergeAppend() throws IOException { + table.newAppend().appendFile(FILE_A).commit(); + ManifestFile manifestFile = table.currentSnapshot().manifests().get(0); + validateManifestEntries(manifestFile, 1, files(FILE_A), seqs(1)); + table.newAppend().appendFile(FILE_B).commit(); + manifestFile = table.currentSnapshot().manifests().get(0); + validateManifestEntries(manifestFile, 2, files(FILE_B), seqs(2)); + + table.newAppend() + .appendManifest(writeManifest("input-m0.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C))) + .commit(); + + validateDataFiles(files(FILE_A, FILE_B, FILE_C), seqs(1, 2, 3)); + + table.updateProperties() + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1") + .commit(); + + table.newAppend() + .appendManifest(writeManifest("input-m1.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D))) + .commit(); + + manifestFile = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + validateManifestEntries(manifestFile, 4, files(FILE_A, FILE_B, FILE_C, FILE_D), seqs(1, 2, 3, 4)); + validateDataFiles(files(FILE_A, FILE_B, FILE_C, FILE_D), seqs(1, 2, 3, 4)); + } + + @Test + public void testRewrite() { + table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber()); + + table.rewriteManifests().clusterBy(file -> 
"").commit(); + Assert.assertEquals("Snapshot sequence number should be 3", + 3, table.currentSnapshot().sequenceNumber()); + + ManifestFile newManifest = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + + validateManifestEntries(newManifest, 3, files(FILE_A, FILE_B), seqs(1, 2)); + } + + @Test + public void testCommitConflict() { + AppendFiles appendA = table.newFastAppend(); + appendA.appendFile(FILE_A).apply(); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "0") + .commit(); + + table.ops().failCommits(1); + + AssertHelpers.assertThrows("Should reject commit", + CommitFailedException.class, "Injected failure", + () -> table.newFastAppend().appendFile(FILE_B).commit()); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "5") + .commit(); + + appendA.commit(); + + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber()); + + AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_C); + appendFiles.apply(); + table.newFastAppend().appendFile(FILE_D).commit(); + appendFiles.commit(); + + validateDataFiles(files(FILE_A, FILE_D, FILE_C), seqs(1, 2, 3)); + } + + @Test + public void testRollBack() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals("Snapshot sequence number should match expected", + 2, table.currentSnapshot().sequenceNumber()); + + table.manageSnapshots().rollbackTo(snapshotId).commit(); + Assert.assertEquals("Snapshot sequence number should match expected", + 1, table.currentSnapshot().sequenceNumber()); + + Assert.assertEquals("Table last sequence number should be 2", + 2, table.operations().current().lastSequenceNumber()); + + table.newFastAppend().appendFile(FILE_C).commit(); + Assert.assertEquals("Snapshot sequence number should 
match expected", + 3, table.currentSnapshot().sequenceNumber()); + } + + @Test + public void testSingleTransaction() { + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_A).commit(); + txn.commitTransaction(); + Assert.assertEquals("Snapshot sequence number should match expected", + 1, table.currentSnapshot().sequenceNumber()); + } + + @Test + public void testConcurrentTransaction() { + Transaction txn1 = table.newTransaction(); + Transaction txn2 = table.newTransaction(); + Transaction txn3 = table.newTransaction(); + Transaction txn4 = table.newTransaction(); + + txn1.newFastAppend().appendFile(FILE_A).commit(); + txn3.newOverwrite().addFile(FILE_C).commit(); + txn4.newDelete().deleteFile(FILE_A).commit(); + txn2.newAppend().appendFile(FILE_B).commit(); + + txn1.commitTransaction(); + txn2.commitTransaction(); + txn3.commitTransaction(); + txn4.commitTransaction(); + + Assert.assertEquals("Snapshot sequence number should match expected", + 4, table.currentSnapshot().sequenceNumber()); + validateDataFiles(files(FILE_A, FILE_B, FILE_C), seqs(4, 2, 3)); + } + + @Test + public void testMultipleOperationsTransaction() { + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_A).commit(); + Set<DataFile> toAddFiles = new HashSet<>(); + Set<DataFile> toDeleteFiles = new HashSet<>(); + toAddFiles.add(FILE_B); + toDeleteFiles.add(FILE_A); + txn.newRewrite().rewriteFiles(toDeleteFiles, toAddFiles).commit(); + txn.commitTransaction(); + + Assert.assertEquals("Snapshot sequence number should match expected", + 2, table.currentSnapshot().sequenceNumber()); + validateDataFiles(files(FILE_A, FILE_B), seqs(2, 2)); + } + + @Test + public void testExpirationInTransaction() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + table.newAppend().appendFile(FILE_B).commit(); + + Transaction txn = table.newTransaction(); + 
txn.expireSnapshots().expireSnapshotId(snapshotId).commit(); + txn.commitTransaction(); + + Assert.assertEquals("Snapshot sequence number should match expected", + 2, table.currentSnapshot().sequenceNumber()); + validateDataFiles(files(FILE_B), seqs(2)); + } + + @Test + public void testTransactionFailure() { + table.newAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .commit(); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "0") + .commit(); + + table.ops().failCommits(1); + + Transaction txn = table.newTransaction(); + txn.newAppend().appendFile(FILE_C).commit(); + + AssertHelpers.assertThrows("Transaction commit should fail", + CommitFailedException.class, "Injected failure", txn::commitTransaction); + + Assert.assertEquals("Snapshot sequence number should match expected", + 1, table.currentSnapshot().sequenceNumber()); + } + + @Test + public void testCherryPicking() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + table.newAppend() + .appendFile(FILE_B) + .stageOnly() + .commit(); + + Assert.assertEquals("Snapshot sequence number should be 1", 1, + table.currentSnapshot().sequenceNumber()); + + // pick the snapshot that's staged but not committed + Snapshot stagedSnapshot = readMetadata().snapshots().get(1); + + Assert.assertEquals("Snapshot sequence number should be 2", 2, + stagedSnapshot.sequenceNumber()); + + // table has new commit + table.newAppend() + .appendFile(FILE_C) + .commit(); + + Assert.assertEquals("Snapshot sequence number should be 3", + 3, table.currentSnapshot().sequenceNumber()); + + // cherry-pick snapshot + table.manageSnapshots().cherrypick(stagedSnapshot.snapshotId()).commit(); + + Assert.assertEquals("Snapshot sequence number should be 4", + 4, table.currentSnapshot().sequenceNumber()); + + + validateDataFiles(files(FILE_A, FILE_B, FILE_C), seqs(1, 4, 3)); + } + + @Test + public void testCherryPickFastForward() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + table.newAppend() + 
.appendFile(FILE_B) + .stageOnly() + .commit(); + + Assert.assertEquals("Snapshot sequence number should be 1", 1, + table.currentSnapshot().sequenceNumber()); + + // pick the snapshot that's staged but not committed + Snapshot stagedSnapshot = readMetadata().snapshots().get(1); + + Assert.assertEquals("Snapshot sequence number should be 2", 2, + stagedSnapshot.sequenceNumber()); + + // cherry-pick snapshot, this will fast forward + table.manageSnapshots().cherrypick(stagedSnapshot.snapshotId()).commit(); + Assert.assertEquals("Snapshot sequence number should be 2", + 2, table.currentSnapshot().sequenceNumber()); + + validateDataFiles(files(FILE_A, FILE_B), seqs(1, 2)); + } + + void validateDataFiles(Iterator<DataFile> files, Iterator<Long> expectedSeqs) { Review comment: These tests should use the validation methods from TableTestBase instead of adding its own. This method causes the test cases in this PR to be confusing because sometimes the sequence number is for a delete. Instead, the test cases need to be very specific about metadata. All the metadata for every change should be validated: * Each operation that produces a sequence number should assert the number of changed manifests * Each changed manifest should have its entries validated, including file status, snapshot id, and sequence number * Each operation should validate the lastSequenceNumber in table metadata * Operations that do not produce a sequence number should validate that the sequence number did not change using the last sequence number in table metadata. ########## File path: core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java ########## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.junit.Assert; +import org.junit.Test; + +public class TestSequenceNumberForV2Table extends TableTestBase { + + public TestSequenceNumberForV2Table() { + super(2); + } + + @Test + public void testMergeAppend() throws IOException { + table.newAppend().appendFile(FILE_A).commit(); + ManifestFile manifestFile = table.currentSnapshot().manifests().get(0); + validateManifestEntries(manifestFile, 1, files(FILE_A), seqs(1)); + table.newAppend().appendFile(FILE_B).commit(); + manifestFile = table.currentSnapshot().manifests().get(0); + validateManifestEntries(manifestFile, 2, files(FILE_B), seqs(2)); + + table.newAppend() + .appendManifest(writeManifest("input-m0.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C))) + .commit(); + + validateDataFiles(files(FILE_A, FILE_B, FILE_C), seqs(1, 2, 3)); + + table.updateProperties() + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1") + .commit(); + + table.newAppend() + .appendManifest(writeManifest("input-m1.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D))) + .commit(); + + manifestFile = 
table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + validateManifestEntries(manifestFile, 4, files(FILE_A, FILE_B, FILE_C, FILE_D), seqs(1, 2, 3, 4)); + validateDataFiles(files(FILE_A, FILE_B, FILE_C, FILE_D), seqs(1, 2, 3, 4)); + } + + @Test + public void testRewrite() { + table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber()); + + table.rewriteManifests().clusterBy(file -> "").commit(); + Assert.assertEquals("Snapshot sequence number should be 3", + 3, table.currentSnapshot().sequenceNumber()); + + ManifestFile newManifest = table.currentSnapshot().manifests().stream() + .filter(manifest -> manifest.snapshotId() == table.currentSnapshot().snapshotId()) + .collect(Collectors.toList()).get(0); + + validateManifestEntries(newManifest, 3, files(FILE_A, FILE_B), seqs(1, 2)); + } + + @Test + public void testCommitConflict() { + AppendFiles appendA = table.newFastAppend(); + appendA.appendFile(FILE_A).apply(); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "0") + .commit(); Review comment: This should be done before `AppendFiles`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org