rdblue commented on a change in pull request #796:
URL: https://github.com/apache/iceberg/pull/796#discussion_r461149219



##########
File path: 
spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A mirco-batch based Spark Structured Streaming reader for Iceberg table. It 
will track the added
+ * files and generate tasks per batch to process newly added files. By default 
it will process
+ * all the newly added files to the current snapshot in each batch, user could 
also set this
+ * configuration "max-files-per-trigger" to control the number of files 
processed per batch.
+ */
+class StreamingReader extends Reader implements MicroBatchReader {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingReader.class);
+
+  private StreamingOffset startOffset;
+  private StreamingOffset endOffset;
+
+  private final Table table;
+  private final long maxSizePerBatch;
+  private final Long startSnapshotId;
+
+  // Used to cache the pending batches for this streaming batch interval.
+  private Pair<StreamingOffset, List<MicroBatch>> cachedPendingBatches;
+
+  StreamingReader(Table table, Broadcast<FileIO> io, 
Broadcast<EncryptionManager> encryptionManager,
+                  boolean caseSensitive, DataSourceOptions options) {
+    super(table, io, encryptionManager, caseSensitive, options);
+
+    this.table = table;
+    this.maxSizePerBatch = 
options.get("max-size-per-batch").map(Long::parseLong).orElse(Long.MAX_VALUE);
+    Preconditions.checkArgument(maxSizePerBatch > 0L,
+        "Option max-size-per-batch '%d' should > 0", maxSizePerBatch);
+
+    this.startSnapshotId = 
options.get("starting-snapshot-id").map(Long::parseLong).orElse(null);
+    if (startSnapshotId != null) {
+      if (!SnapshotUtil.ancestorOf(table, 
table.currentSnapshot().snapshotId(), startSnapshotId)) {
+        throw new IllegalStateException("The option starting-snapshot-id " + 
startSnapshotId +
+            " is not an ancestor of the current snapshot");
+      }
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+    table.refresh();
+
+    if (start.isPresent() && 
!StreamingOffset.START_OFFSET.equals(start.get())) {
+      this.startOffset = (StreamingOffset) start.get();
+      this.endOffset = (StreamingOffset) 
end.orElse(calculateEndOffset(startOffset));
+    } else {
+      // If starting offset is "START_OFFSET" (there's no snapshot in the last 
batch), or starting
+      // offset is not set, then we need to calculate the starting offset 
again.
+      this.startOffset = calculateStartingOffset();
+      this.endOffset = calculateEndOffset(startOffset);
+    }
+  }
+
+  @Override
+  public Offset getStartOffset() {
+    if (startOffset == null) {
+      throw new IllegalStateException("Start offset is not set");
+    }
+
+    return startOffset;
+  }
+
+  @Override
+  public Offset getEndOffset() {
+    if (endOffset == null) {
+      throw new IllegalStateException("End offset is not set");
+    }
+
+    return endOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+    // Since all the data and metadata of Iceberg is as it is, nothing needs 
to commit when
+    // offset is processed, so no need to implement this method.
+  }
+
+  @Override
+  public void stop() {}
+
+  @Override
+  public String toString() {
+    return String.format(
+        "IcebergStreamScan(table=%s, type=%s)", table, 
table.schema().asStruct());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected List<CombinedScanTask> tasks() {
+    if (startOffset.equals(endOffset)) {
+      LOG.info("Start offset {} equals to end offset {}, no data to process", 
startOffset, endOffset);
+      return Collections.emptyList();
+    }
+
+    Preconditions.checkState(cachedPendingBatches != null,
+        "pendingBatches is null, which is unexpected as it will be set when 
calculating end offset");
+
+    List<MicroBatch> pendingBatches = cachedPendingBatches.second();
+    if (pendingBatches.isEmpty()) {
+      LOG.info("There's no task to process in this batch");
+      return Collections.emptyList();
+    }
+
+    MicroBatch lastBatch = pendingBatches.get(pendingBatches.size() - 1);
+    if (lastBatch.snapshotId() != endOffset.snapshotId() || 
lastBatch.endFileIndex() != endOffset.index()) {
+      throw new IllegalStateException("The cached pendingBatches doesn't match 
the current end offset " + endOffset);
+    }
+
+    LOG.info("Processing data from {} to {}", startOffset, endOffset);
+    List<FileScanTask> tasks = pendingBatches.stream()
+        .flatMap(batch -> batch.tasks().stream())
+        .collect(Collectors.toList());
+    CloseableIterable<FileScanTask> splitTasks = 
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks),
+        splitSize());
+    return Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize(), splitLookback(), 
splitOpenFileCost()));
+  }
+
+  private StreamingOffset calculateStartingOffset() {
+    StreamingOffset startingOffset;
+    if (startSnapshotId != null) {
+      startingOffset = new StreamingOffset(startSnapshotId, 0, true, false);
+    } else {
+      List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+      if (snapshotIds.isEmpty()) {
+        // there's no snapshot currently.
+        startingOffset = StreamingOffset.START_OFFSET;
+      } else {
+        startingOffset = new 
StreamingOffset(snapshotIds.get(snapshotIds.size() - 1), 0, true, false);
+      }
+    }
+
+    return startingOffset;
+  }
+
+  private StreamingOffset calculateEndOffset(StreamingOffset start) {
+    if (start.equals(StreamingOffset.START_OFFSET)) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // Spark will invoke setOffsetRange more than once. If this is already 
calulated, use the cached one to avoid
+    // calculating again.
+    if (cachedPendingBatches == null || 
!cachedPendingBatches.first().equals(start)) {
+      this.cachedPendingBatches = Pair.of(start, 
getChangesWithRateLimit(start.snapshotId(), start.index(),
+          start.isStartingSnapshotId(), start.isLastIndexOfSnapshot(), 
maxSizePerBatch));
+    }
+
+    List<MicroBatch> batches = cachedPendingBatches.second();
+    MicroBatch lastBatch = batches.isEmpty() ? null : 
batches.get(batches.size() - 1);
+
+    if (lastBatch == null) {
+      return start;
+    } else {
+      boolean isStarting = lastBatch.snapshotId() == start.snapshotId() && 
start.isStartingSnapshotId();
+      return new StreamingOffset(lastBatch.snapshotId(), 
lastBatch.endFileIndex(), isStarting,
+          lastBatch.lastIndexOfSnapshot());
+    }
+  }
+
+  @VisibleForTesting
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  List<MicroBatch> getChangesWithRateLimit(long snapshotId, int index, boolean 
isStarting,
+                                           boolean isLastIndexOfSnapshot, long 
maxSize) {
+    List<MicroBatch> batches = Lists.newArrayList();
+    long currentLeftSize = maxSize;
+    MicroBatch lastBatch = null;
+
+    if (!isLastIndexOfSnapshot && (isStarting || 
isValidSnapshot(table.snapshot(snapshotId)))) {
+      MicroBatch batch = MicroBatches.from(table.snapshot(snapshotId), 
table.io())
+          .caseSensitive(super.caseSensitive())
+          .specsById(table.specs())
+          .generate(index, currentLeftSize, isStarting);
+
+      batches.add(batch);
+      currentLeftSize -= batch.sizeInBytes();
+      lastBatch = batch;
+    }
+
+    // Current snapshot can already satisfy the size needs.
+    // Or DataFile size cannot satisfy the current needs of max size. For 
example: the request left size is 100, but the
+    // file size is 200, then file cannot add to this batch.
+    if (currentLeftSize <= 0L || (lastBatch != null && 
!lastBatch.lastIndexOfSnapshot())) {
+      return batches;
+    }
+
+    long currentSnapshotId = table.currentSnapshot().snapshotId();
+    if (currentSnapshotId == snapshotId) {
+      // the snapshot of current offset is already the latest snapshot of this 
table.
+      return batches;
+    }
+
+    ImmutableList<Long> snapshotIds = ImmutableList.<Long>builder()
+        .addAll(SnapshotUtil.snapshotIdsBetween(table, snapshotId, 
currentSnapshotId))
+        .build()
+        .reverse();
+
+    for (Long id : snapshotIds) {
+      Snapshot snapshot = table.snapshot(id);
+      if (!isValidSnapshot(snapshot)) {
+        continue;
+      }
+
+      int startIndex = lastBatch == null || lastBatch.lastIndexOfSnapshot() ? 
0 : lastBatch.endFileIndex();
+      MicroBatch batch = MicroBatches.from(table.snapshot(id), table.io())
+          .caseSensitive(super.caseSensitive())
+          .specsById(table.specs())
+          .generate(startIndex, currentLeftSize, false);
+
+      batches.add(batch);
+      currentLeftSize -= batch.sizeInBytes();
+
+      // If the current request size is already satisfied, or none of the 
DataFile size can satisfy the current left
+      // size, break the current loop.
+      if (currentLeftSize <= 0L || !batch.lastIndexOfSnapshot()) {
+        break;
+      }
+
+      lastBatch = batch;
+    }
+
+    return batches;
+  }
+
+  private static boolean isValidSnapshot(Snapshot snapshot) {

Review comment:
       I think this method does too much. Looks like it is used to skip 
non-append commits, but also to fail the read when an overwrite commit is 
encountered. The exception is surprising behavior given the method contract.
   
   I think this should be divided into a validation that throws an exception if 
there is an overwrite, and a different method to skip rewrite commits.

##########
File path: 
spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestStructuredStreamingRead {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead.spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.sql.shuffle.partitions", 4)
+        .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead.spark;
+    TestStructuredStreamingRead.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChanges() throws IOException {
+    File parent = temp.newFolder("test");
+    File location = new File(parent, "table");
+    File checkpoint = new File(parent, "checkpoint");
+    Table table = createTable(location.toString());
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", location.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    IcebergSource source = new IcebergSource();
+
+    StreamingReader streamingReader = (StreamingReader) 
source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+    long initialSnapshotId = snapshotIds.get(0);
+
+    // Getting all appends from initial snapshot.
+    List<MicroBatch> pendingBatches = streamingReader.getChangesWithRateLimit(
+        initialSnapshotId, 0, true, false, Long.MAX_VALUE);
+    Assert.assertEquals(pendingBatches.size(), 4);
+
+    List<Long> batchSnapshotIds = pendingBatches.stream()
+        .map(MicroBatch::snapshotId)
+        .collect(Collectors.toList());
+    Assert.assertEquals(batchSnapshotIds, snapshotIds);

Review comment:
       Assertions like this one should include context to tell the reader what 
is being verified. In this case, it looks like a batch for each of the table's 
snapshot IDs should be present, right?

##########
File path: 
spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestStructuredStreamingRead {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead.spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.sql.shuffle.partitions", 4)
+        .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead.spark;
+    TestStructuredStreamingRead.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChanges() throws IOException {
+    File parent = temp.newFolder("test");
+    File location = new File(parent, "table");
+    File checkpoint = new File(parent, "checkpoint");
+    Table table = createTable(location.toString());

Review comment:
       Why isn't table creation done in a `@BeforeClass` method? It seems odd 
to create it every time using a method call like this.

##########
File path: 
spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestStructuredStreamingRead {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead.spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.sql.shuffle.partitions", 4)
+        .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead.spark;
+    TestStructuredStreamingRead.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChanges() throws IOException {
+    File parent = temp.newFolder("test");
+    File location = new File(parent, "table");
+    File checkpoint = new File(parent, "checkpoint");
+    Table table = createTable(location.toString());
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", location.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    IcebergSource source = new IcebergSource();
+
+    StreamingReader streamingReader = (StreamingReader) 
source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+    long initialSnapshotId = snapshotIds.get(0);
+
+    // Getting all appends from initial snapshot.
+    List<MicroBatch> pendingBatches = streamingReader.getChangesWithRateLimit(
+        initialSnapshotId, 0, true, false, Long.MAX_VALUE);
+    Assert.assertEquals(pendingBatches.size(), 4);

Review comment:
       Expected values should be the first argument. Otherwise, failures result 
in confusing error messages because the "expected" value is wrong.

##########
File path: 
spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A mirco-batch based Spark Structured Streaming reader for Iceberg table. It 
will track the added
+ * files and generate tasks per batch to process newly added files. By default 
it will process
+ * all the newly added files to the current snapshot in each batch, user could 
also set this
+ * configuration "max-files-per-trigger" to control the number of files 
processed per batch.
+ */
+class StreamingReader extends Reader implements MicroBatchReader {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingReader.class);
+
+  private StreamingOffset startOffset;
+  private StreamingOffset endOffset;
+
+  private final Table table;
+  private final long maxSizePerBatch;
+  private final Long startSnapshotId;
+
+  // Used to cache the pending batches for this streaming batch interval.
+  private Pair<StreamingOffset, List<MicroBatch>> cachedPendingBatches;
+
+  StreamingReader(Table table, Broadcast<FileIO> io, 
Broadcast<EncryptionManager> encryptionManager,
+                  boolean caseSensitive, DataSourceOptions options) {
+    super(table, io, encryptionManager, caseSensitive, options);
+
+    this.table = table;
+    this.maxSizePerBatch = 
options.get("max-size-per-batch").map(Long::parseLong).orElse(Long.MAX_VALUE);
+    Preconditions.checkArgument(maxSizePerBatch > 0L,
+        "Option max-size-per-batch '%d' should > 0", maxSizePerBatch);
+
+    this.startSnapshotId = 
options.get("starting-snapshot-id").map(Long::parseLong).orElse(null);
+    if (startSnapshotId != null) {
+      if (!SnapshotUtil.ancestorOf(table, 
table.currentSnapshot().snapshotId(), startSnapshotId)) {
+        throw new IllegalStateException("The option starting-snapshot-id " + 
startSnapshotId +
+            " is not an ancestor of the current snapshot");
+      }
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+    table.refresh();
+
+    if (start.isPresent() && 
!StreamingOffset.START_OFFSET.equals(start.get())) {
+      this.startOffset = (StreamingOffset) start.get();
+      this.endOffset = (StreamingOffset) 
end.orElse(calculateEndOffset(startOffset));
+    } else {
+      // If starting offset is "START_OFFSET" (there's no snapshot in the last 
batch), or starting
+      // offset is not set, then we need to calculate the starting offset 
again.
+      this.startOffset = calculateStartingOffset();
+      this.endOffset = calculateEndOffset(startOffset);
+    }
+  }
+
+  @Override
+  public Offset getStartOffset() {
+    if (startOffset == null) {
+      throw new IllegalStateException("Start offset is not set");
+    }
+
+    return startOffset;
+  }
+
+  @Override
+  public Offset getEndOffset() {
+    if (endOffset == null) {
+      throw new IllegalStateException("End offset is not set");
+    }
+
+    return endOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+    // Since all the data and metadata of Iceberg is as it is, nothing needs 
to commit when
+    // offset is processed, so no need to implement this method.
+  }
+
+  @Override
+  public void stop() {}
+
+  @Override
+  public String toString() {
+    return String.format(
+        "IcebergStreamScan(table=%s, type=%s)", table, 
table.schema().asStruct());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected List<CombinedScanTask> tasks() {
+    if (startOffset.equals(endOffset)) {
+      LOG.info("Start offset {} equals to end offset {}, no data to process", 
startOffset, endOffset);
+      return Collections.emptyList();
+    }
+
+    Preconditions.checkState(cachedPendingBatches != null,
+        "pendingBatches is null, which is unexpected as it will be set when 
calculating end offset");
+
+    List<MicroBatch> pendingBatches = cachedPendingBatches.second();
+    if (pendingBatches.isEmpty()) {
+      LOG.info("There's no task to process in this batch");
+      return Collections.emptyList();
+    }
+
+    MicroBatch lastBatch = pendingBatches.get(pendingBatches.size() - 1);
+    if (lastBatch.snapshotId() != endOffset.snapshotId() || 
lastBatch.endFileIndex() != endOffset.index()) {
+      throw new IllegalStateException("The cached pendingBatches doesn't match 
the current end offset " + endOffset);
+    }
+
+    LOG.info("Processing data from {} to {}", startOffset, endOffset);
+    List<FileScanTask> tasks = pendingBatches.stream()
+        .flatMap(batch -> batch.tasks().stream())
+        .collect(Collectors.toList());
+    CloseableIterable<FileScanTask> splitTasks = 
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks),
+        splitSize());
+    return Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize(), splitLookback(), 
splitOpenFileCost()));
+  }
+
+  private StreamingOffset calculateStartingOffset() {
+    StreamingOffset startingOffset;
+    if (startSnapshotId != null) {
+      startingOffset = new StreamingOffset(startSnapshotId, 0, true, false);
+    } else {
+      List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+      if (snapshotIds.isEmpty()) {
+        // there's no snapshot currently.
+        startingOffset = StreamingOffset.START_OFFSET;
+      } else {
+        startingOffset = new 
StreamingOffset(snapshotIds.get(snapshotIds.size() - 1), 0, true, false);
+      }
+    }
+
+    return startingOffset;
+  }
+
+  private StreamingOffset calculateEndOffset(StreamingOffset start) {
+    if (start.equals(StreamingOffset.START_OFFSET)) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // Spark will invoke setOffsetRange more than once. If this is already 
calulated, use the cached one to avoid
+    // calculating again.
+    if (cachedPendingBatches == null || 
!cachedPendingBatches.first().equals(start)) {
+      this.cachedPendingBatches = Pair.of(start, 
getChangesWithRateLimit(start.snapshotId(), start.index(),
+          start.isStartingSnapshotId(), start.isLastIndexOfSnapshot(), 
maxSizePerBatch));
+    }
+
+    List<MicroBatch> batches = cachedPendingBatches.second();
+    MicroBatch lastBatch = batches.isEmpty() ? null : 
batches.get(batches.size() - 1);
+
+    if (lastBatch == null) {
+      return start;
+    } else {
+      boolean isStarting = lastBatch.snapshotId() == start.snapshotId() && 
start.isStartingSnapshotId();
+      return new StreamingOffset(lastBatch.snapshotId(), 
lastBatch.endFileIndex(), isStarting,
+          lastBatch.lastIndexOfSnapshot());
+    }
+  }
+
+  @VisibleForTesting
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  List<MicroBatch> getChangesWithRateLimit(long snapshotId, int index, boolean 
isStarting,
+                                           boolean isLastIndexOfSnapshot, long 
maxSize) {
+    List<MicroBatch> batches = Lists.newArrayList();
+    long currentLeftSize = maxSize;
+    MicroBatch lastBatch = null;
+
+    if (!isLastIndexOfSnapshot && (isStarting || 
isValidSnapshot(table.snapshot(snapshotId)))) {

Review comment:
       It is confusing that `isStarting` overrides `isValidSnapshot` here. I 
think a better name for `isValidSnapshot`, like `isAppend`, would help.

##########
File path: 
spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A micro-batch based Spark Structured Streaming reader for Iceberg table. It 
will track the added
+ * files and generate tasks per batch to process newly added files. By default 
it will process
+ * all the newly added files to the current snapshot in each batch, user could 
also set this
+ * configuration "max-files-per-trigger" to control the number of files 
processed per batch.
+ */
+class StreamingReader extends Reader implements MicroBatchReader {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingReader.class);
+
+  private StreamingOffset startOffset;
+  private StreamingOffset endOffset;
+
+  private final Table table;
+  private final long maxSizePerBatch;
+  private final Long startSnapshotId;
+
+  // Used to cache the pending batches for this streaming batch interval.
+  private Pair<StreamingOffset, List<MicroBatch>> cachedPendingBatches;
+
+  StreamingReader(Table table, Broadcast<FileIO> io, 
Broadcast<EncryptionManager> encryptionManager,
+                  boolean caseSensitive, DataSourceOptions options) {
+    super(table, io, encryptionManager, caseSensitive, options);
+
+    this.table = table;
+    this.maxSizePerBatch = 
options.get("max-size-per-batch").map(Long::parseLong).orElse(Long.MAX_VALUE);
+    Preconditions.checkArgument(maxSizePerBatch > 0L,
+        "Option max-size-per-batch '%d' should > 0", maxSizePerBatch);
+
+    this.startSnapshotId = 
options.get("starting-snapshot-id").map(Long::parseLong).orElse(null);
+    if (startSnapshotId != null) {
+      if (!SnapshotUtil.ancestorOf(table, 
table.currentSnapshot().snapshotId(), startSnapshotId)) {
+        throw new IllegalStateException("The option starting-snapshot-id " + 
startSnapshotId +
+            " is not an ancestor of the current snapshot");
+      }
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+    table.refresh();
+
+    if (start.isPresent() && 
!StreamingOffset.START_OFFSET.equals(start.get())) {
+      this.startOffset = (StreamingOffset) start.get();
+      this.endOffset = (StreamingOffset) 
end.orElse(calculateEndOffset(startOffset));
+    } else {
+      // If starting offset is "START_OFFSET" (there's no snapshot in the last 
batch), or starting
+      // offset is not set, then we need to calculate the starting offset 
again.
+      this.startOffset = calculateStartingOffset();
+      this.endOffset = calculateEndOffset(startOffset);
+    }
+  }
+
+  @Override
+  public Offset getStartOffset() {
+    if (startOffset == null) {
+      throw new IllegalStateException("Start offset is not set");
+    }
+
+    return startOffset;
+  }
+
+  @Override
+  public Offset getEndOffset() {
+    if (endOffset == null) {
+      throw new IllegalStateException("End offset is not set");
+    }
+
+    return endOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+    // Since all the data and metadata of Iceberg is as it is, nothing needs 
to commit when
+    // offset is processed, so no need to implement this method.
+  }
+
+  @Override
+  public void stop() {}
+
+  @Override
+  public String toString() {
+    return String.format(
+        "IcebergStreamScan(table=%s, type=%s)", table, 
table.schema().asStruct());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected List<CombinedScanTask> tasks() {
+    if (startOffset.equals(endOffset)) {
+      LOG.info("Start offset {} equals to end offset {}, no data to process", 
startOffset, endOffset);
+      return Collections.emptyList();
+    }
+
+    Preconditions.checkState(cachedPendingBatches != null,
+        "pendingBatches is null, which is unexpected as it will be set when 
calculating end offset");
+
+    List<MicroBatch> pendingBatches = cachedPendingBatches.second();
+    if (pendingBatches.isEmpty()) {
+      LOG.info("There's no task to process in this batch");
+      return Collections.emptyList();
+    }
+
+    MicroBatch lastBatch = pendingBatches.get(pendingBatches.size() - 1);
+    if (lastBatch.snapshotId() != endOffset.snapshotId() || 
lastBatch.endFileIndex() != endOffset.index()) {
+      throw new IllegalStateException("The cached pendingBatches doesn't match 
the current end offset " + endOffset);
+    }
+
+    LOG.info("Processing data from {} to {}", startOffset, endOffset);
+    List<FileScanTask> tasks = pendingBatches.stream()
+        .flatMap(batch -> batch.tasks().stream())
+        .collect(Collectors.toList());
+    CloseableIterable<FileScanTask> splitTasks = 
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks),
+        splitSize());
+    return Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize(), splitLookback(), 
splitOpenFileCost()));
+  }
+
+  private StreamingOffset calculateStartingOffset() {
+    StreamingOffset startingOffset;
+    if (startSnapshotId != null) {
+      startingOffset = new StreamingOffset(startSnapshotId, 0, true, false);
+    } else {
+      List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+      if (snapshotIds.isEmpty()) {
+        // there's no snapshot currently.
+        startingOffset = StreamingOffset.START_OFFSET;
+      } else {
+        startingOffset = new 
StreamingOffset(snapshotIds.get(snapshotIds.size() - 1), 0, true, false);
+      }
+    }
+
+    return startingOffset;
+  }
+
+  private StreamingOffset calculateEndOffset(StreamingOffset start) {
+    if (start.equals(StreamingOffset.START_OFFSET)) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // Spark will invoke setOffsetRange more than once. If this is already 
calculated, use the cached one to avoid
+    // calculating again.
+    if (cachedPendingBatches == null || 
!cachedPendingBatches.first().equals(start)) {
+      this.cachedPendingBatches = Pair.of(start, 
getChangesWithRateLimit(start.snapshotId(), start.index(),
+          start.isStartingSnapshotId(), start.isLastIndexOfSnapshot(), 
maxSizePerBatch));
+    }
+
+    List<MicroBatch> batches = cachedPendingBatches.second();
+    MicroBatch lastBatch = batches.isEmpty() ? null : 
batches.get(batches.size() - 1);
+
+    if (lastBatch == null) {
+      return start;
+    } else {
+      boolean isStarting = lastBatch.snapshotId() == start.snapshotId() && 
start.isStartingSnapshotId();
+      return new StreamingOffset(lastBatch.snapshotId(), 
lastBatch.endFileIndex(), isStarting,
+          lastBatch.lastIndexOfSnapshot());
+    }
+  }
+
+  @VisibleForTesting
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  List<MicroBatch> getChangesWithRateLimit(long snapshotId, int index, boolean 
isStarting,
+                                           boolean isLastIndexOfSnapshot, long 
maxSize) {
+    List<MicroBatch> batches = Lists.newArrayList();
+    long currentLeftSize = maxSize;
+    MicroBatch lastBatch = null;
+
+    if (!isLastIndexOfSnapshot && (isStarting || 
isValidSnapshot(table.snapshot(snapshotId)))) {
+      MicroBatch batch = MicroBatches.from(table.snapshot(snapshotId), 
table.io())
+          .caseSensitive(super.caseSensitive())
+          .specsById(table.specs())
+          .generate(index, currentLeftSize, isStarting);
+
+      batches.add(batch);
+      currentLeftSize -= batch.sizeInBytes();
+      lastBatch = batch;
+    }
+
+    // Current snapshot can already satisfy the size needs.
+    // Or DataFile size cannot satisfy the current needs of max size. For 
example: the request left size is 100, but the
+    // file size is 200, then file cannot add to this batch.
+    if (currentLeftSize <= 0L || (lastBatch != null && 
!lastBatch.lastIndexOfSnapshot())) {
+      return batches;
+    }
+
+    long currentSnapshotId = table.currentSnapshot().snapshotId();
+    if (currentSnapshotId == snapshotId) {
+      // the snapshot of current offset is already the latest snapshot of this 
table.
+      return batches;
+    }

Review comment:
       I don't think that this case is needed. If the two are equal, then 
`SnapshotUtil.snapshotIdsBetween` should return an empty list, in which case 
there are no runs through the loop and the batches are returned.

##########
File path: 
spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A micro-batch based Spark Structured Streaming reader for Iceberg table. It 
will track the added
+ * files and generate tasks per batch to process newly added files. By default 
it will process
+ * all the newly added files to the current snapshot in each batch, user could 
also set this
+ * configuration "max-files-per-trigger" to control the number of files 
processed per batch.
+ */
+class StreamingReader extends Reader implements MicroBatchReader {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingReader.class);
+
+  private StreamingOffset startOffset;
+  private StreamingOffset endOffset;
+
+  private final Table table;
+  private final long maxSizePerBatch;
+  private final Long startSnapshotId;
+
+  // Used to cache the pending batches for this streaming batch interval.
+  private Pair<StreamingOffset, List<MicroBatch>> cachedPendingBatches;
+
+  StreamingReader(Table table, Broadcast<FileIO> io, 
Broadcast<EncryptionManager> encryptionManager,
+                  boolean caseSensitive, DataSourceOptions options) {
+    super(table, io, encryptionManager, caseSensitive, options);
+
+    this.table = table;
+    this.maxSizePerBatch = 
options.get("max-size-per-batch").map(Long::parseLong).orElse(Long.MAX_VALUE);
+    Preconditions.checkArgument(maxSizePerBatch > 0L,
+        "Option max-size-per-batch '%d' should > 0", maxSizePerBatch);
+
+    this.startSnapshotId = 
options.get("starting-snapshot-id").map(Long::parseLong).orElse(null);
+    if (startSnapshotId != null) {
+      if (!SnapshotUtil.ancestorOf(table, 
table.currentSnapshot().snapshotId(), startSnapshotId)) {
+        throw new IllegalStateException("The option starting-snapshot-id " + 
startSnapshotId +
+            " is not an ancestor of the current snapshot");
+      }
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+    table.refresh();
+
+    if (start.isPresent() && 
!StreamingOffset.START_OFFSET.equals(start.get())) {
+      this.startOffset = (StreamingOffset) start.get();
+      this.endOffset = (StreamingOffset) 
end.orElse(calculateEndOffset(startOffset));
+    } else {
+      // If starting offset is "START_OFFSET" (there's no snapshot in the last 
batch), or starting
+      // offset is not set, then we need to calculate the starting offset 
again.
+      this.startOffset = calculateStartingOffset();
+      this.endOffset = calculateEndOffset(startOffset);
+    }
+  }
+
+  @Override
+  public Offset getStartOffset() {
+    if (startOffset == null) {
+      throw new IllegalStateException("Start offset is not set");
+    }
+
+    return startOffset;
+  }
+
+  @Override
+  public Offset getEndOffset() {
+    if (endOffset == null) {
+      throw new IllegalStateException("End offset is not set");
+    }
+
+    return endOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+    // Since all the data and metadata of Iceberg is as it is, nothing needs 
to commit when
+    // offset is processed, so no need to implement this method.
+  }
+
+  @Override
+  public void stop() {}
+
+  @Override
+  public String toString() {
+    return String.format(
+        "IcebergStreamScan(table=%s, type=%s)", table, 
table.schema().asStruct());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected List<CombinedScanTask> tasks() {
+    if (startOffset.equals(endOffset)) {
+      LOG.info("Start offset {} equals to end offset {}, no data to process", 
startOffset, endOffset);
+      return Collections.emptyList();
+    }
+
+    Preconditions.checkState(cachedPendingBatches != null,
+        "pendingBatches is null, which is unexpected as it will be set when 
calculating end offset");
+
+    List<MicroBatch> pendingBatches = cachedPendingBatches.second();
+    if (pendingBatches.isEmpty()) {
+      LOG.info("There's no task to process in this batch");
+      return Collections.emptyList();
+    }
+
+    MicroBatch lastBatch = pendingBatches.get(pendingBatches.size() - 1);
+    if (lastBatch.snapshotId() != endOffset.snapshotId() || 
lastBatch.endFileIndex() != endOffset.index()) {
+      throw new IllegalStateException("The cached pendingBatches doesn't match 
the current end offset " + endOffset);
+    }
+
+    LOG.info("Processing data from {} to {}", startOffset, endOffset);
+    List<FileScanTask> tasks = pendingBatches.stream()
+        .flatMap(batch -> batch.tasks().stream())
+        .collect(Collectors.toList());
+    CloseableIterable<FileScanTask> splitTasks = 
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks),
+        splitSize());
+    return Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize(), splitLookback(), 
splitOpenFileCost()));
+  }
+
+  private StreamingOffset calculateStartingOffset() {
+    StreamingOffset startingOffset;
+    if (startSnapshotId != null) {
+      startingOffset = new StreamingOffset(startSnapshotId, 0, true, false);
+    } else {
+      List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+      if (snapshotIds.isEmpty()) {
+        // there's no snapshot currently.
+        startingOffset = StreamingOffset.START_OFFSET;
+      } else {
+        startingOffset = new 
StreamingOffset(snapshotIds.get(snapshotIds.size() - 1), 0, true, false);
+      }
+    }
+
+    return startingOffset;
+  }
+
+  private StreamingOffset calculateEndOffset(StreamingOffset start) {
+    if (start.equals(StreamingOffset.START_OFFSET)) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // Spark will invoke setOffsetRange more than once. If this is already 
calculated, use the cached one to avoid
+    // calculating again.
+    if (cachedPendingBatches == null || 
!cachedPendingBatches.first().equals(start)) {
+      this.cachedPendingBatches = Pair.of(start, 
getChangesWithRateLimit(start.snapshotId(), start.index(),
+          start.isStartingSnapshotId(), start.isLastIndexOfSnapshot(), 
maxSizePerBatch));
+    }
+
+    List<MicroBatch> batches = cachedPendingBatches.second();
+    MicroBatch lastBatch = batches.isEmpty() ? null : 
batches.get(batches.size() - 1);
+
+    if (lastBatch == null) {
+      return start;
+    } else {
+      boolean isStarting = lastBatch.snapshotId() == start.snapshotId() && 
start.isStartingSnapshotId();
+      return new StreamingOffset(lastBatch.snapshotId(), 
lastBatch.endFileIndex(), isStarting,
+          lastBatch.lastIndexOfSnapshot());
+    }
+  }
+
+  @VisibleForTesting
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")

Review comment:
       I don't think that this should suppress the complexity check. I agree 
that this method is a bit long and difficult to read.
   
   It would help to rewrite it so that the loop from the given snapshotId to 
the table's current snapshotId handles the snapshotId passed in as well. That 
would make this much easier to read.

##########
File path: 
spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A micro-batch based Spark Structured Streaming reader for Iceberg table. It 
will track the added
+ * files and generate tasks per batch to process newly added files. By default 
it will process
+ * all the newly added files to the current snapshot in each batch, user could 
also set this
+ * configuration "max-files-per-trigger" to control the number of files 
processed per batch.
+ */
+class StreamingReader extends Reader implements MicroBatchReader {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingReader.class);
+
+  private StreamingOffset startOffset;
+  private StreamingOffset endOffset;
+
+  private final Table table;
+  private final long maxSizePerBatch;
+  private final Long startSnapshotId;
+
+  // Used to cache the pending batches for this streaming batch interval.
+  private Pair<StreamingOffset, List<MicroBatch>> cachedPendingBatches;
+
+  StreamingReader(Table table, Broadcast<FileIO> io, 
Broadcast<EncryptionManager> encryptionManager,
+                  boolean caseSensitive, DataSourceOptions options) {
+    super(table, io, encryptionManager, caseSensitive, options);
+
+    this.table = table;
+    this.maxSizePerBatch = 
options.get("max-size-per-batch").map(Long::parseLong).orElse(Long.MAX_VALUE);
+    Preconditions.checkArgument(maxSizePerBatch > 0L,
+        "Option max-size-per-batch '%d' should > 0", maxSizePerBatch);
+
+    this.startSnapshotId = 
options.get("starting-snapshot-id").map(Long::parseLong).orElse(null);
+    if (startSnapshotId != null) {
+      if (!SnapshotUtil.ancestorOf(table, 
table.currentSnapshot().snapshotId(), startSnapshotId)) {
+        throw new IllegalStateException("The option starting-snapshot-id " + 
startSnapshotId +
+            " is not an ancestor of the current snapshot");
+      }
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+    table.refresh();
+
+    if (start.isPresent() && 
!StreamingOffset.START_OFFSET.equals(start.get())) {
+      this.startOffset = (StreamingOffset) start.get();
+      this.endOffset = (StreamingOffset) 
end.orElse(calculateEndOffset(startOffset));
+    } else {
+      // If starting offset is "START_OFFSET" (there's no snapshot in the last 
batch), or starting
+      // offset is not set, then we need to calculate the starting offset 
again.
+      this.startOffset = calculateStartingOffset();
+      this.endOffset = calculateEndOffset(startOffset);
+    }
+  }
+
+  @Override
+  public Offset getStartOffset() {
+    if (startOffset == null) {
+      throw new IllegalStateException("Start offset is not set");
+    }
+
+    return startOffset;
+  }
+
+  @Override
+  public Offset getEndOffset() {
+    if (endOffset == null) {
+      throw new IllegalStateException("End offset is not set");
+    }
+
+    return endOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+    // Since all the data and metadata of Iceberg is as it is, nothing needs 
to commit when
+    // offset is processed, so no need to implement this method.
+  }
+
+  @Override
+  public void stop() {}
+
+  @Override
+  public String toString() {
+    return String.format(
+        "IcebergStreamScan(table=%s, type=%s)", table, 
table.schema().asStruct());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected List<CombinedScanTask> tasks() {
+    if (startOffset.equals(endOffset)) {
+      LOG.info("Start offset {} equals to end offset {}, no data to process", 
startOffset, endOffset);
+      return Collections.emptyList();
+    }
+
+    Preconditions.checkState(cachedPendingBatches != null,
+        "pendingBatches is null, which is unexpected as it will be set when 
calculating end offset");
+
+    List<MicroBatch> pendingBatches = cachedPendingBatches.second();
+    if (pendingBatches.isEmpty()) {
+      LOG.info("There's no task to process in this batch");
+      return Collections.emptyList();
+    }
+
+    MicroBatch lastBatch = pendingBatches.get(pendingBatches.size() - 1);
+    if (lastBatch.snapshotId() != endOffset.snapshotId() || 
lastBatch.endFileIndex() != endOffset.index()) {
+      throw new IllegalStateException("The cached pendingBatches doesn't match 
the current end offset " + endOffset);
+    }
+
+    LOG.info("Processing data from {} to {}", startOffset, endOffset);
+    List<FileScanTask> tasks = pendingBatches.stream()
+        .flatMap(batch -> batch.tasks().stream())
+        .collect(Collectors.toList());
+    CloseableIterable<FileScanTask> splitTasks = 
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks),
+        splitSize());
+    return Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize(), splitLookback(), 
splitOpenFileCost()));
+  }
+
+  private StreamingOffset calculateStartingOffset() {
+    StreamingOffset startingOffset;
+    if (startSnapshotId != null) {
+      startingOffset = new StreamingOffset(startSnapshotId, 0, true, false);
+    } else {
+      List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+      if (snapshotIds.isEmpty()) {
+        // there's no snapshot currently.
+        startingOffset = StreamingOffset.START_OFFSET;
+      } else {
+        startingOffset = new 
StreamingOffset(snapshotIds.get(snapshotIds.size() - 1), 0, true, false);
+      }
+    }
+
+    return startingOffset;
+  }
+
+  private StreamingOffset calculateEndOffset(StreamingOffset start) {
+    if (start.equals(StreamingOffset.START_OFFSET)) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // Spark will invoke setOffsetRange more than once. If this is already 
calculated, use the cached one to avoid
+    // calculating again.
+    if (cachedPendingBatches == null || 
!cachedPendingBatches.first().equals(start)) {
+      this.cachedPendingBatches = Pair.of(start, 
getChangesWithRateLimit(start.snapshotId(), start.index(),
+          start.isStartingSnapshotId(), start.isLastIndexOfSnapshot(), 
maxSizePerBatch));
+    }
+
+    List<MicroBatch> batches = cachedPendingBatches.second();
+    MicroBatch lastBatch = batches.isEmpty() ? null : 
batches.get(batches.size() - 1);
+
+    if (lastBatch == null) {
+      return start;
+    } else {
+      boolean isStarting = lastBatch.snapshotId() == start.snapshotId() && 
start.isStartingSnapshotId();
+      return new StreamingOffset(lastBatch.snapshotId(), 
lastBatch.endFileIndex(), isStarting,
+          lastBatch.lastIndexOfSnapshot());
+    }
+  }
+
+  @VisibleForTesting
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  List<MicroBatch> getChangesWithRateLimit(long snapshotId, int index, boolean 
isStarting,
+                                           boolean isLastIndexOfSnapshot, long 
maxSize) {

Review comment:
       Why pass all of the arguments separately? Couldn't this pass 
`StreamingOffset start, long maxSize`?

##########
File path: 
spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestStructuredStreamingRead {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead.spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.sql.shuffle.partitions", 4)
+        .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead.spark;
+    TestStructuredStreamingRead.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChanges() throws IOException {
+    File parent = temp.newFolder("test");
+    File location = new File(parent, "table");
+    File checkpoint = new File(parent, "checkpoint");
+    Table table = createTable(location.toString());
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", location.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    IcebergSource source = new IcebergSource();
+
+    StreamingReader streamingReader = (StreamingReader) 
source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+    long initialSnapshotId = snapshotIds.get(0);
+
+    // Getting all appends from initial snapshot.
+    List<MicroBatch> pendingBatches = streamingReader.getChangesWithRateLimit(
+        initialSnapshotId, 0, true, false, Long.MAX_VALUE);
+    Assert.assertEquals(pendingBatches.size(), 4);
+
+    List<Long> batchSnapshotIds = pendingBatches.stream()
+        .map(MicroBatch::snapshotId)
+        .collect(Collectors.toList());
+    Assert.assertEquals(batchSnapshotIds, snapshotIds);
+
+    // Getting appends from initial snapshot with index, 1st snapshot will be 
filtered out.
+    List<MicroBatch> pendingBatches1 = streamingReader.getChangesWithRateLimit(
+        initialSnapshotId, 1, true, false, Long.MAX_VALUE);
+
+    Assert.assertEquals(pendingBatches1.size(), 4);
+    MicroBatch batch = pendingBatches1.get(0);
+    Assert.assertEquals(batch.sizeInBytes(), 0L);
+    Assert.assertEquals(batch.endFileIndex(), 1);
+    Assert.assertTrue(Iterables.isEmpty(batch.tasks()));

Review comment:
       Why is there an empty batch? This doesn't seem right to me.

##########
File path: 
spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestStructuredStreamingRead {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead.spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.sql.shuffle.partitions", 4)
+        .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead.spark;
+    TestStructuredStreamingRead.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChanges() throws IOException {
+    File parent = temp.newFolder("test");
+    File location = new File(parent, "table");
+    File checkpoint = new File(parent, "checkpoint");
+    Table table = createTable(location.toString());
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", location.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    IcebergSource source = new IcebergSource();
+
+    StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+    long initialSnapshotId = snapshotIds.get(0);
+
+    // Getting all appends from initial snapshot.
+    List<MicroBatch> pendingBatches = streamingReader.getChangesWithRateLimit(
+        initialSnapshotId, 0, true, false, Long.MAX_VALUE);
+    Assert.assertEquals(pendingBatches.size(), 4);
+
+    List<Long> batchSnapshotIds = pendingBatches.stream()
+        .map(MicroBatch::snapshotId)
+        .collect(Collectors.toList());
+    Assert.assertEquals(batchSnapshotIds, snapshotIds);
+
+    // Getting appends from initial snapshot with index, 1st snapshot will be filtered out.
+    List<MicroBatch> pendingBatches1 = streamingReader.getChangesWithRateLimit(
+        initialSnapshotId, 1, true, false, Long.MAX_VALUE);
+
+    Assert.assertEquals(pendingBatches1.size(), 4);
+    MicroBatch batch = pendingBatches1.get(0);
+    Assert.assertEquals(batch.sizeInBytes(), 0L);
+    Assert.assertEquals(batch.endFileIndex(), 1);
+    Assert.assertTrue(Iterables.isEmpty(batch.tasks()));
+
+    // Getting appends from 2nd snapshot, 1st snapshot should be filtered out.
+    long snapshotId2 = snapshotIds.get(1);
+    List<MicroBatch> pendingBatches2 = streamingReader.getChangesWithRateLimit(
+        snapshotId2, 0, false, false, Long.MAX_VALUE);
+
+    Assert.assertEquals(pendingBatches2.size(), 3);
+    List<Long> batchSnapshotIds1 = pendingBatches2.stream()
+        .map(MicroBatch::snapshotId)
+        .collect(Collectors.toList());
+    Assert.assertEquals(batchSnapshotIds1.indexOf(initialSnapshotId), -1);

Review comment:
       How about `assertFalse(batchSnapshotIds1.contains(initialSnapshotId))`?

##########
File path: 
spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestStructuredStreamingRead {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead.spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.sql.shuffle.partitions", 4)
+        .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead.spark;
+    TestStructuredStreamingRead.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChanges() throws IOException {

Review comment:
       I think that this should be refactored into separate test cases. This 
currently puts several different cases in the same method. As a result, the 
first failure prevents the remaining cases from running. Since these are 
independent cases with different starting conditions, I think they can easily 
be broken out into separate test methods.

##########
File path: 
spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestStructuredStreamingRead {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead.spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.sql.shuffle.partitions", 4)
+        .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead.spark;
+    TestStructuredStreamingRead.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChanges() throws IOException {
+    File parent = temp.newFolder("test");
+    File location = new File(parent, "table");
+    File checkpoint = new File(parent, "checkpoint");
+    Table table = createTable(location.toString());
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", location.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    IcebergSource source = new IcebergSource();
+
+    StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+    long initialSnapshotId = snapshotIds.get(0);
+
+    // Getting all appends from initial snapshot.
+    List<MicroBatch> pendingBatches = streamingReader.getChangesWithRateLimit(
+        initialSnapshotId, 0, true, false, Long.MAX_VALUE);
+    Assert.assertEquals(pendingBatches.size(), 4);
+
+    List<Long> batchSnapshotIds = pendingBatches.stream()
+        .map(MicroBatch::snapshotId)
+        .collect(Collectors.toList());
+    Assert.assertEquals(batchSnapshotIds, snapshotIds);
+
+    // Getting appends from initial snapshot with index, 1st snapshot will be filtered out.
+    List<MicroBatch> pendingBatches1 = streamingReader.getChangesWithRateLimit(
+        initialSnapshotId, 1, true, false, Long.MAX_VALUE);
+
+    Assert.assertEquals(pendingBatches1.size(), 4);
+    MicroBatch batch = pendingBatches1.get(0);
+    Assert.assertEquals(batch.sizeInBytes(), 0L);
+    Assert.assertEquals(batch.endFileIndex(), 1);
+    Assert.assertTrue(Iterables.isEmpty(batch.tasks()));
+
+    // Getting appends from 2nd snapshot, 1st snapshot should be filtered out.
+    long snapshotId2 = snapshotIds.get(1);
+    List<MicroBatch> pendingBatches2 = streamingReader.getChangesWithRateLimit(
+        snapshotId2, 0, false, false, Long.MAX_VALUE);
+
+    Assert.assertEquals(pendingBatches2.size(), 3);
+    List<Long> batchSnapshotIds1 = pendingBatches2.stream()
+        .map(MicroBatch::snapshotId)
+        .collect(Collectors.toList());
+    Assert.assertEquals(batchSnapshotIds1.indexOf(initialSnapshotId), -1);
+
+    // Getting appends from last snapshot with index, should have no task included.
+    long lastSnapshotId = snapshotIds.get(3);
+    List<MicroBatch> pendingBatches3 = streamingReader.getChangesWithRateLimit(
+        lastSnapshotId, 1, false, false, Long.MAX_VALUE);
+
+    Assert.assertEquals(pendingBatches3.size(), 1);
+    MicroBatch batch1 = pendingBatches3.get(0);
+    Assert.assertEquals(batch1.sizeInBytes(), 0L);
+    Assert.assertEquals(batch1.endFileIndex(), 1);
+    Assert.assertTrue(Iterables.isEmpty(batch.tasks()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChangesWithRateLimit() throws IOException {
+    File parent = temp.newFolder("test");
+    File location = new File(parent, "table");
+    File checkpoint = new File(parent, "checkpoint");
+    Table table = createTable(location.toString());

Review comment:
       I think the MicroBatch tests are easier to understand because they use 
fake files that are all the same size. Could you convert the tests for 
`getChangesWithRateLimit` to do the same thing?

##########
File path: 
spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestStructuredStreamingRead {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead.spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.sql.shuffle.partitions", 4)
+        .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead.spark;
+    TestStructuredStreamingRead.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChanges() throws IOException {
+    File parent = temp.newFolder("test");
+    File location = new File(parent, "table");
+    File checkpoint = new File(parent, "checkpoint");
+    Table table = createTable(location.toString());
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", location.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    IcebergSource source = new IcebergSource();
+
+    StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+    long initialSnapshotId = snapshotIds.get(0);
+
+    // Getting all appends from initial snapshot.
+    List<MicroBatch> pendingBatches = streamingReader.getChangesWithRateLimit(
+        initialSnapshotId, 0, true, false, Long.MAX_VALUE);
+    Assert.assertEquals(pendingBatches.size(), 4);
+
+    List<Long> batchSnapshotIds = pendingBatches.stream()
+        .map(MicroBatch::snapshotId)
+        .collect(Collectors.toList());
+    Assert.assertEquals(batchSnapshotIds, snapshotIds);
+
+    // Getting appends from initial snapshot with index, 1st snapshot will be filtered out.
+    List<MicroBatch> pendingBatches1 = streamingReader.getChangesWithRateLimit(
+        initialSnapshotId, 1, true, false, Long.MAX_VALUE);
+
+    Assert.assertEquals(pendingBatches1.size(), 4);
+    MicroBatch batch = pendingBatches1.get(0);
+    Assert.assertEquals(batch.sizeInBytes(), 0L);
+    Assert.assertEquals(batch.endFileIndex(), 1);
+    Assert.assertTrue(Iterables.isEmpty(batch.tasks()));
+
+    // Getting appends from 2nd snapshot, 1st snapshot should be filtered out.
+    long snapshotId2 = snapshotIds.get(1);
+    List<MicroBatch> pendingBatches2 = streamingReader.getChangesWithRateLimit(
+        snapshotId2, 0, false, false, Long.MAX_VALUE);
+
+    Assert.assertEquals(pendingBatches2.size(), 3);
+    List<Long> batchSnapshotIds1 = pendingBatches2.stream()
+        .map(MicroBatch::snapshotId)
+        .collect(Collectors.toList());
+    Assert.assertEquals(batchSnapshotIds1.indexOf(initialSnapshotId), -1);
+
+    // Getting appends from last snapshot with index, should have no task included.
+    long lastSnapshotId = snapshotIds.get(3);
+    List<MicroBatch> pendingBatches3 = streamingReader.getChangesWithRateLimit(
+        lastSnapshotId, 1, false, false, Long.MAX_VALUE);
+
+    Assert.assertEquals(pendingBatches3.size(), 1);
+    MicroBatch batch1 = pendingBatches3.get(0);
+    Assert.assertEquals(batch1.sizeInBytes(), 0L);
+    Assert.assertEquals(batch1.endFileIndex(), 1);
+    Assert.assertTrue(Iterables.isEmpty(batch.tasks()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChangesWithRateLimit() throws IOException {
+    File parent = temp.newFolder("test");
+    File location = new File(parent, "table");
+    File checkpoint = new File(parent, "checkpoint");
+    Table table = createTable(location.toString());
+
+    IcebergSource source = new IcebergSource();
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+    long initialSnapshotId = snapshotIds.get(0);
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", location.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);

Review comment:
       This setup isn't really necessary to test `getChangesWithRateLimit`. If 
the table were passed into that method, then it could be a static method and 
tested independently in its own suite. I think that would be better.

##########
File path: 
spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestStructuredStreamingRead {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead.spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.sql.shuffle.partitions", 4)
+        .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead.spark;
+    TestStructuredStreamingRead.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChanges() throws IOException {
+    File parent = temp.newFolder("test");
+    File location = new File(parent, "table");
+    File checkpoint = new File(parent, "checkpoint");
+    Table table = createTable(location.toString());
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", location.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    IcebergSource source = new IcebergSource();
+
+    StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+    long initialSnapshotId = snapshotIds.get(0);
+
+    // Getting all appends from initial snapshot.
+    List<MicroBatch> pendingBatches = streamingReader.getChangesWithRateLimit(
+        initialSnapshotId, 0, true, false, Long.MAX_VALUE);
+    Assert.assertEquals(pendingBatches.size(), 4);
+
+    List<Long> batchSnapshotIds = pendingBatches.stream()
+        .map(MicroBatch::snapshotId)
+        .collect(Collectors.toList());
+    Assert.assertEquals(batchSnapshotIds, snapshotIds);
+
+    // Getting appends from initial snapshot with index, 1st snapshot will be filtered out.
+    List<MicroBatch> pendingBatches1 = streamingReader.getChangesWithRateLimit(
+        initialSnapshotId, 1, true, false, Long.MAX_VALUE);
+
+    Assert.assertEquals(pendingBatches1.size(), 4);
+    MicroBatch batch = pendingBatches1.get(0);
+    Assert.assertEquals(batch.sizeInBytes(), 0L);
+    Assert.assertEquals(batch.endFileIndex(), 1);
+    Assert.assertTrue(Iterables.isEmpty(batch.tasks()));
+
+    // Getting appends from 2nd snapshot, 1st snapshot should be filtered out.
+    long snapshotId2 = snapshotIds.get(1);
+    List<MicroBatch> pendingBatches2 = streamingReader.getChangesWithRateLimit(
+        snapshotId2, 0, false, false, Long.MAX_VALUE);
+
+    Assert.assertEquals(pendingBatches2.size(), 3);
+    List<Long> batchSnapshotIds1 = pendingBatches2.stream()
+        .map(MicroBatch::snapshotId)
+        .collect(Collectors.toList());
+    Assert.assertEquals(batchSnapshotIds1.indexOf(initialSnapshotId), -1);
+
+    // Getting appends from last snapshot with index, should have no task included.
+    long lastSnapshotId = snapshotIds.get(3);
+    List<MicroBatch> pendingBatches3 = streamingReader.getChangesWithRateLimit(
+        lastSnapshotId, 1, false, false, Long.MAX_VALUE);
+
+    Assert.assertEquals(pendingBatches3.size(), 1);
+    MicroBatch batch1 = pendingBatches3.get(0);
+    Assert.assertEquals(batch1.sizeInBytes(), 0L);
+    Assert.assertEquals(batch1.endFileIndex(), 1);
+    Assert.assertTrue(Iterables.isEmpty(batch.tasks()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetChangesWithRateLimit() throws IOException {
+    File parent = temp.newFolder("test");
+    File location = new File(parent, "table");
+    File checkpoint = new File(parent, "checkpoint");
+    Table table = createTable(location.toString());
+
+    IcebergSource source = new IcebergSource();
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+    long initialSnapshotId = snapshotIds.get(0);
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", location.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    StreamingReader streamingReader = (StreamingReader) source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+
+    // the size of each data file is around 600 bytes.
+    // max size set to 1000
+    List<MicroBatch> rateLimitedBatches = streamingReader.getChangesWithRateLimit(
+        initialSnapshotId, 0, true, false, 1000);
+
+    Assert.assertEquals(rateLimitedBatches.size(), 2);
+    MicroBatch batch = rateLimitedBatches.get(0);
+    Assert.assertEquals(batch.endFileIndex(), 1);
+    Assert.assertTrue(batch.lastIndexOfSnapshot());
+    Assert.assertTrue(batch.tasks().size() == 1);
+    Assert.assertTrue(batch.sizeInBytes() < 1000 && batch.sizeInBytes() > 0);
+
+    MicroBatch batch1 = rateLimitedBatches.get(1);
+    Assert.assertEquals(batch1.endFileIndex(), 1);
+    Assert.assertTrue(batch1.lastIndexOfSnapshot());
+    Assert.assertTrue(batch1.tasks().size() == 1);
+    Assert.assertTrue(batch1.sizeInBytes() < 1000 && batch1.sizeInBytes() > 0);
+
+    // max size less than file size
+    List<MicroBatch> rateLimitedBatches1 = streamingReader.getChangesWithRateLimit(
+        batch1.snapshotId(), batch1.endFileIndex(), false, batch1.lastIndexOfSnapshot(), 100);
+
+    Assert.assertEquals(rateLimitedBatches1.size(), 1);
+    MicroBatch batch2 = rateLimitedBatches1.get(0);
+    Assert.assertEquals(batch2.endFileIndex(), 1);
+    Assert.assertTrue(batch2.lastIndexOfSnapshot());
+    Assert.assertTrue(batch2.tasks().size() == 1);
+    Assert.assertTrue(batch2.sizeInBytes() < 1000 && batch2.sizeInBytes() > 0);
+
+    // max size set to 10000
+    List<MicroBatch> rateLimitedBatches2 = streamingReader.getChangesWithRateLimit(
+        batch2.snapshotId(), batch2.endFileIndex(), false, batch2.lastIndexOfSnapshot(), 10000);
+
+    Assert.assertEquals(rateLimitedBatches2.size(), 1);
+    MicroBatch batch3 = rateLimitedBatches2.get(0);
+    Assert.assertEquals(batch3.endFileIndex(), 1);
+    Assert.assertTrue(batch3.tasks().size() == 1);
+    Assert.assertTrue(batch3.lastIndexOfSnapshot());
+    Assert.assertTrue(batch3.sizeInBytes() < 1000 && batch3.sizeInBytes() > 0);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetOffset() throws IOException {
+    File parent = temp.newFolder("test");
+    File location = new File(parent, "table");
+    File checkpoint = new File(parent, "checkpoint");
+    Table table = createTable(location.toString());
+
+    IcebergSource source = new IcebergSource();
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+
+    // default max size per batch
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", location.toString(),
+        "checkpointLocation", checkpoint.toString()));
+    StreamingReader streamingReader = (StreamingReader) 
source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options);
+    streamingReader.setOffsetRange(Optional.empty(), Optional.empty());
+
+    StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset();
+    Assert.assertEquals(start.snapshotId(), snapshotIds.get(0).longValue());
+    Assert.assertEquals(start.index(), 0);
+    Assert.assertTrue(start.isStartingSnapshotId());
+    Assert.assertFalse(start.isLastIndexOfSnapshot());
+
+    StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset();
+    Assert.assertEquals(end.snapshotId(), snapshotIds.get(3).longValue());
+    Assert.assertEquals(end.index(), 1);
+    Assert.assertFalse(end.isStartingSnapshotId());
+    Assert.assertTrue(end.isLastIndexOfSnapshot());
+
+    streamingReader.setOffsetRange(Optional.of(end), Optional.empty());
+    StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset();
+    Assert.assertEquals(end, end1);
+
+    // max size to 1000
+    DataSourceOptions options1 = new DataSourceOptions(ImmutableMap.of(
+        "path", location.toString(),
+        "checkpointLocation", checkpoint.toString(),
+        "max-size-per-batch", "1000"));
+    StreamingReader streamingReader1 = (StreamingReader) 
source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options1);
+
+    streamingReader1.setOffsetRange(Optional.empty(), Optional.empty());
+    StreamingOffset start1 = (StreamingOffset) 
streamingReader1.getStartOffset();
+    Assert.assertEquals(start1.snapshotId(), snapshotIds.get(0).longValue());
+    Assert.assertEquals(start1.index(), 0);
+    Assert.assertTrue(start1.isStartingSnapshotId());
+    Assert.assertFalse(start1.isLastIndexOfSnapshot());
+
+    StreamingOffset end2 = (StreamingOffset) streamingReader1.getEndOffset();
+    Assert.assertEquals(end2.snapshotId(), snapshotIds.get(1).longValue());
+    Assert.assertEquals(end2.index(), 1);
+    Assert.assertFalse(end2.isStartingSnapshotId());
+    Assert.assertTrue(end2.isLastIndexOfSnapshot());
+
+    streamingReader1.setOffsetRange(Optional.of(end2), Optional.empty());
+    StreamingOffset end3 = (StreamingOffset) streamingReader1.getEndOffset();
+    Assert.assertEquals(end3.snapshotId(), snapshotIds.get(3).longValue());
+    Assert.assertEquals(end3.index(), 1);
+    Assert.assertFalse(end3.isStartingSnapshotId());
+    Assert.assertTrue(end3.isLastIndexOfSnapshot());
+
+    streamingReader1.setOffsetRange(Optional.of(end3), Optional.empty());
+    StreamingOffset end4 = (StreamingOffset) streamingReader1.getEndOffset();
+    Assert.assertEquals(end3, end4);
+
+    // max size to 100
+    DataSourceOptions options2 = new DataSourceOptions(ImmutableMap.of(
+        "path", location.toString(),
+        "checkpointLocation", checkpoint.toString(),
+        "max-size-per-batch", "100"));
+    StreamingReader streamingReader2 = (StreamingReader) 
source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options2);
+
+    streamingReader2.setOffsetRange(Optional.empty(), Optional.empty());
+    StreamingOffset start2 = (StreamingOffset) 
streamingReader2.getStartOffset();
+    Assert.assertEquals(start2.snapshotId(), snapshotIds.get(0).longValue());
+    Assert.assertEquals(start2.index(), 0);
+    Assert.assertTrue(start2.isStartingSnapshotId());
+    Assert.assertFalse(start2.isLastIndexOfSnapshot());
+
+    StreamingOffset end6 = (StreamingOffset) streamingReader2.getEndOffset();
+    Assert.assertEquals(end6.snapshotId(), snapshotIds.get(0).longValue());
+    Assert.assertEquals(end6.index(), 1);
+    Assert.assertTrue(end6.isStartingSnapshotId());
+    Assert.assertTrue(end6.isLastIndexOfSnapshot());
+
+    streamingReader2.setOffsetRange(Optional.of(end6), Optional.empty());
+    StreamingOffset end7 = (StreamingOffset) streamingReader2.getEndOffset();
+    Assert.assertEquals(end7.snapshotId(), snapshotIds.get(1).longValue());
+    Assert.assertEquals(end7.index(), 1);
+    Assert.assertFalse(end7.isStartingSnapshotId());
+    Assert.assertTrue(end7.isLastIndexOfSnapshot());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testWithSnapshotId() throws IOException {
+    File parent = temp.newFolder("test");
+    File location = new File(parent, "table");
+    File checkpoint = new File(parent, "checkpoint");
+    Table table = createTable(location.toString());
+
+    IcebergSource source = new IcebergSource();
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    Collections.reverse(snapshotIds);
+
+    // test invalid snapshot id
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", location.toString(),
+        "checkpointLocation", checkpoint.toString(),
+        "starting-snapshot-id", "-1"));
+    AssertHelpers.assertThrows("Test invalid snapshot id",
+        IllegalStateException.class, "The option starting-snapshot-id -1 is 
not an ancestor",
+        () -> source.createMicroBatchReader(Optional.empty(), 
checkpoint.toString(), options));
+
+    // test specify snapshot-id
+    DataSourceOptions options1 = new DataSourceOptions(ImmutableMap.of(
+        "path", location.toString(),
+        "checkpointLocation", checkpoint.toString(),
+        "starting-snapshot-id", snapshotIds.get(1).toString(),
+        "max-size-per-batch", "1000"));
+    StreamingReader streamingReader = (StreamingReader) 
source.createMicroBatchReader(
+        Optional.empty(), checkpoint.toString(), options1);
+
+    streamingReader.setOffsetRange(Optional.empty(), Optional.empty());
+    StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset();
+    Assert.assertEquals(start.snapshotId(), snapshotIds.get(1).longValue());
+    Assert.assertEquals(start.index(), 0);
+    Assert.assertTrue(start.isStartingSnapshotId());
+    Assert.assertFalse(start.isLastIndexOfSnapshot());
+
+    StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset();
+    Assert.assertEquals(end.snapshotId(), snapshotIds.get(1).longValue());
+    Assert.assertEquals(end.index(), 1);
+    Assert.assertTrue(end.isStartingSnapshotId());
+    Assert.assertFalse(end.isLastIndexOfSnapshot());
+
+    streamingReader.setOffsetRange(Optional.of(end), Optional.empty());
+    StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset();
+    Assert.assertEquals(end1.snapshotId(), snapshotIds.get(2).longValue());
+    Assert.assertEquals(end1.index(), 1);
+    Assert.assertFalse(end1.isStartingSnapshotId());
+    Assert.assertTrue(end1.isLastIndexOfSnapshot());
+
+    streamingReader.setOffsetRange(Optional.of(end1), Optional.empty());
+    StreamingOffset end2 = (StreamingOffset) streamingReader.getEndOffset();
+    Assert.assertEquals(end2.snapshotId(), snapshotIds.get(3).longValue());
+    Assert.assertEquals(end2.index(), 1);
+    Assert.assertFalse(end2.isStartingSnapshotId());
+    Assert.assertTrue(end2.isLastIndexOfSnapshot());
+
+    streamingReader.setOffsetRange(Optional.of(end2), Optional.empty());
+    StreamingOffset end3 = (StreamingOffset) streamingReader.getEndOffset();
+    Assert.assertEquals(end3, end2);
+  }
+
+  private Table createTable(String location) {

Review comment:
       I think this should be in a `@Before` or `@BeforeClass` method. Also, no 
need for real files for the `getChanges` tests.

##########
File path: 
spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A micro-batch based Spark Structured Streaming reader for Iceberg table. It 
will track the added
+ * files and generate tasks per batch to process newly added files. By default 
it will process
+ * all the newly added files to the current snapshot in each batch, user could 
also set this
+ * configuration "max-size-per-batch" to control the size of data 
processed per batch.
+ */
+class StreamingReader extends Reader implements MicroBatchReader {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingReader.class);
+
+  private StreamingOffset startOffset;
+  private StreamingOffset endOffset;
+
+  private final Table table;
+  private final long maxSizePerBatch;
+  private final Long startSnapshotId;
+
+  // Used to cache the pending batches for this streaming batch interval.
+  private Pair<StreamingOffset, List<MicroBatch>> cachedPendingBatches;
+
+  StreamingReader(Table table, Broadcast<FileIO> io, 
Broadcast<EncryptionManager> encryptionManager,
+                  boolean caseSensitive, DataSourceOptions options) {
+    super(table, io, encryptionManager, caseSensitive, options);
+
+    this.table = table;
+    this.maxSizePerBatch = 
options.get("max-size-per-batch").map(Long::parseLong).orElse(Long.MAX_VALUE);
+    Preconditions.checkArgument(maxSizePerBatch > 0L,
+        "Option max-size-per-batch '%d' should > 0", maxSizePerBatch);
+
+    this.startSnapshotId = 
options.get("starting-snapshot-id").map(Long::parseLong).orElse(null);
+    if (startSnapshotId != null) {
+      if (!SnapshotUtil.ancestorOf(table, 
table.currentSnapshot().snapshotId(), startSnapshotId)) {
+        throw new IllegalStateException("The option starting-snapshot-id " + 
startSnapshotId +
+            " is not an ancestor of the current snapshot");
+      }
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+    table.refresh();
+
+    if (start.isPresent() && 
!StreamingOffset.START_OFFSET.equals(start.get())) {
+      this.startOffset = (StreamingOffset) start.get();
+      this.endOffset = (StreamingOffset) 
end.orElse(calculateEndOffset(startOffset));
+    } else {
+      // If starting offset is "START_OFFSET" (there's no snapshot in the last 
batch), or starting
+      // offset is not set, then we need to calculate the starting offset 
again.
+      this.startOffset = calculateStartingOffset();
+      this.endOffset = calculateEndOffset(startOffset);
+    }
+  }
+
+  @Override
+  public Offset getStartOffset() {
+    if (startOffset == null) {
+      throw new IllegalStateException("Start offset is not set");
+    }
+
+    return startOffset;
+  }
+
+  @Override
+  public Offset getEndOffset() {
+    if (endOffset == null) {
+      throw new IllegalStateException("End offset is not set");
+    }
+
+    return endOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+    // Since all the data and metadata of Iceberg is as it is, nothing needs 
to commit when
+    // offset is processed, so no need to implement this method.
+  }
+
+  @Override
+  public void stop() {}
+
+  @Override
+  public String toString() {
+    return String.format(
+        "IcebergStreamScan(table=%s, type=%s)", table, 
table.schema().asStruct());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected List<CombinedScanTask> tasks() {
+    if (startOffset.equals(endOffset)) {
+      LOG.info("Start offset {} equals to end offset {}, no data to process", 
startOffset, endOffset);
+      return Collections.emptyList();
+    }
+
+    Preconditions.checkState(cachedPendingBatches != null,
+        "pendingBatches is null, which is unexpected as it will be set when 
calculating end offset");
+
+    List<MicroBatch> pendingBatches = cachedPendingBatches.second();
+    if (pendingBatches.isEmpty()) {
+      LOG.info("There's no task to process in this batch");

Review comment:
       Should this also log the start and end offset? I think the case where 
this would happen is when the table has had a few rewrite commits, but no new 
data. So the end offset is updated, but there is no work to do.

##########
File path: 
spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A micro-batch based Spark Structured Streaming reader for Iceberg table. It 
will track the added
+ * files and generate tasks per batch to process newly added files. By default 
it will process
+ * all the newly added files to the current snapshot in each batch, user could 
also set this
+ * configuration "max-size-per-batch" to control the size of data 
processed per batch.
+ */
+class StreamingReader extends Reader implements MicroBatchReader {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingReader.class);
+
+  private StreamingOffset startOffset;
+  private StreamingOffset endOffset;
+
+  private final Table table;
+  private final long maxSizePerBatch;
+  private final Long startSnapshotId;
+
+  // Used to cache the pending batches for this streaming batch interval.
+  private Pair<StreamingOffset, List<MicroBatch>> cachedPendingBatches;
+
+  StreamingReader(Table table, Broadcast<FileIO> io, 
Broadcast<EncryptionManager> encryptionManager,
+                  boolean caseSensitive, DataSourceOptions options) {
+    super(table, io, encryptionManager, caseSensitive, options);
+
+    this.table = table;
+    this.maxSizePerBatch = 
options.get("max-size-per-batch").map(Long::parseLong).orElse(Long.MAX_VALUE);
+    Preconditions.checkArgument(maxSizePerBatch > 0L,
+        "Option max-size-per-batch '%d' should > 0", maxSizePerBatch);
+
+    this.startSnapshotId = 
options.get("starting-snapshot-id").map(Long::parseLong).orElse(null);
+    if (startSnapshotId != null) {
+      if (!SnapshotUtil.ancestorOf(table, 
table.currentSnapshot().snapshotId(), startSnapshotId)) {
+        throw new IllegalStateException("The option starting-snapshot-id " + 
startSnapshotId +
+            " is not an ancestor of the current snapshot");
+      }
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+    table.refresh();
+
+    if (start.isPresent() && 
!StreamingOffset.START_OFFSET.equals(start.get())) {
+      this.startOffset = (StreamingOffset) start.get();
+      this.endOffset = (StreamingOffset) 
end.orElse(calculateEndOffset(startOffset));
+    } else {
+      // If starting offset is "START_OFFSET" (there's no snapshot in the last 
batch), or starting
+      // offset is not set, then we need to calculate the starting offset 
again.
+      this.startOffset = calculateStartingOffset();
+      this.endOffset = calculateEndOffset(startOffset);
+    }
+  }
+
+  @Override
+  public Offset getStartOffset() {
+    if (startOffset == null) {
+      throw new IllegalStateException("Start offset is not set");
+    }
+
+    return startOffset;
+  }
+
+  @Override
+  public Offset getEndOffset() {
+    if (endOffset == null) {
+      throw new IllegalStateException("End offset is not set");
+    }
+
+    return endOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+    // Since all the data and metadata of Iceberg is as it is, nothing needs 
to commit when
+    // offset is processed, so no need to implement this method.
+  }
+
+  @Override
+  public void stop() {}
+
+  @Override
+  public String toString() {
+    return String.format(
+        "IcebergStreamScan(table=%s, type=%s)", table, 
table.schema().asStruct());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected List<CombinedScanTask> tasks() {
+    if (startOffset.equals(endOffset)) {
+      LOG.info("Start offset {} equals to end offset {}, no data to process", 
startOffset, endOffset);
+      return Collections.emptyList();
+    }
+
+    Preconditions.checkState(cachedPendingBatches != null,
+        "pendingBatches is null, which is unexpected as it will be set when 
calculating end offset");
+
+    List<MicroBatch> pendingBatches = cachedPendingBatches.second();
+    if (pendingBatches.isEmpty()) {
+      LOG.info("There's no task to process in this batch");
+      return Collections.emptyList();
+    }
+
+    MicroBatch lastBatch = pendingBatches.get(pendingBatches.size() - 1);
+    if (lastBatch.snapshotId() != endOffset.snapshotId() || 
lastBatch.endFileIndex() != endOffset.index()) {
+      throw new IllegalStateException("The cached pendingBatches doesn't match 
the current end offset " + endOffset);

Review comment:
       This could also use `Preconditions.checkState`.

##########
File path: 
spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A micro-batch based Spark Structured Streaming reader for Iceberg table. It 
will track the added
+ * files and generate tasks per batch to process newly added files. By default 
it will process
+ * all the newly added files to the current snapshot in each batch, user could 
also set this
+ * configuration "max-size-per-batch" to control the size of data 
processed per batch.
+ */
+class StreamingReader extends Reader implements MicroBatchReader {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingReader.class);
+
+  private StreamingOffset startOffset;
+  private StreamingOffset endOffset;
+
+  private final Table table;
+  private final long maxSizePerBatch;
+  private final Long startSnapshotId;
+
+  // Used to cache the pending batches for this streaming batch interval.
+  private Pair<StreamingOffset, List<MicroBatch>> cachedPendingBatches;
+
+  StreamingReader(Table table, Broadcast<FileIO> io, 
Broadcast<EncryptionManager> encryptionManager,
+                  boolean caseSensitive, DataSourceOptions options) {
+    super(table, io, encryptionManager, caseSensitive, options);
+
+    this.table = table;
+    this.maxSizePerBatch = 
options.get("max-size-per-batch").map(Long::parseLong).orElse(Long.MAX_VALUE);
+    Preconditions.checkArgument(maxSizePerBatch > 0L,
+        "Option max-size-per-batch '%d' should > 0", maxSizePerBatch);
+
+    this.startSnapshotId = 
options.get("starting-snapshot-id").map(Long::parseLong).orElse(null);
+    if (startSnapshotId != null) {
+      if (!SnapshotUtil.ancestorOf(table, 
table.currentSnapshot().snapshotId(), startSnapshotId)) {
+        throw new IllegalStateException("The option starting-snapshot-id " + 
startSnapshotId +
+            " is not an ancestor of the current snapshot");
+      }
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+    table.refresh();
+
+    if (start.isPresent() && 
!StreamingOffset.START_OFFSET.equals(start.get())) {
+      this.startOffset = (StreamingOffset) start.get();
+      this.endOffset = (StreamingOffset) 
end.orElse(calculateEndOffset(startOffset));
+    } else {
+      // If starting offset is "START_OFFSET" (there's no snapshot in the last 
batch), or starting
+      // offset is not set, then we need to calculate the starting offset 
again.
+      this.startOffset = calculateStartingOffset();
+      this.endOffset = calculateEndOffset(startOffset);
+    }
+  }
+
+  @Override
+  public Offset getStartOffset() {
+    if (startOffset == null) {
+      throw new IllegalStateException("Start offset is not set");
+    }
+
+    return startOffset;
+  }
+
+  @Override
+  public Offset getEndOffset() {
+    if (endOffset == null) {
+      throw new IllegalStateException("End offset is not set");
+    }
+
+    return endOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+    // Since all the data and metadata of Iceberg is as it is, nothing needs 
to commit when
+    // offset is processed, so no need to implement this method.
+  }
+
+  @Override
+  public void stop() {}
+
+  @Override
+  public String toString() {
+    return String.format(
+        "IcebergStreamScan(table=%s, type=%s)", table, 
table.schema().asStruct());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected List<CombinedScanTask> tasks() {
+    if (startOffset.equals(endOffset)) {
+      LOG.info("Start offset {} equals to end offset {}, no data to process", 
startOffset, endOffset);
+      return Collections.emptyList();
+    }
+
+    Preconditions.checkState(cachedPendingBatches != null,
+        "pendingBatches is null, which is unexpected as it will be set when 
calculating end offset");
+
+    List<MicroBatch> pendingBatches = cachedPendingBatches.second();
+    if (pendingBatches.isEmpty()) {
+      LOG.info("There's no task to process in this batch");
+      return Collections.emptyList();
+    }
+
+    MicroBatch lastBatch = pendingBatches.get(pendingBatches.size() - 1);
+    if (lastBatch.snapshotId() != endOffset.snapshotId() || 
lastBatch.endFileIndex() != endOffset.index()) {
+      throw new IllegalStateException("The cached pendingBatches doesn't match 
the current end offset " + endOffset);
+    }
+
+    LOG.info("Processing data from {} to {}", startOffset, endOffset);
+    List<FileScanTask> tasks = pendingBatches.stream()
+        .flatMap(batch -> batch.tasks().stream())
+        .collect(Collectors.toList());
+    CloseableIterable<FileScanTask> splitTasks = 
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks),
+        splitSize());
+    return Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize(), splitLookback(), 
splitOpenFileCost()));

Review comment:
       This doesn't use the table's configuration for split size, lookback, and 
file open cost. If those aren't configured in the read options, the value from 
the table should be used. If that isn't present, then the default should be 
used.
   
   Can you update this by returning `null` from the methods in `Reader` if the 
options aren't present? Then this can detect null and get the value from the 
table.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to