This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new c34efa0687 Flink: backport #9547 to 1.17 and 1.16 for Adds the ability 
to read from a branch on the Flink Iceberg Source (#9627)
c34efa0687 is described below

commit c34efa0687f74794619d744f875305401df11a33
Author: Rodrigo <[email protected]>
AuthorDate: Mon Feb 5 08:22:34 2024 -0800

    Flink: backport #9547 to 1.17 and 1.16 for Adds the ability to read from a 
branch on the Flink Iceberg Source (#9627)
---
 .../apache/iceberg/flink/source/ScanContext.java   |   5 -
 .../flink/source/StreamingMonitorFunction.java     |   8 +-
 .../enumerator/ContinuousSplitPlannerImpl.java     |   6 +-
 .../flink/source/TestIcebergSourceContinuous.java  |  87 +++++++++++++
 .../iceberg/flink/source/TestStreamScanSql.java    | 142 +++++++++++++++++++--
 .../apache/iceberg/flink/source/ScanContext.java   |   5 -
 .../flink/source/StreamingMonitorFunction.java     |   8 +-
 .../enumerator/ContinuousSplitPlannerImpl.java     |   6 +-
 .../flink/source/TestIcebergSourceContinuous.java  |  87 +++++++++++++
 .../iceberg/flink/source/TestStreamScanSql.java    | 140 ++++++++++++++++++--
 10 files changed, 454 insertions(+), 40 deletions(-)

diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 3dce5dd590..cf57a126ae 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -152,11 +152,6 @@ public class ScanContext implements Serializable {
             "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID 
strategy: not null");
       }
 
-      Preconditions.checkArgument(
-          branch == null,
-          String.format(
-              "Cannot scan table using ref %s configured for streaming reader 
yet", branch));
-
       Preconditions.checkArgument(
           tag == null,
           String.format("Cannot scan table using ref %s configured for 
streaming reader", tag));
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
index c27e29613f..a07613aee5 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
@@ -130,9 +130,6 @@ public class StreamingMonitorFunction extends 
RichSourceFunction<FlinkInputSplit
       Preconditions.checkArgument(
           !(scanContext.startTag() != null && scanContext.startSnapshotId() != 
null),
           "START_SNAPSHOT_ID and START_TAG cannot both be set.");
-      Preconditions.checkArgument(
-          scanContext.branch() == null,
-          "Cannot scan table using ref %s configured for streaming reader 
yet.");
       Preconditions.checkNotNull(
           table.currentSnapshot(), "Don't have any available snapshot in 
table.");
 
@@ -195,7 +192,10 @@ public class StreamingMonitorFunction extends 
RichSourceFunction<FlinkInputSplit
     // Refresh the table to get the latest committed snapshot.
     table.refresh();
 
-    Snapshot snapshot = table.currentSnapshot();
+    Snapshot snapshot =
+        scanContext.branch() != null
+            ? table.snapshot(scanContext.branch())
+            : table.currentSnapshot();
     if (snapshot != null && snapshot.snapshotId() != lastSnapshotId) {
       long snapshotId = snapshot.snapshotId();
 
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
index 450b649253..e9e3c159b0 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
@@ -104,7 +104,11 @@ public class ContinuousSplitPlannerImpl implements 
ContinuousSplitPlanner {
 
   private ContinuousEnumerationResult discoverIncrementalSplits(
       IcebergEnumeratorPosition lastPosition) {
-    Snapshot currentSnapshot = table.currentSnapshot();
+    Snapshot currentSnapshot =
+        scanContext.branch() != null
+            ? table.snapshot(scanContext.branch())
+            : table.currentSnapshot();
+
     if (currentSnapshot == null) {
       // empty table
       Preconditions.checkArgument(
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
index 31e9733fcd..bfd7fa5758 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -370,6 +370,92 @@ public class TestIcebergSourceContinuous {
     }
   }
 
+  @Test
+  public void testReadingFromBranch() throws Exception {
+    String branch = "b1";
+    GenericAppenderHelper dataAppender =
+        new GenericAppenderHelper(tableResource.table(), FileFormat.PARQUET, 
TEMPORARY_FOLDER);
+
+    List<Record> batchBase =
+        RandomGenericData.generate(tableResource.table().schema(), 2, 
randomSeed.incrementAndGet());
+    dataAppender.appendToTable(batchBase);
+
+    // create branch
+    tableResource
+        .table()
+        .manageSnapshots()
+        .createBranch(branch, 
tableResource.table().currentSnapshot().snapshotId())
+        .commit();
+
+    // snapshot1 to branch
+    List<Record> batch1 =
+        RandomGenericData.generate(tableResource.table().schema(), 2, 
randomSeed.incrementAndGet());
+    dataAppender.appendToTable(branch, batch1);
+
+    // snapshot2 to branch
+    List<Record> batch2 =
+        RandomGenericData.generate(tableResource.table().schema(), 2, 
randomSeed.incrementAndGet());
+    dataAppender.appendToTable(branch, batch2);
+
+    List<Record> branchExpectedRecords = Lists.newArrayList();
+    branchExpectedRecords.addAll(batchBase);
+    branchExpectedRecords.addAll(batch1);
+    branchExpectedRecords.addAll(batch2);
+    // reads from branch: it should contain the first snapshot (before the 
branch creation) followed
+    // by the next 2 snapshots added
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .monitorInterval(Duration.ofMillis(10L))
+            
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+            .useBranch(branch)
+            .build();
+
+    try (CloseableIterator<Row> iter =
+        
createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+      List<Row> resultMain = waitForResult(iter, 6);
+      TestHelpers.assertRecords(resultMain, branchExpectedRecords, 
tableResource.table().schema());
+
+      // snapshot3 to branch
+      List<Record> batch3 =
+          RandomGenericData.generate(
+              tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(branch, batch3);
+
+      List<Row> result3 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result3, batch3, 
tableResource.table().schema());
+
+      // snapshot4 to branch
+      List<Record> batch4 =
+          RandomGenericData.generate(
+              tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(branch, batch4);
+
+      List<Row> result4 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result4, batch4, 
tableResource.table().schema());
+    }
+
+    // read only from main branch. Should contain only the first snapshot
+    scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .monitorInterval(Duration.ofMillis(10L))
+            
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+            .build();
+    try (CloseableIterator<Row> iter =
+        
createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+      List<Row> resultMain = waitForResult(iter, 2);
+      TestHelpers.assertRecords(resultMain, batchBase, 
tableResource.table().schema());
+
+      List<Record> batchMain2 =
+          RandomGenericData.generate(
+              tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(batchMain2);
+      resultMain = waitForResult(iter, 2);
+      TestHelpers.assertRecords(resultMain, batchMain2, 
tableResource.table().schema());
+    }
+  }
+
   private DataStream<Row> createStream(ScanContext scanContext) throws 
Exception {
     // start the source and collect output
     StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -384,6 +470,7 @@ public class TestIcebergSourceContinuous {
                     
.startSnapshotTimestamp(scanContext.startSnapshotTimestamp())
                     .startSnapshotId(scanContext.startSnapshotId())
                     .monitorInterval(Duration.ofMillis(10L))
+                    .branch(scanContext.branch())
                     .build(),
                 WatermarkStrategy.noWatermarks(),
                 "icebergSource",
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index 3499e5fdae..9e043bbbbb 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -98,6 +99,11 @@ public class TestStreamScanSql extends CatalogTestBase {
   }
 
   private void insertRows(String partition, Table table, Row... rows) throws 
IOException {
+    insertRows(partition, SnapshotRef.MAIN_BRANCH, table, rows);
+  }
+
+  private void insertRows(String partition, String branch, Table table, Row... 
rows)
+      throws IOException {
     GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, 
temporaryDirectory);
 
     GenericRecord gRecord = GenericRecord.create(table.schema());
@@ -111,12 +117,16 @@ public class TestStreamScanSql extends CatalogTestBase {
     }
 
     if (partition != null) {
-      appender.appendToTable(TestHelpers.Row.of(partition, 0), records);
+      appender.appendToTable(TestHelpers.Row.of(partition, 0), branch, 
records);
     } else {
-      appender.appendToTable(records);
+      appender.appendToTable(branch, records);
     }
   }
 
+  private void insertRowsInBranch(String branch, Table table, Row... rows) 
throws IOException {
+    insertRows(null, branch, table, rows);
+  }
+
   private void insertRows(Table table, Row... rows) throws IOException {
     insertRows(null, table, rows);
   }
@@ -205,19 +215,130 @@ public class TestStreamScanSql extends CatalogTestBase {
   }
 
   @TestTemplate
-  public void testConsumeFilesWithBranch() throws Exception {
+  /**
+   * Insert records on the main branch. Then, insert in a named branch. Reads 
from the main branch
+   * and assert that the only records from main are returned
+   */
+  public void testConsumeFilesFromMainBranch() throws Exception {
     sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
     Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    // Produce two snapshots on main branch
     Row row1 = Row.of(1, "aaa", "2021-01-01");
     Row row2 = Row.of(2, "bbb", "2021-01-01");
+
     insertRows(table, row1, row2);
-    Assertions.assertThatThrownBy(
-            () ->
-                exec(
-                    "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s', 'branch'='b1')*/",
-                    TABLE))
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot scan table using ref b1 configured for streaming 
reader yet");
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // insert on the 'b1' branch
+    Row row3 = Row.of(3, "ccc", "2021-01-01");
+    Row row4 = Row.of(4, "ddd", "2021-01-01");
+
+    insertRowsInBranch(branchName, table, row3, row4);
+
+    // read from main
+    TableResult result =
+        exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s')*/", TABLE);
+
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      // the start snapshot(row2) is exclusive.
+      assertRows(ImmutableList.of(row1, row2), iterator);
+
+      Row row5 = Row.of(5, "eee", "2021-01-01");
+      Row row6 = Row.of(6, "fff", "2021-01-01");
+      insertRows(table, row5, row6);
+      assertRows(ImmutableList.of(row5, row6), iterator);
+
+      Row row7 = Row.of(7, "ggg", "2021-01-01");
+      insertRows(table, row7);
+      assertRows(ImmutableList.of(row7), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+  @TestTemplate
+  /**
+   * Insert records on the main branch. Creates a named branch. Insert record 
on named branch. Then
+   * select from the named branch and assert all the records are returned.
+   */
+  public void testConsumeFilesFromBranch() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    // Produce two snapshots on main branch
+    Row row1 = Row.of(1, "aaa", "2021-01-01");
+    Row row2 = Row.of(2, "bbb", "2021-01-01");
+
+    insertRows(table, row1, row2);
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    TableResult result =
+        exec(
+            "SELECT * FROM %s  /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s', 'branch'='%s')*/ ",
+            TABLE, branchName);
+
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      assertRows(ImmutableList.of(row1, row2), iterator);
+      // insert on the 'b1' branch
+      Row row3 = Row.of(3, "ccc", "2021-01-01");
+      Row row4 = Row.of(4, "ddd", "2021-01-01");
+      insertRowsInBranch(branchName, table, row3, row4);
+      assertRows(ImmutableList.of(row3, row4), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+  @TestTemplate
+  /**
+   * Insert records on branch b1. Then insert record on b2. Then select from 
each branch and assert
+   * the correct records are returned
+   */
+  public void testConsumeFilesFromTwoBranches() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    String branch1 = "b1";
+    String branch2 = "b2";
+    table.manageSnapshots().createBranch(branch1).commit();
+    table.manageSnapshots().createBranch(branch2).commit();
+
+    // Produce two snapshots on main branch
+    Row row1Branch1 = Row.of(1, "b1", "2021-01-01");
+    Row row2Branch1 = Row.of(2, "b1", "2021-01-01");
+
+    Row row1Branch2 = Row.of(2, "b2", "2021-01-01");
+    Row row2Branch2 = Row.of(3, "b3", "2021-01-01");
+
+    insertRowsInBranch(branch1, table, row1Branch1, row2Branch1);
+    insertRowsInBranch(branch2, table, row1Branch2, row2Branch2);
+
+    TableResult resultBranch1 =
+        exec(
+            "SELECT * FROM %s  /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s', 'branch'='%s')*/ ",
+            TABLE, branch1);
+
+    try (CloseableIterator<Row> iterator = resultBranch1.collect()) {
+      assertRows(ImmutableList.of(row1Branch1, row2Branch1), iterator);
+      Row another = Row.of(4, "ccc", "2021-01-01");
+      insertRowsInBranch(branch1, table, another);
+      assertRows(ImmutableList.of(another), iterator);
+    }
+
+    TableResult resultBranch2 =
+        exec(
+            "SELECT * FROM %s  /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s', 'branch'='%s')*/ ",
+            TABLE, branch2);
+    try (CloseableIterator<Row> iterator = resultBranch2.collect()) {
+      assertRows(ImmutableList.of(row1Branch2, row2Branch2), iterator);
+      Row another = Row.of(4, "ccc", "2021-01-01");
+      insertRowsInBranch(branch2, table, another);
+      assertRows(ImmutableList.of(another), iterator);
+    }
+
+    resultBranch1.getJobClient().ifPresent(JobClient::cancel);
+    resultBranch2.getJobClient().ifPresent(JobClient::cancel);
   }
 
   @TestTemplate
@@ -296,6 +417,7 @@ public class TestStreamScanSql extends CatalogTestBase {
       assertRows(ImmutableList.of(row7), iterator);
     }
     result.getJobClient().ifPresent(JobClient::cancel);
+
     Assertions.assertThatThrownBy(
             () ->
                 exec(
diff --git 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 3dce5dd590..cf57a126ae 100644
--- 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -152,11 +152,6 @@ public class ScanContext implements Serializable {
             "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID 
strategy: not null");
       }
 
-      Preconditions.checkArgument(
-          branch == null,
-          String.format(
-              "Cannot scan table using ref %s configured for streaming reader 
yet", branch));
-
       Preconditions.checkArgument(
           tag == null,
           String.format("Cannot scan table using ref %s configured for 
streaming reader", tag));
diff --git 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
index c27e29613f..a07613aee5 100644
--- 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
+++ 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
@@ -130,9 +130,6 @@ public class StreamingMonitorFunction extends 
RichSourceFunction<FlinkInputSplit
       Preconditions.checkArgument(
           !(scanContext.startTag() != null && scanContext.startSnapshotId() != 
null),
           "START_SNAPSHOT_ID and START_TAG cannot both be set.");
-      Preconditions.checkArgument(
-          scanContext.branch() == null,
-          "Cannot scan table using ref %s configured for streaming reader 
yet.");
       Preconditions.checkNotNull(
           table.currentSnapshot(), "Don't have any available snapshot in 
table.");
 
@@ -195,7 +192,10 @@ public class StreamingMonitorFunction extends 
RichSourceFunction<FlinkInputSplit
     // Refresh the table to get the latest committed snapshot.
     table.refresh();
 
-    Snapshot snapshot = table.currentSnapshot();
+    Snapshot snapshot =
+        scanContext.branch() != null
+            ? table.snapshot(scanContext.branch())
+            : table.currentSnapshot();
     if (snapshot != null && snapshot.snapshotId() != lastSnapshotId) {
       long snapshotId = snapshot.snapshotId();
 
diff --git 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
index 450b649253..e9e3c159b0 100644
--- 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
+++ 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
@@ -104,7 +104,11 @@ public class ContinuousSplitPlannerImpl implements 
ContinuousSplitPlanner {
 
   private ContinuousEnumerationResult discoverIncrementalSplits(
       IcebergEnumeratorPosition lastPosition) {
-    Snapshot currentSnapshot = table.currentSnapshot();
+    Snapshot currentSnapshot =
+        scanContext.branch() != null
+            ? table.snapshot(scanContext.branch())
+            : table.currentSnapshot();
+
     if (currentSnapshot == null) {
       // empty table
       Preconditions.checkArgument(
diff --git 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
index 31e9733fcd..bfd7fa5758 100644
--- 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
+++ 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -370,6 +370,92 @@ public class TestIcebergSourceContinuous {
     }
   }
 
+  @Test
+  public void testReadingFromBranch() throws Exception {
+    String branch = "b1";
+    GenericAppenderHelper dataAppender =
+        new GenericAppenderHelper(tableResource.table(), FileFormat.PARQUET, 
TEMPORARY_FOLDER);
+
+    List<Record> batchBase =
+        RandomGenericData.generate(tableResource.table().schema(), 2, 
randomSeed.incrementAndGet());
+    dataAppender.appendToTable(batchBase);
+
+    // create branch
+    tableResource
+        .table()
+        .manageSnapshots()
+        .createBranch(branch, 
tableResource.table().currentSnapshot().snapshotId())
+        .commit();
+
+    // snapshot1 to branch
+    List<Record> batch1 =
+        RandomGenericData.generate(tableResource.table().schema(), 2, 
randomSeed.incrementAndGet());
+    dataAppender.appendToTable(branch, batch1);
+
+    // snapshot2 to branch
+    List<Record> batch2 =
+        RandomGenericData.generate(tableResource.table().schema(), 2, 
randomSeed.incrementAndGet());
+    dataAppender.appendToTable(branch, batch2);
+
+    List<Record> branchExpectedRecords = Lists.newArrayList();
+    branchExpectedRecords.addAll(batchBase);
+    branchExpectedRecords.addAll(batch1);
+    branchExpectedRecords.addAll(batch2);
+    // reads from branch: it should contain the first snapshot (before the 
branch creation) followed
+    // by the next 2 snapshots added
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .monitorInterval(Duration.ofMillis(10L))
+            
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+            .useBranch(branch)
+            .build();
+
+    try (CloseableIterator<Row> iter =
+        
createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+      List<Row> resultMain = waitForResult(iter, 6);
+      TestHelpers.assertRecords(resultMain, branchExpectedRecords, 
tableResource.table().schema());
+
+      // snapshot3 to branch
+      List<Record> batch3 =
+          RandomGenericData.generate(
+              tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(branch, batch3);
+
+      List<Row> result3 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result3, batch3, 
tableResource.table().schema());
+
+      // snapshot4 to branch
+      List<Record> batch4 =
+          RandomGenericData.generate(
+              tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(branch, batch4);
+
+      List<Row> result4 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result4, batch4, 
tableResource.table().schema());
+    }
+
+    // read only from main branch. Should contain only the first snapshot
+    scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .monitorInterval(Duration.ofMillis(10L))
+            
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+            .build();
+    try (CloseableIterator<Row> iter =
+        
createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+      List<Row> resultMain = waitForResult(iter, 2);
+      TestHelpers.assertRecords(resultMain, batchBase, 
tableResource.table().schema());
+
+      List<Record> batchMain2 =
+          RandomGenericData.generate(
+              tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(batchMain2);
+      resultMain = waitForResult(iter, 2);
+      TestHelpers.assertRecords(resultMain, batchMain2, 
tableResource.table().schema());
+    }
+  }
+
   private DataStream<Row> createStream(ScanContext scanContext) throws 
Exception {
     // start the source and collect output
     StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -384,6 +470,7 @@ public class TestIcebergSourceContinuous {
                     
.startSnapshotTimestamp(scanContext.startSnapshotTimestamp())
                     .startSnapshotId(scanContext.startSnapshotId())
                     .monitorInterval(Duration.ofMillis(10L))
+                    .branch(scanContext.branch())
                     .build(),
                 WatermarkStrategy.noWatermarks(),
                 "icebergSource",
diff --git 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index 09d5a5481a..9e043bbbbb 100644
--- 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++ 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -98,6 +99,11 @@ public class TestStreamScanSql extends CatalogTestBase {
   }
 
   private void insertRows(String partition, Table table, Row... rows) throws 
IOException {
+    insertRows(partition, SnapshotRef.MAIN_BRANCH, table, rows);
+  }
+
+  private void insertRows(String partition, String branch, Table table, Row... 
rows)
+      throws IOException {
     GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, 
temporaryDirectory);
 
     GenericRecord gRecord = GenericRecord.create(table.schema());
@@ -111,12 +117,16 @@ public class TestStreamScanSql extends CatalogTestBase {
     }
 
     if (partition != null) {
-      appender.appendToTable(TestHelpers.Row.of(partition, 0), records);
+      appender.appendToTable(TestHelpers.Row.of(partition, 0), branch, 
records);
     } else {
-      appender.appendToTable(records);
+      appender.appendToTable(branch, records);
     }
   }
 
+  private void insertRowsInBranch(String branch, Table table, Row... rows) 
throws IOException {
+    insertRows(null, branch, table, rows);
+  }
+
   private void insertRows(Table table, Row... rows) throws IOException {
     insertRows(null, table, rows);
   }
@@ -205,20 +215,130 @@ public class TestStreamScanSql extends CatalogTestBase {
   }
 
   @TestTemplate
-  public void testConsumeFilesWithBranch() throws Exception {
+  /**
+   * Insert records on the main branch. Then, insert in a named branch. Reads 
from the main branch
+   * and assert that the only records from main are returned
+   */
+  public void testConsumeFilesFromMainBranch() throws Exception {
     sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
     Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    // Produce two snapshots on main branch
     Row row1 = Row.of(1, "aaa", "2021-01-01");
     Row row2 = Row.of(2, "bbb", "2021-01-01");
+
     insertRows(table, row1, row2);
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName).commit();
 
-    Assertions.assertThatThrownBy(
-            () ->
-                exec(
-                    "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s', 'branch'='b1')*/",
-                    TABLE))
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot scan table using ref b1 configured for streaming 
reader yet");
+    // insert on the 'b1' branch
+    Row row3 = Row.of(3, "ccc", "2021-01-01");
+    Row row4 = Row.of(4, "ddd", "2021-01-01");
+
+    insertRowsInBranch(branchName, table, row3, row4);
+
+    // read from main
+    TableResult result =
+        exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s')*/", TABLE);
+
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      // the start snapshot(row2) is exclusive.
+      assertRows(ImmutableList.of(row1, row2), iterator);
+
+      Row row5 = Row.of(5, "eee", "2021-01-01");
+      Row row6 = Row.of(6, "fff", "2021-01-01");
+      insertRows(table, row5, row6);
+      assertRows(ImmutableList.of(row5, row6), iterator);
+
+      Row row7 = Row.of(7, "ggg", "2021-01-01");
+      insertRows(table, row7);
+      assertRows(ImmutableList.of(row7), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+  @TestTemplate
+  /**
+   * Insert records on the main branch. Creates a named branch. Insert record 
on named branch. Then
+   * select from the named branch and assert all the records are returned.
+   */
+  public void testConsumeFilesFromBranch() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    // Produce two snapshots on main branch
+    Row row1 = Row.of(1, "aaa", "2021-01-01");
+    Row row2 = Row.of(2, "bbb", "2021-01-01");
+
+    insertRows(table, row1, row2);
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    TableResult result =
+        exec(
+            "SELECT * FROM %s  /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s', 'branch'='%s')*/ ",
+            TABLE, branchName);
+
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      assertRows(ImmutableList.of(row1, row2), iterator);
+      // insert on the 'b1' branch
+      Row row3 = Row.of(3, "ccc", "2021-01-01");
+      Row row4 = Row.of(4, "ddd", "2021-01-01");
+      insertRowsInBranch(branchName, table, row3, row4);
+      assertRows(ImmutableList.of(row3, row4), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+  @TestTemplate
+  /**
+   * Insert records on branch b1. Then insert record on b2. Then select from 
each branch and assert
+   * the correct records are returned
+   */
+  public void testConsumeFilesFromTwoBranches() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    String branch1 = "b1";
+    String branch2 = "b2";
+    table.manageSnapshots().createBranch(branch1).commit();
+    table.manageSnapshots().createBranch(branch2).commit();
+
+    // Produce two snapshots on main branch
+    Row row1Branch1 = Row.of(1, "b1", "2021-01-01");
+    Row row2Branch1 = Row.of(2, "b1", "2021-01-01");
+
+    Row row1Branch2 = Row.of(2, "b2", "2021-01-01");
+    Row row2Branch2 = Row.of(3, "b3", "2021-01-01");
+
+    insertRowsInBranch(branch1, table, row1Branch1, row2Branch1);
+    insertRowsInBranch(branch2, table, row1Branch2, row2Branch2);
+
+    TableResult resultBranch1 =
+        exec(
+            "SELECT * FROM %s  /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s', 'branch'='%s')*/ ",
+            TABLE, branch1);
+
+    try (CloseableIterator<Row> iterator = resultBranch1.collect()) {
+      assertRows(ImmutableList.of(row1Branch1, row2Branch1), iterator);
+      Row another = Row.of(4, "ccc", "2021-01-01");
+      insertRowsInBranch(branch1, table, another);
+      assertRows(ImmutableList.of(another), iterator);
+    }
+
+    TableResult resultBranch2 =
+        exec(
+            "SELECT * FROM %s  /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s', 'branch'='%s')*/ ",
+            TABLE, branch2);
+    try (CloseableIterator<Row> iterator = resultBranch2.collect()) {
+      assertRows(ImmutableList.of(row1Branch2, row2Branch2), iterator);
+      Row another = Row.of(4, "ccc", "2021-01-01");
+      insertRowsInBranch(branch2, table, another);
+      assertRows(ImmutableList.of(another), iterator);
+    }
+
+    resultBranch1.getJobClient().ifPresent(JobClient::cancel);
+    resultBranch2.getJobClient().ifPresent(JobClient::cancel);
   }
 
   @TestTemplate

Reply via email to