nastra commented on code in PR #5984:
URL: https://github.com/apache/iceberg/pull/5984#discussion_r999021646
##########
core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java:
##########
@@ -35,6 +35,26 @@ protected BaseIncrementalScan(
protected abstract CloseableIterable<T> doPlanFiles(
Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
+ @Override
+ public ThisT fromSnapshotInclusive(long fromSnapshotId, String branch) {
+ Preconditions.checkArgument(
+ table().snapshot(fromSnapshotId) != null,
+ "Cannot find the starting snapshot: %s",
+ fromSnapshotId);
+
+ Snapshot branchSnapshot = table().snapshot(branch);
+ Preconditions.checkArgument(branchSnapshot != null, "Cannot find the branch: %s", branch);
Review Comment:
In `BaseTableScan` we say `Cannot find ref %s`, so it would be good to align
the message here as well. It would also be good to use `ref` or `referenceName`
rather than `branch` across parameter and variable names, since that is more
precise imo.
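To illustrate the naming and message alignment suggested above, here is a minimal, self-contained sketch. It does not use Iceberg or Guava: `RefValidationSketch`, `resolveRef`, and the `Map`-based ref lookup are hypothetical stand-ins for `table().snapshot(referenceName)`, and the exact message format is an assumption based on the `Cannot find ref` wording mentioned above.

```java
import java.util.Map;

// Hypothetical stand-in for the table's ref lookup; this is not Iceberg code.
public class RefValidationSketch {

  // Resolves a named reference (branch or tag) to a snapshot ID and fails
  // with a "Cannot find ref" message when the reference does not exist.
  public static long resolveRef(Map<String, Long> refs, String referenceName) {
    Long snapshotId = refs.get(referenceName);
    if (snapshotId == null) {
      // Assumed wording, aligned with the message quoted in the review comment.
      throw new IllegalArgumentException(
          String.format("Cannot find ref %s", referenceName));
    }
    return snapshotId;
  }

  public static void main(String[] args) {
    Map<String, Long> refs = Map.of("main", 3L, "b1", 2L);
    System.out.println(resolveRef(refs, "b1"));
    try {
      resolveRef(refs, "missing");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Using `referenceName` keeps the parameter accurate whether the caller passes a branch or a tag.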
##########
api/src/main/java/org/apache/iceberg/IncrementalScan.java:
##########
@@ -33,6 +49,21 @@
*/
ThisT fromSnapshotInclusive(long fromSnapshotId);
+ /**
+ * Instructs this scan to look for changes starting from a particular snapshot (exclusive).
+ *
+ * <p>If the start snapshot is not configured, it is defaulted to the oldest ancestor of the end
+ * snapshot (inclusive).
+ *
+ * @param fromSnapshotId the start snapshot ID (exclusive)
+ * @param branch the ref used
+ * @return this for method chaining
+ * @throws IllegalArgumentException if the start snapshot is not an ancestor of the end snapshot
+ */
+ default ThisT fromSnapshotExclusive(long fromSnapshotId, String branch) {
+ throw new UnsupportedOperationException("Unsupported starting from the specified reference.");
Review Comment:
```suggestion
throw new UnsupportedOperationException("Starting from the specified reference is not supported.");
```
##########
core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java:
##########
@@ -35,6 +35,26 @@ protected BaseIncrementalScan(
protected abstract CloseableIterable<T> doPlanFiles(
Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
+ @Override
+ public ThisT fromSnapshotInclusive(long fromSnapshotId, String branch) {
+ Preconditions.checkArgument(
+ table().snapshot(fromSnapshotId) != null,
+ "Cannot find the starting snapshot: %s",
+ fromSnapshotId);
+
+ Snapshot branchSnapshot = table().snapshot(branch);
+ Preconditions.checkArgument(branchSnapshot != null, "Cannot find the branch: %s", branch);
+ Preconditions.checkArgument(
+ SnapshotUtil.isAncestorOf(table(), branchSnapshot.snapshotId(), fromSnapshotId),
+ "Starting snapshot (inclusive) %s is not a parent ancestor of branch: %s",
Review Comment:
```suggestion
"Starting snapshot (inclusive) %s is not a parent ancestor of ref: %s",
```
##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java:
##########
@@ -50,6 +50,42 @@ public void testFromSnapshotInclusive() {
Assert.assertEquals(3, Iterables.size(scanWithToSnapshot.planFiles()));
}
+ @Test
+ public void testFromSnapshotInclusiveWithBranch() {
Review Comment:
It would also be good to add tests that hit the
`Cannot find the starting snapshot` / `Cannot find ref...` error messages.
##########
api/src/main/java/org/apache/iceberg/IncrementalScan.java:
##########
@@ -21,6 +21,22 @@
/** API for configuring an incremental scan. */
public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
extends Scan<ThisT, T, G> {
+
+ /**
+ * Instructs this scan to look for changes starting from a particular snapshot (inclusive).
+ *
+ * <p>If the start snapshot is not configured, it is defaulted to the oldest ancestor of the end
+ * snapshot (inclusive).
+ *
+ * @param fromSnapshotId the start snapshot ID (inclusive)
+ * @param branch the ref used
+ * @return this for method chaining
+ * @throws IllegalArgumentException if the start snapshot is not an ancestor of the end snapshot
+ */
+ default ThisT fromSnapshotInclusive(long fromSnapshotId, String branch) {
+ throw new UnsupportedOperationException("Unsupported starting from the specified reference.");
Review Comment:
```suggestion
throw new UnsupportedOperationException("Starting from the specified reference is not supported.");
```
##########
core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java:
##########
@@ -35,6 +35,26 @@ protected BaseIncrementalScan(
protected abstract CloseableIterable<T> doPlanFiles(
Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
+ @Override
+ public ThisT fromSnapshotInclusive(long fromSnapshotId, String branch) {
Review Comment:
```suggestion
public ThisT fromSnapshotInclusive(long fromSnapshotId, String referenceName) {
```
##########
api/src/main/java/org/apache/iceberg/IncrementalScan.java:
##########
@@ -33,6 +49,21 @@
*/
ThisT fromSnapshotInclusive(long fromSnapshotId);
+ /**
+ * Instructs this scan to look for changes starting from a particular snapshot (exclusive).
+ *
+ * <p>If the start snapshot is not configured, it is defaulted to the oldest ancestor of the end
+ * snapshot (inclusive).
+ *
+ * @param fromSnapshotId the start snapshot ID (exclusive)
+ * @param branch the ref used
+ * @return this for method chaining
+ * @throws IllegalArgumentException if the start snapshot is not an ancestor of the end snapshot
+ */
+ default ThisT fromSnapshotExclusive(long fromSnapshotId, String branch) {
Review Comment:
```suggestion
default ThisT fromSnapshotExclusive(long fromSnapshotId, String referenceName) {
```
##########
core/src/main/java/org/apache/iceberg/TableScanContext.java:
##########
@@ -60,6 +61,7 @@ final class TableScanContext {
this.planExecutor = null;
this.fromSnapshotInclusive = false;
this.metricsReporter = new LoggingMetricsReporter();
+ this.branch = null;
Review Comment:
I think this should be `ref` or `referenceName` instead. Also, I think it
would be great to get #5982 / #5985 in before this PR.
##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java:
##########
@@ -67,6 +103,35 @@ public void testFromSnapshotExclusive() {
Assert.assertEquals(1, Iterables.size(scanWithToSnapshot.planFiles()));
}
+ @Test
+ public void testFromSnapshotExclusiveWithBranch() {
+ table.newFastAppend().appendFile(FILE_A).commit();
+ long snapshotAId = table.currentSnapshot().snapshotId();
+
+ String branchName = "b1";
+ table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
+
+ table.newFastAppend().appendFile(FILE_B).commit();
+ long snapshotBId = table.currentSnapshot().snapshotId();
+ String branchName2 = "b2";
+ table.manageSnapshots().createBranch(branchName2, snapshotBId).commit();
+
+ table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
+ long snapshotCId = table.snapshot(branchName).snapshotId();
+
+ IncrementalAppendScan scan = newScan().fromSnapshotExclusive(snapshotAId, branchName);
+ Assert.assertEquals(1, Iterables.size(scan.planFiles()));
+
+ IncrementalAppendScan scanWithToSnapshot =
+ newScan().fromSnapshotExclusive(snapshotAId, branchName).toSnapshot(snapshotBId);
+ Assert.assertEquals(1, Iterables.size(scanWithToSnapshot.planFiles()));
+
+ AssertHelpers.assertThrows(
+ "Should throw exception",
+ NullPointerException.class,
Review Comment:
I think it would be good to always check against an expected error message,
so the test cannot pass on an unrelated exception of the same type.
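As a rough sketch of what checking the message adds, the self-contained example below verifies both the exception type and a fragment of its message. Everything here is hypothetical and independent of Iceberg: `assertThrowsWithMessage` is an illustrative helper rather than the project's `AssertHelpers`, and `findStartingSnapshot` with its `Map` lookup is a stand-in that reuses the `Cannot find the starting snapshot: %s` message from the diff.

```java
import java.util.Map;

// Hypothetical, self-contained sketch; the helper is not Iceberg's AssertHelpers.
public class ExpectedMessageSketch {

  // Runs the action and verifies both the exception type and that the
  // exception message contains the expected fragment.
  public static void assertThrowsWithMessage(
      Class<? extends Exception> expected, String containedInMessage, Runnable action) {
    try {
      action.run();
    } catch (Exception e) {
      if (!expected.isInstance(e)) {
        throw new AssertionError("Unexpected exception type: " + e.getClass().getName(), e);
      }
      String message = e.getMessage();
      if (message == null || !message.contains(containedInMessage)) {
        throw new AssertionError("Unexpected exception message: " + message, e);
      }
      return;
    }
    throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
  }

  // Stand-in for the starting-snapshot validation shown in the diff.
  public static String findStartingSnapshot(Map<Long, String> snapshots, long fromSnapshotId) {
    String snapshot = snapshots.get(fromSnapshotId);
    if (snapshot == null) {
      throw new IllegalArgumentException(
          String.format("Cannot find the starting snapshot: %s", fromSnapshotId));
    }
    return snapshot;
  }

  public static void main(String[] args) {
    Map<Long, String> snapshots = Map.of(1L, "snapshot-a");
    assertThrowsWithMessage(
        IllegalArgumentException.class,
        "Cannot find the starting snapshot: 42",
        () -> findStartingSnapshot(snapshots, 42L));
    System.out.println("ok");
  }
}
```

A check on the exception class alone would also pass if a `NullPointerException` came from an unrelated dereference, which is why the message fragment is part of the assertion.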
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]