This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 40de4bc7dc Core: Implement IncrementalChangelogScan without deletes 
(#5382)
40de4bc7dc is described below

commit 40de4bc7dc12c3e2c40d4fb687f9ee3342cb1727
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Aug 4 20:48:29 2022 -0700

    Core: Implement IncrementalChangelogScan without deletes (#5382)
---
 .palantir/revapi.yml                               |   6 +
 api/src/main/java/org/apache/iceberg/Scan.java     |  14 +
 .../main/java/org/apache/iceberg/TableScan.java    |  16 --
 .../apache/iceberg/BaseIncrementalAppendScan.java  | 120 +--------
 .../iceberg/BaseIncrementalChangelogScan.java      | 183 +++++++++++++
 .../org/apache/iceberg/BaseIncrementalScan.java    | 142 ++++++++++
 .../src/main/java/org/apache/iceberg/BaseScan.java |  57 ++++
 .../main/java/org/apache/iceberg/BaseTable.java    |   5 +
 .../java/org/apache/iceberg/BaseTableScan.java     |  24 --
 .../java/org/apache/iceberg/DataTableScan.java     |  31 +--
 .../apache/iceberg/IncrementalDataTableScan.java   |   5 +-
 .../java/org/apache/iceberg/PartitionsTable.java   |   5 +-
 .../java/org/apache/iceberg/util/SnapshotUtil.java |   9 +
 .../test/java/org/apache/iceberg/ScanTestBase.java |  43 +--
 .../java/org/apache/iceberg/TableTestBase.java     |  33 +++
 .../iceberg/TestBaseIncrementalAppendScan.java     |   3 +-
 .../iceberg/TestBaseIncrementalChangelogScan.java  | 290 +++++++++++++++++++++
 .../java/org/apache/iceberg/TestDataTableScan.java |   2 +-
 18 files changed, 778 insertions(+), 210 deletions(-)

diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index 79f1e51ba2..37e95468b4 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -117,3 +117,9 @@ acceptedBreaks:
         \ boolean)"
       justification: "IncrementalScanEvent should only be constructed by 
Iceberg code.\
         \ Hence the change of constructor params shouldn't affect users"
+    - code: "java.method.addedToInterface"
+      new: "method org.apache.iceberg.expressions.Expression 
org.apache.iceberg.Scan<ThisT, T extends org.apache.iceberg.ScanTask, G extends 
org.apache.iceberg.ScanTaskGroup<T extends 
org.apache.iceberg.ScanTask>>::filter()"
+      justification: "Move a method to the parent interface"
+    - code: "java.method.addedToInterface"
+      new: "method boolean org.apache.iceberg.Scan<ThisT, T extends 
org.apache.iceberg.ScanTask, G extends org.apache.iceberg.ScanTaskGroup<T 
extends org.apache.iceberg.ScanTask>>::isCaseSensitive()"
+      justification: "Move a method to the parent interface"
diff --git a/api/src/main/java/org/apache/iceberg/Scan.java 
b/api/src/main/java/org/apache/iceberg/Scan.java
index 118b9ce66b..ec18c162cf 100644
--- a/api/src/main/java/org/apache/iceberg/Scan.java
+++ b/api/src/main/java/org/apache/iceberg/Scan.java
@@ -59,6 +59,13 @@ public interface Scan<ThisT, T extends ScanTask, G extends 
ScanTaskGroup<T>> {
    */
   ThisT caseSensitive(boolean caseSensitive);
 
+  /**
+   * Returns whether this scan is case-sensitive with respect to column names.
+   *
+   * @return true if case-sensitive, false otherwise.
+   */
+  boolean isCaseSensitive();
+
   /**
    * Create a new scan from this that loads the column stats with each data 
file.
    *
@@ -86,6 +93,13 @@ public interface Scan<ThisT, T extends ScanTask, G extends 
ScanTaskGroup<T>> {
    */
   ThisT filter(Expression expr);
 
+  /**
+   * Returns this scan's filter {@link Expression}.
+   *
+   * @return this scan's filter expression
+   */
+  Expression filter();
+
   /**
    * Create a new scan from this that applies data filtering to files but not 
to rows in those
    * files.
diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java 
b/api/src/main/java/org/apache/iceberg/TableScan.java
index de0e76b8b1..56f7f11d3c 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg;
 
-import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 /** API for configuring a table scan. */
@@ -62,13 +61,6 @@ public interface TableScan extends Scan<TableScan, 
FileScanTask, CombinedScanTas
     return select(Lists.newArrayList(columns));
   }
 
-  /**
-   * Returns this scan's filter {@link Expression}.
-   *
-   * @return this scan's filter expression
-   */
-  Expression filter();
-
   /**
    * Create a new {@link TableScan} to read appended data from {@code 
fromSnapshotId} exclusive to
    * {@code toSnapshotId} inclusive.
@@ -103,12 +95,4 @@ public interface TableScan extends Scan<TableScan, 
FileScanTask, CombinedScanTas
    * @return the Snapshot this scan will use
    */
   Snapshot snapshot();
-
-  /**
-   * Returns whether this scan should apply column name case sensitiveness as 
per {@link
-   * Scan#caseSensitive(boolean)}.
-   *
-   * @return true if case sensitive, false otherwise.
-   */
-  boolean isCaseSensitive();
 }
diff --git 
a/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java 
b/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java
index d8386bd98e..cf3bc10610 100644
--- a/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java
@@ -20,10 +20,7 @@ package org.apache.iceberg;
 
 import java.util.List;
 import java.util.Set;
-import org.apache.iceberg.events.IncrementalScanEvent;
-import org.apache.iceberg.events.Listeners;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -32,7 +29,7 @@ import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
 
 class BaseIncrementalAppendScan
-    extends BaseScan<IncrementalAppendScan, FileScanTask, CombinedScanTask>
+    extends BaseIncrementalScan<IncrementalAppendScan, FileScanTask, 
CombinedScanTask>
     implements IncrementalAppendScan {
 
   BaseIncrementalAppendScan(TableOperations ops, Table table) {
@@ -51,67 +48,8 @@ class BaseIncrementalAppendScan
   }
 
   @Override
-  public IncrementalAppendScan fromSnapshotInclusive(long fromSnapshotId) {
-    Preconditions.checkArgument(
-        table().snapshot(fromSnapshotId) != null,
-        "Cannot find the starting snapshot: %s",
-        fromSnapshotId);
-    return newRefinedScan(
-        tableOps(), table(), schema(), 
context().fromSnapshotIdInclusive(fromSnapshotId));
-  }
-
-  @Override
-  public IncrementalAppendScan fromSnapshotExclusive(long fromSnapshotId) {
-    // for exclusive behavior, table().snapshot(fromSnapshotId) check can't be 
applied.
-    // as fromSnapshotId could be matched to a parent snapshot that is already 
expired
-    return newRefinedScan(
-        tableOps(), table(), schema(), 
context().fromSnapshotIdExclusive(fromSnapshotId));
-  }
-
-  @Override
-  public IncrementalAppendScan toSnapshot(long toSnapshotId) {
-    Preconditions.checkArgument(
-        table().snapshot(toSnapshotId) != null, "Cannot find end snapshot: 
%s", toSnapshotId);
-    return newRefinedScan(tableOps(), table(), schema(), 
context().toSnapshotId(toSnapshotId));
-  }
-
-  @Override
-  public CloseableIterable<FileScanTask> planFiles() {
-    Long fromSnapshotId = context().fromSnapshotId();
-    Long toSnapshotId = context().toSnapshotId();
-    if (fromSnapshotId == null && toSnapshotId == null && 
table().currentSnapshot() == null) {
-      // If it is an empty table (no current snapshot) and both from and to 
snapshots aren't set
-      // either,
-      // simply return an empty iterable. In this case, listener notification 
is also skipped.
-      return CloseableIterable.empty();
-    }
-
-    long toSnapshotIdInclusive = toSnapshotIdInclusive();
-    // fromSnapshotIdExclusive can be null. appendsBetween handles null 
fromSnapshotIdExclusive
-    // properly
-    // by finding the oldest ancestor of end snapshot.
-    Long fromSnapshotIdExclusive = fromSnapshotIdExclusive(fromSnapshotId, 
toSnapshotIdInclusive);
-    if (fromSnapshotIdExclusive != null) {
-      Listeners.notifyAll(
-          new IncrementalScanEvent(
-              table().name(),
-              fromSnapshotIdExclusive,
-              toSnapshotIdInclusive,
-              context().rowFilter(),
-              table().schema(),
-              false));
-    } else {
-      Snapshot oldestAncestorSnapshot =
-          SnapshotUtil.oldestAncestorOf(toSnapshotIdInclusive, 
table()::snapshot);
-      Listeners.notifyAll(
-          new IncrementalScanEvent(
-              table().name(),
-              oldestAncestorSnapshot.snapshotId(),
-              toSnapshotIdInclusive,
-              context().rowFilter(),
-              table().schema(),
-              true));
-    }
+  protected CloseableIterable<FileScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
 
     // appendsBetween handles null fromSnapshotId (exclusive) properly
     List<Snapshot> snapshots =
@@ -132,44 +70,6 @@ class BaseIncrementalAppendScan
         splitFiles, targetSplitSize(), splitLookback(), splitOpenFileCost());
   }
 
-  private Long fromSnapshotIdExclusive(Long fromSnapshotId, long 
toSnapshotIdInclusive) {
-    if (fromSnapshotId != null) {
-      if (context().fromSnapshotInclusive()) {
-        // validate the fromSnapshotId is an ancestor of toSnapshotId
-        Preconditions.checkArgument(
-            SnapshotUtil.isAncestorOf(table(), toSnapshotIdInclusive, 
fromSnapshotId),
-            "Starting snapshot (inclusive) %s is not an ancestor of end 
snapshot %s",
-            fromSnapshotId,
-            toSnapshotIdInclusive);
-        // for inclusive behavior fromSnapshotIdExclusive is set to the parent 
snapshot id, which
-        // can be null.
-        return table().snapshot(fromSnapshotId).parentId();
-      } else {
-        // validate the parent snapshot id an ancestor of toSnapshotId
-        Preconditions.checkArgument(
-            SnapshotUtil.isParentAncestorOf(table(), toSnapshotIdInclusive, 
fromSnapshotId),
-            "Starting snapshot (exclusive) %s is not a parent ancestor of end 
snapshot %s",
-            fromSnapshotId,
-            toSnapshotIdInclusive);
-        return fromSnapshotId;
-      }
-    } else {
-      return null;
-    }
-  }
-
-  private long toSnapshotIdInclusive() {
-    if (context().toSnapshotId() != null) {
-      return context().toSnapshotId();
-    } else {
-      Snapshot currentSnapshot = table().currentSnapshot();
-      Preconditions.checkArgument(
-          currentSnapshot != null,
-          "Invalid config: end snapshot is not set and table has no current 
snapshot");
-      return currentSnapshot.snapshotId();
-    }
-  }
-
   private CloseableIterable<FileScanTask> 
appendFilesFromSnapshots(List<Snapshot> snapshots) {
     Set<Long> snapshotIds = Sets.newHashSet(Iterables.transform(snapshots, 
Snapshot::snapshotId));
     Set<ManifestFile> manifests =
@@ -180,12 +80,9 @@ class BaseIncrementalAppendScan
 
     ManifestGroup manifestGroup =
         new ManifestGroup(tableOps().io(), manifests)
-            .caseSensitive(context().caseSensitive())
-            .select(
-                context().returnColumnStats()
-                    ? DataTableScan.SCAN_WITH_STATS_COLUMNS
-                    : DataTableScan.SCAN_COLUMNS)
-            .filterData(context().rowFilter())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
             .filterManifestEntries(
                 manifestEntry ->
                     snapshotIds.contains(manifestEntry.snapshotId())
@@ -197,9 +94,8 @@ class BaseIncrementalAppendScan
       manifestGroup = manifestGroup.ignoreResiduals();
     }
 
-    if (manifests.size() > 1
-        && (DataTableScan.PLAN_SCANS_WITH_WORKER_POOL || 
context().planWithCustomizedExecutor())) {
-      manifestGroup = manifestGroup.planWith(context().planExecutor());
+    if (manifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
     }
 
     return manifestGroup.planFiles();
diff --git 
a/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java 
b/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
new file mode 100644
index 0000000000..885cf591a5
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, 
ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext 
context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, 
TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, 
newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        orderedChangelogSnapshots(fromSnapshotIdExclusive, 
toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changelogSnapshots)
+            .transformAndConcat(snapshot -> 
snapshot.dataManifests(table().io()))
+            .filter(manifest -> 
changelogSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .filterManifestEntries(entry -> 
changelogSnapshotIds.contains(entry.snapshotId()))
+            .ignoreExisting();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup.plan(new 
CreateDataFileChangeTasks(changelogSnapshots));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  // builds a collection of changelog snapshots (oldest to newest)
+  // the order of the snapshots is important as it is used to determine change 
ordinals
+  private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long 
toIdIncl) {
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, 
fromIdExcl)) {
+      if (!snapshot.operation().equals(DataOperations.REPLACE)) {
+        if (snapshot.deleteManifests(table().io()).size() > 0) {
+          throw new UnsupportedOperationException(
+              "Delete files are currently not supported in changelog scans");
+        }
+
+        changelogSnapshots.addFirst(snapshot);
+      }
+    }
+
+    return changelogSnapshots;
+  }
+
+  private Set<Long> toSnapshotIds(Collection<Snapshot> snapshots) {
+    return 
snapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+  }
+
+  private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> 
snapshots) {
+    Map<Long, Integer> snapshotOrdinals = Maps.newHashMap();
+
+    int ordinal = 0;
+
+    for (Snapshot snapshot : snapshots) {
+      snapshotOrdinals.put(snapshot.snapshotId(), ordinal++);
+    }
+
+    return snapshotOrdinals;
+  }
+
+  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction<ChangelogScanTask> {
+    private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+
+    private final Map<Long, Integer> snapshotOrdinals;
+
+    CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
+      this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+    }
+
+    @Override
+    public CloseableIterable<ChangelogScanTask> apply(
+        CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext 
context) {
+
+      return CloseableIterable.transform(
+          entries,
+          entry -> {
+            long commitSnapshotId = entry.snapshotId();
+            int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
+            DataFile dataFile = entry.file().copy(context.shouldKeepStats());
+
+            switch (entry.status()) {
+              case ADDED:
+                return new BaseAddedRowsScanTask(
+                    changeOrdinal,
+                    commitSnapshotId,
+                    dataFile,
+                    NO_DELETES,
+                    context.schemaAsString(),
+                    context.specAsString(),
+                    context.residuals());
+
+              case DELETED:
+                return new BaseDeletedDataFileScanTask(
+                    changeOrdinal,
+                    commitSnapshotId,
+                    dataFile,
+                    NO_DELETES,
+                    context.schemaAsString(),
+                    context.specAsString(),
+                    context.residuals());
+
+              default:
+                throw new IllegalArgumentException("Unexpected entry status: " 
+ entry.status());
+            }
+          });
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java 
b/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java
new file mode 100644
index 0000000000..1f32bfe016
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.events.IncrementalScanEvent;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.SnapshotUtil;
+
+abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends 
ScanTaskGroup<T>>
+    extends BaseScan<ThisT, T, G> implements IncrementalScan<ThisT, T, G> {
+
+  protected BaseIncrementalScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext 
context) {
+    super(ops, table, schema, context);
+  }
+
+  protected abstract CloseableIterable<T> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
+
+  @Override
+  public ThisT fromSnapshotInclusive(long fromSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(fromSnapshotId) != null,
+        "Cannot find the starting snapshot: %s",
+        fromSnapshotId);
+    TableScanContext newContext = 
context().fromSnapshotIdInclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT fromSnapshotExclusive(long fromSnapshotId) {
+    // for exclusive behavior, table().snapshot(fromSnapshotId) check can't be 
applied
+    // as fromSnapshotId could be matched to a parent snapshot that is already 
expired
+    TableScanContext newContext = 
context().fromSnapshotIdExclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT toSnapshot(long toSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(toSnapshotId) != null, "Cannot find the end snapshot: 
%s", toSnapshotId);
+    TableScanContext newContext = context().toSnapshotId(toSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public CloseableIterable<T> planFiles() {
+    if (scanCurrentLineage() && table().currentSnapshot() == null) {
+      // If the table is empty (no current snapshot) and both from and to 
snapshots aren't set,
+      // simply return an empty iterable. In this case, the listener 
notification is also skipped.
+      return CloseableIterable.empty();
+    }
+
+    long toSnapshotIdInclusive = toSnapshotIdInclusive();
+    Long fromSnapshotIdExclusive = 
fromSnapshotIdExclusive(toSnapshotIdInclusive);
+
+    if (fromSnapshotIdExclusive != null) {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              fromSnapshotIdExclusive,
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              false /* from snapshot ID inclusive */));
+    } else {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              SnapshotUtil.oldestAncestorOf(table(), 
toSnapshotIdInclusive).snapshotId(),
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              true /* from snapshot ID inclusive */));
+    }
+
+    return doPlanFiles(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+  }
+
+  private boolean scanCurrentLineage() {
+    return context().fromSnapshotId() == null && context().toSnapshotId() == 
null;
+  }
+
+  private long toSnapshotIdInclusive() {
+    if (context().toSnapshotId() != null) {
+      return context().toSnapshotId();
+    } else {
+      Snapshot currentSnapshot = table().currentSnapshot();
+      Preconditions.checkArgument(
+          currentSnapshot != null, "End snapshot is not set and table has no 
current snapshot");
+      return currentSnapshot.snapshotId();
+    }
+  }
+
+  private Long fromSnapshotIdExclusive(long toSnapshotIdInclusive) {
+    Long fromSnapshotId = context().fromSnapshotId();
+    boolean fromSnapshotInclusive = context().fromSnapshotInclusive();
+
+    if (fromSnapshotId == null) {
+      return null;
+    } else {
+      if (fromSnapshotInclusive) {
+        // validate fromSnapshotId is an ancestor of toSnapshotIdInclusive
+        Preconditions.checkArgument(
+            SnapshotUtil.isAncestorOf(table(), toSnapshotIdInclusive, 
fromSnapshotId),
+            "Starting snapshot (inclusive) %s is not an ancestor of end 
snapshot %s",
+            fromSnapshotId,
+            toSnapshotIdInclusive);
+        // for inclusive behavior fromSnapshotIdExclusive is set to the parent 
snapshot ID,
+        // which can be null
+        return table().snapshot(fromSnapshotId).parentId();
+
+      } else {
+        // validate there is an ancestor of toSnapshotIdInclusive where parent 
is fromSnapshotId
+        Preconditions.checkArgument(
+            SnapshotUtil.isParentAncestorOf(table(), toSnapshotIdInclusive, 
fromSnapshotId),
+            "Starting snapshot (exclusive) %s is not a parent ancestor of end 
snapshot %s",
+            fromSnapshotId,
+            toSnapshotIdInclusive);
+        return fromSnapshotId;
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java 
b/core/src/main/java/org/apache/iceberg/BaseScan.java
index bdde1f680f..65501f4a8a 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -20,17 +20,48 @@ package org.apache.iceberg;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PropertyUtil;
 
 abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
     implements Scan<ThisT, T, G> {
+
+  private static final List<String> SCAN_COLUMNS =
+      ImmutableList.of(
+          "snapshot_id",
+          "file_path",
+          "file_ordinal",
+          "file_format",
+          "block_size_in_bytes",
+          "file_size_in_bytes",
+          "record_count",
+          "partition",
+          "key_metadata",
+          "split_offsets");
+
+  private static final List<String> STATS_COLUMNS =
+      ImmutableList.of(
+          "value_counts",
+          "null_value_counts",
+          "nan_value_counts",
+          "lower_bounds",
+          "upper_bounds",
+          "column_sizes");
+
+  private static final List<String> SCAN_WITH_STATS_COLUMNS =
+      
ImmutableList.<String>builder().addAll(SCAN_COLUMNS).addAll(STATS_COLUMNS).build();
+
+  private static final boolean PLAN_SCANS_WITH_WORKER_POOL =
+      SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, 
true);
+
   private final TableOperations ops;
   private final Table table;
   private final Schema schema;
@@ -59,6 +90,22 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends 
ScanTaskGroup<T>>
     return context;
   }
 
+  protected List<String> scanColumns() {
+    return context.returnColumnStats() ? SCAN_WITH_STATS_COLUMNS : 
SCAN_COLUMNS;
+  }
+
+  protected boolean shouldIgnoreResiduals() {
+    return context().ignoreResiduals();
+  }
+
+  protected boolean shouldPlanWithExecutor() {
+    return PLAN_SCANS_WITH_WORKER_POOL || 
context().planWithCustomizedExecutor();
+  }
+
+  protected ExecutorService planExecutor() {
+    return context().planExecutor();
+  }
+
   protected abstract ThisT newRefinedScan(
       TableOperations newOps, Table newTable, Schema newSchema, 
TableScanContext newContext);
 
@@ -77,6 +124,11 @@ abstract class BaseScan<ThisT, T extends ScanTask, G 
extends ScanTaskGroup<T>>
     return newRefinedScan(ops, table, schema, 
context.setCaseSensitive(caseSensitive));
   }
 
+  @Override
+  public boolean isCaseSensitive() {
+    return context().caseSensitive();
+  }
+
   @Override
   public ThisT includeColumnStats() {
     return newRefinedScan(ops, table, schema, 
context.shouldReturnColumnStats(true));
@@ -93,6 +145,11 @@ abstract class BaseScan<ThisT, T extends ScanTask, G 
extends ScanTaskGroup<T>>
         ops, table, schema, 
context.filterRows(Expressions.and(context.rowFilter(), expr)));
   }
 
+  @Override
+  public Expression filter() {
+    return context().rowFilter();
+  }
+
   @Override
   public ThisT ignoreResiduals() {
     return newRefinedScan(ops, table, schema, context.ignoreResiduals(true));
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java 
b/core/src/main/java/org/apache/iceberg/BaseTable.java
index 9605b07d8d..d79c46050f 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -79,6 +79,11 @@ public class BaseTable implements Table, HasTableOperations, 
Serializable {
         ops, this, schema(), new TableScanContext().reportWith(scanReporter));
   }
 
+  @Override
+  public IncrementalChangelogScan newIncrementalChangelogScan() {
+    return new BaseIncrementalChangelogScan(ops, this);
+  }
+
   @Override
   public Schema schema() {
     return ops.current().schema();
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java 
b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 5f48786f5d..2a33fea1e2 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -19,10 +19,8 @@
 package org.apache.iceberg;
 
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.events.Listeners;
 import org.apache.iceberg.events.ScanEvent;
-import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ExpressionUtil;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.metrics.DefaultMetricsContext;
@@ -55,18 +53,6 @@ abstract class BaseTableScan extends BaseScan<TableScan, 
FileScanTask, CombinedS
     return context().snapshotId();
   }
 
-  protected boolean colStats() {
-    return context().returnColumnStats();
-  }
-
-  protected boolean shouldIgnoreResiduals() {
-    return context().ignoreResiduals();
-  }
-
-  protected ExecutorService planExecutor() {
-    return context().planExecutor();
-  }
-
   protected Map<String, String> options() {
     return context().options();
   }
@@ -116,11 +102,6 @@ abstract class BaseTableScan extends BaseScan<TableScan, 
FileScanTask, CombinedS
     return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), 
timestampMillis));
   }
 
-  @Override
-  public Expression filter() {
-    return context().rowFilter();
-  }
-
   @Override
   public CloseableIterable<FileScanTask> planFiles() {
     Snapshot snapshot = snapshot();
@@ -171,11 +152,6 @@ abstract class BaseTableScan extends BaseScan<TableScan, 
FileScanTask, CombinedS
         : tableOps().current().currentSnapshot();
   }
 
-  @Override
-  public boolean isCaseSensitive() {
-    return context().caseSensitive();
-  }
-
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java 
b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index 678dd8884a..2d125ae79b 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -22,35 +22,9 @@ import java.util.List;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.util.SnapshotUtil;
 
 public class DataTableScan extends BaseTableScan {
-  static final ImmutableList<String> SCAN_COLUMNS =
-      ImmutableList.of(
-          "snapshot_id",
-          "file_path",
-          "file_ordinal",
-          "file_format",
-          "block_size_in_bytes",
-          "file_size_in_bytes",
-          "record_count",
-          "partition",
-          "key_metadata",
-          "split_offsets");
-  static final ImmutableList<String> SCAN_WITH_STATS_COLUMNS =
-      ImmutableList.<String>builder()
-          .addAll(SCAN_COLUMNS)
-          .add(
-              "value_counts",
-              "null_value_counts",
-              "nan_value_counts",
-              "lower_bounds",
-              "upper_bounds",
-              "column_sizes")
-          .build();
-  static final boolean PLAN_SCANS_WITH_WORKER_POOL =
-      SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, 
true);
 
   public DataTableScan(TableOperations ops, Table table) {
     super(ops, table, table.schema());
@@ -112,7 +86,7 @@ public class DataTableScan extends BaseTableScan {
     ManifestGroup manifestGroup =
         new ManifestGroup(io, dataManifests, deleteManifests)
             .caseSensitive(isCaseSensitive())
-            .select(colStats() ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
+            .select(scanColumns())
             .filterData(filter())
             .specsById(table().specs())
             .scanMetrics(scanMetrics())
@@ -122,8 +96,7 @@ public class DataTableScan extends BaseTableScan {
       manifestGroup = manifestGroup.ignoreResiduals();
     }
 
-    if (dataManifests.size() > 1
-        && (PLAN_SCANS_WITH_WORKER_POOL || 
context().planWithCustomizedExecutor())) {
+    if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {
       manifestGroup = manifestGroup.planWith(planExecutor());
     }
 
diff --git 
a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java 
b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
index 66bb42b0b4..270dfcf595 100644
--- a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
@@ -90,7 +90,7 @@ class IncrementalDataTableScan extends DataTableScan {
     ManifestGroup manifestGroup =
         new ManifestGroup(table().io(), manifests)
             .caseSensitive(isCaseSensitive())
-            .select(colStats() ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
+            .select(scanColumns())
             .filterData(filter())
             .filterManifestEntries(
                 manifestEntry ->
@@ -107,8 +107,7 @@ class IncrementalDataTableScan extends DataTableScan {
         new IncrementalScanEvent(
             table().name(), fromSnapshotId, toSnapshotId, filter(), schema(), 
false));
 
-    if (manifests.size() > 1
-        && (PLAN_SCANS_WITH_WORKER_POOL || 
context().planWithCustomizedExecutor())) {
+    if (manifests.size() > 1 && shouldPlanWithExecutor()) {
       manifestGroup = manifestGroup.planWith(planExecutor());
     }
 
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java 
b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 3723a54bc9..2a9d111c66 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -176,10 +176,7 @@ public class PartitionsTable extends BaseMetadataTable {
         new ManifestGroup(io, snapshot.dataManifests(io), 
snapshot.deleteManifests(io))
             .caseSensitive(caseSensitive)
             .filterManifests(m -> evalCache.get(m.partitionSpecId()).eval(m))
-            .select(
-                scan.colStats()
-                    ? DataTableScan.SCAN_WITH_STATS_COLUMNS
-                    : DataTableScan.SCAN_COLUMNS)
+            .select(scan.scanColumns())
             .specsById(scan.table().specs())
             .ignoreDeleted();
 
diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java 
b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
index 0c33ea878f..93880f97cb 100644
--- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -118,6 +118,10 @@ public class SnapshotUtil {
     return lastSnapshot;
   }
 
+  public static Snapshot oldestAncestorOf(Table table, long snapshotId) {
+    return oldestAncestorOf(snapshotId, table::snapshot);
+  }
+
   /**
    * Traverses the history and finds the oldest ancestor of the specified 
snapshot.
    *
@@ -199,6 +203,11 @@ public class SnapshotUtil {
     return toIds(ancestorsBetween(latestSnapshotId, oldestSnapshotId, lookup));
   }
 
+  public static Iterable<Snapshot> ancestorsBetween(
+      Table table, long latestSnapshotId, Long oldestSnapshotId) {
+    return ancestorsBetween(latestSnapshotId, oldestSnapshotId, 
table::snapshot);
+  }
+
   public static Iterable<Snapshot> ancestorsBetween(
       long latestSnapshotId, Long oldestSnapshotId, Function<Long, Snapshot> 
lookup) {
     if (oldestSnapshotId != null) {
diff --git a/core/src/test/java/org/apache/iceberg/ScanTestBase.java 
b/core/src/test/java/org/apache/iceberg/ScanTestBase.java
index 2b68b6b0ec..f33893d816 100644
--- a/core/src/test/java/org/apache/iceberg/ScanTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/ScanTestBase.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -37,8 +38,10 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
-public abstract class ScanTestBase<T extends Scan<T, FileScanTask, 
CombinedScanTask>>
+public abstract class ScanTestBase<
+        ScanT extends Scan<ScanT, T, G>, T extends ScanTask, G extends 
ScanTaskGroup<T>>
     extends TableTestBase {
+
   @Parameterized.Parameters(name = "formatVersion = {0}")
   public static Object[] parameters() {
     return new Object[] {1, 2};
@@ -48,11 +51,11 @@ public abstract class ScanTestBase<T extends Scan<T, 
FileScanTask, CombinedScanT
     super(formatVersion);
   }
 
-  protected abstract T newScan();
+  protected abstract ScanT newScan();
 
   @Test
   public void testTableScanHonorsSelect() {
-    T scan = newScan().select(Arrays.asList("id"));
+    ScanT scan = newScan().select(Arrays.asList("id"));
 
     Schema expectedSchema = new Schema(required(1, "id", 
Types.IntegerType.get()));
 
@@ -76,9 +79,9 @@ public abstract class ScanTestBase<T extends Scan<T, 
FileScanTask, CombinedScanT
 
   @Test
   public void testTableScanHonorsSelectWithoutCaseSensitivity() {
-    T scan1 = newScan().caseSensitive(false).select(Arrays.asList("ID"));
+    ScanT scan1 = newScan().caseSensitive(false).select(Arrays.asList("ID"));
     // order of refinements shouldn't matter
-    T scan2 = newScan().select(Arrays.asList("ID")).caseSensitive(false);
+    ScanT scan2 = newScan().select(Arrays.asList("ID")).caseSensitive(false);
 
     Schema expectedSchema = new Schema(required(1, "id", 
Types.IntegerType.get()));
 
@@ -97,26 +100,26 @@ public abstract class ScanTestBase<T extends Scan<T, 
FileScanTask, CombinedScanT
   public void testTableScanHonorsIgnoreResiduals() throws IOException {
     table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
 
-    T scan1 = newScan().filter(Expressions.equal("id", 5));
+    ScanT scan1 = newScan().filter(Expressions.equal("id", 5));
 
-    try (CloseableIterable<CombinedScanTask> tasks = scan1.planTasks()) {
-      Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 
0);
-      for (CombinedScanTask combinedScanTask : tasks) {
-        for (FileScanTask fileScanTask : combinedScanTask.files()) {
-          Assert.assertNotEquals(
-              "Residuals must be preserved", Expressions.alwaysTrue(), 
fileScanTask.residual());
+    try (CloseableIterable<G> groups = scan1.planTasks()) {
+      Assert.assertTrue("Tasks should not be empty", Iterables.size(groups) > 
0);
+      for (G group : groups) {
+        for (T task : group.tasks()) {
+          Expression residual = ((ContentScanTask<?>) task).residual();
+          Assert.assertNotEquals("Residuals must be preserved", 
Expressions.alwaysTrue(), residual);
         }
       }
     }
 
-    T scan2 = newScan().filter(Expressions.equal("id", 5)).ignoreResiduals();
+    ScanT scan2 = newScan().filter(Expressions.equal("id", 
5)).ignoreResiduals();
 
-    try (CloseableIterable<CombinedScanTask> tasks = scan2.planTasks()) {
-      Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 
0);
-      for (CombinedScanTask combinedScanTask : tasks) {
-        for (FileScanTask fileScanTask : combinedScanTask.files()) {
-          Assert.assertEquals(
-              "Residuals must be ignored", Expressions.alwaysTrue(), 
fileScanTask.residual());
+    try (CloseableIterable<G> groups = scan2.planTasks()) {
+      Assert.assertTrue("Tasks should not be empty", Iterables.size(groups) > 
0);
+      for (G group : groups) {
+        for (T task : group.tasks()) {
+          Expression residual = ((ContentScanTask<?>) task).residual();
+          Assert.assertEquals("Residuals must be ignored", 
Expressions.alwaysTrue(), residual);
         }
       }
     }
@@ -128,7 +131,7 @@ public abstract class ScanTestBase<T extends Scan<T, 
FileScanTask, CombinedScanT
     table.newFastAppend().appendFile(FILE_B).commit();
 
     AtomicInteger planThreadsIndex = new AtomicInteger(0);
-    T scan =
+    ScanT scan =
         newScan()
             .planWith(
                 Executors.newFixedThreadPool(
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java 
b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 44b6dd8395..53516c980f 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -22,6 +22,9 @@ import static 
org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -518,6 +521,31 @@ public class TableTestBase {
     return positionDelete.set(path, pos, row);
   }
 
+  protected void withUnavailableLocations(Iterable<String> locations, Action 
action) {
+    for (String location : locations) {
+      move(location, location + "_temp");
+    }
+
+    try {
+      action.invoke();
+    } finally {
+      for (String location : locations) {
+        move(location + "_temp", location);
+      }
+    }
+  }
+
+  private void move(String location, String newLocation) {
+    Path path = Paths.get(location);
+    Path tempPath = Paths.get(newLocation);
+
+    try {
+      java.nio.file.Files.move(path, tempPath);
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to move: " + location, e);
+    }
+  }
+
   static void validateManifestEntries(
       ManifestFile manifest,
       Iterator<Long> ids,
@@ -586,4 +614,9 @@ public class TableTestBase {
       }
     }
   }
+
+  @FunctionalInterface
+  protected interface Action {
+    void invoke();
+  }
 }
diff --git 
a/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java 
b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
index b22e03ef0b..00feaf80ab 100644
--- a/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
+++ b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
@@ -22,7 +22,8 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class TestBaseIncrementalAppendScan extends 
ScanTestBase<IncrementalAppendScan> {
+public class TestBaseIncrementalAppendScan
+    extends ScanTestBase<IncrementalAppendScan, FileScanTask, 
CombinedScanTask> {
   public TestBaseIncrementalAppendScan(int formatVersion) {
     super(formatVersion);
   }
diff --git 
a/core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java 
b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java
new file mode 100644
index 0000000000..1a1844345b
--- /dev/null
+++ 
b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ComparisonChain;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestBaseIncrementalChangelogScan
+    extends ScanTestBase<
+        IncrementalChangelogScan, ChangelogScanTask, 
ScanTaskGroup<ChangelogScanTask>> {
+
+  public TestBaseIncrementalChangelogScan(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newScan() {
+    return table.newIncrementalChangelogScan();
+  }
+
+  @Test
+  public void testDataFilters() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+    ManifestFile snap1DataManifest = 
Iterables.getOnlyElement(snap1.dataManifests(table.io()));
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    Assert.assertEquals("Must be 2 data manifests", 2, 
snap2.dataManifests(table.io()).size());
+
+    withUnavailableLocations(
+        ImmutableList.of(snap1DataManifest.path()),
+        () -> {
+          // bucket(k, 16) is 1 which is supposed to match only FILE_B
+          IncrementalChangelogScan scan = 
newScan().filter(Expressions.equal("data", "k"));
+
+          List<ChangelogScanTask> tasks = plan(scan);
+
+          Assert.assertEquals("Must have 1 task", 1, tasks.size());
+
+          AddedRowsScanTask t1 = (AddedRowsScanTask) 
Iterables.getOnlyElement(tasks);
+          Assert.assertEquals("Ordinal must match", 1, t1.changeOrdinal());
+          Assert.assertEquals("Snapshot must match", snap2.snapshotId(), 
t1.commitSnapshotId());
+          Assert.assertEquals("Data file must match", FILE_B.path(), 
t1.file().path());
+          Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+        });
+  }
+
+  @Test
+  public void testOverwrites() {
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    table.newOverwrite().addFile(FILE_A2).deleteFile(FILE_A).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    IncrementalChangelogScan scan =
+        
newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    Assert.assertEquals("Must have 2 tasks", 2, tasks.size());
+
+    AddedRowsScanTask t1 = (AddedRowsScanTask) tasks.get(0);
+    Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap2.snapshotId(), 
t1.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A2.path(), 
t1.file().path());
+    Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+
+    DeletedDataFileScanTask t2 = (DeletedDataFileScanTask) tasks.get(1);
+    Assert.assertEquals("Ordinal must match", 0, t2.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap2.snapshotId(), 
t2.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A.path(), 
t2.file().path());
+    Assert.assertTrue("Must be no deletes", t2.existingDeletes().isEmpty());
+  }
+
+  @Test
+  public void testFileDeletes() {
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    table.newDelete().deleteFile(FILE_A).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    IncrementalChangelogScan scan =
+        
newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    Assert.assertEquals("Must have 1 task", 1, tasks.size());
+
+    DeletedDataFileScanTask t1 = (DeletedDataFileScanTask) 
Iterables.getOnlyElement(tasks);
+    Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap2.snapshotId(), 
t1.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A.path(), 
t1.file().path());
+    Assert.assertTrue("Must be no deletes", t1.existingDeletes().isEmpty());
+  }
+
+  @Test
+  public void testExistingEntriesInNewDataManifestsAreIgnored() {
+    table
+        .updateProperties()
+        .set(MANIFEST_MIN_MERGE_COUNT, "1")
+        .set(MANIFEST_MERGE_ENABLED, "true")
+        .commit();
+
+    table.newAppend().appendFile(FILE_A).commit();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    table.newAppend().appendFile(FILE_C).commit();
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    ManifestFile manifest = 
Iterables.getOnlyElement(snap3.dataManifests(table.io()));
+    Assert.assertTrue("Manifest must have existing files", 
manifest.hasExistingFiles());
+
+    IncrementalChangelogScan scan =
+        
newScan().fromSnapshotInclusive(snap3.snapshotId()).toSnapshot(snap3.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    Assert.assertEquals("Must have 1 task", 1, tasks.size());
+
+    AddedRowsScanTask t1 = (AddedRowsScanTask) Iterables.getOnlyElement(tasks);
+    Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap3.snapshotId(), 
t1.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_C.path(), 
t1.file().path());
+    Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+  }
+
+  @Test
+  public void testManifestRewritesAreIgnored() throws IOException {
+    table.newAppend().appendFile(FILE_A).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    ManifestFile newManifest =
+        writeManifest(
+            "manifest-file.avro",
+            manifestEntry(ManifestEntry.Status.EXISTING, snap1.snapshotId(), 
FILE_A),
+            manifestEntry(ManifestEntry.Status.EXISTING, snap2.snapshotId(), 
FILE_B));
+
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+
+    for (ManifestFile manifest : snap2.dataManifests(table.io())) {
+      rewriteManifests.deleteManifest(manifest);
+    }
+
+    rewriteManifests.addManifest(newManifest);
+
+    rewriteManifests.commit();
+
+    table.newAppend().appendFile(FILE_C).commit();
+
+    Snapshot snap4 = table.currentSnapshot();
+
+    List<ChangelogScanTask> tasks = plan(newScan());
+
+    Assert.assertEquals("Must have 3 tasks", 3, tasks.size());
+
+    AddedRowsScanTask t1 = (AddedRowsScanTask) tasks.get(0);
+    Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap1.snapshotId(), 
t1.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A.path(), 
t1.file().path());
+    Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+
+    AddedRowsScanTask t2 = (AddedRowsScanTask) tasks.get(1);
+    Assert.assertEquals("Ordinal must match", 1, t2.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap2.snapshotId(), 
t2.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_B.path(), 
t2.file().path());
+    Assert.assertTrue("Must be no deletes", t2.deletes().isEmpty());
+
+    AddedRowsScanTask t3 = (AddedRowsScanTask) tasks.get(2);
+    Assert.assertEquals("Ordinal must match", 2, t3.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap4.snapshotId(), 
t3.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_C.path(), 
t3.file().path());
+    Assert.assertTrue("Must be no deletes", t3.deletes().isEmpty());
+  }
+
+  @Test
+  public void testDataFileRewrites() {
+    table.newAppend().appendFile(FILE_A).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), 
ImmutableSet.of(FILE_A2)).commit();
+
+    List<ChangelogScanTask> tasks = plan(newScan());
+
+    Assert.assertEquals("Must have 2 tasks", 2, tasks.size());
+
+    AddedRowsScanTask t1 = (AddedRowsScanTask) tasks.get(0);
+    Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap1.snapshotId(), 
t1.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A.path(), 
t1.file().path());
+    Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+
+    AddedRowsScanTask t2 = (AddedRowsScanTask) tasks.get(1);
+    Assert.assertEquals("Ordinal must match", 1, t2.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap2.snapshotId(), 
t2.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_B.path(), 
t2.file().path());
+    Assert.assertTrue("Must be no deletes", t2.deletes().isEmpty());
+  }
+
+  @Test
+  public void testDeleteFilesAreNotSupported() {
+    Assume.assumeTrue(formatVersion == 2);
+
+    table.newFastAppend().appendFile(FILE_A2).appendFile(FILE_B).commit();
+
+    table.newRowDelta().addDeletes(FILE_A2_DELETES).commit();
+
+    AssertHelpers.assertThrows(
+        "Should complain about delete files",
+        UnsupportedOperationException.class,
+        "Delete files are currently not supported",
+        () -> plan(newScan()));
+  }
+
+  // plans tasks and reorders them to have deterministic order
+  private List<ChangelogScanTask> plan(IncrementalChangelogScan scan) {
+    try (CloseableIterable<ChangelogScanTask> tasks = scan.planFiles()) {
+      List<ChangelogScanTask> tasksAsList = Lists.newArrayList(tasks);
+      tasksAsList.sort(taskComparator());
+      return tasksAsList;
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Comparator<? super ChangelogScanTask> taskComparator() {
+    return (t1, t2) ->
+        ComparisonChain.start()
+            .compare(t1.changeOrdinal(), t2.changeOrdinal())
+            .compare(t1.getClass().getName(), t2.getClass().getName())
+            .compare(path(t1), path(t2))
+            .result();
+  }
+
+  private String path(ChangelogScanTask task) {
+    return ((ContentScanTask<?>) task).file().path().toString();
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java 
b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
index a5e9f6f9ec..9f3946984c 100644
--- a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
+++ b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iceberg;
 
-public class TestDataTableScan extends ScanTestBase<TableScan> {
+public class TestDataTableScan extends ScanTestBase<TableScan, FileScanTask, 
CombinedScanTask> {
   public TestDataTableScan(int formatVersion) {
     super(formatVersion);
   }

Reply via email to