This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new c34efa0687 Flink: backport #9547 to 1.17 and 1.16, adding the ability
to read from a branch on the Flink Iceberg Source (#9627)
c34efa0687 is described below
commit c34efa0687f74794619d744f875305401df11a33
Author: Rodrigo <[email protected]>
AuthorDate: Mon Feb 5 08:22:34 2024 -0800
Flink: backport #9547 to 1.17 and 1.16, adding the ability to read from a
branch on the Flink Iceberg Source (#9627)
---
.../apache/iceberg/flink/source/ScanContext.java | 5 -
.../flink/source/StreamingMonitorFunction.java | 8 +-
.../enumerator/ContinuousSplitPlannerImpl.java | 6 +-
.../flink/source/TestIcebergSourceContinuous.java | 87 +++++++++++++
.../iceberg/flink/source/TestStreamScanSql.java | 142 +++++++++++++++++++--
.../apache/iceberg/flink/source/ScanContext.java | 5 -
.../flink/source/StreamingMonitorFunction.java | 8 +-
.../enumerator/ContinuousSplitPlannerImpl.java | 6 +-
.../flink/source/TestIcebergSourceContinuous.java | 87 +++++++++++++
.../iceberg/flink/source/TestStreamScanSql.java | 140 ++++++++++++++++++--
10 files changed, 454 insertions(+), 40 deletions(-)
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 3dce5dd590..cf57a126ae 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -152,11 +152,6 @@ public class ScanContext implements Serializable {
"Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID
strategy: not null");
}
- Preconditions.checkArgument(
- branch == null,
- String.format(
- "Cannot scan table using ref %s configured for streaming reader
yet", branch));
-
Preconditions.checkArgument(
tag == null,
String.format("Cannot scan table using ref %s configured for
streaming reader", tag));
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
index c27e29613f..a07613aee5 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
@@ -130,9 +130,6 @@ public class StreamingMonitorFunction extends
RichSourceFunction<FlinkInputSplit
Preconditions.checkArgument(
!(scanContext.startTag() != null && scanContext.startSnapshotId() !=
null),
"START_SNAPSHOT_ID and START_TAG cannot both be set.");
- Preconditions.checkArgument(
- scanContext.branch() == null,
- "Cannot scan table using ref %s configured for streaming reader
yet.");
Preconditions.checkNotNull(
table.currentSnapshot(), "Don't have any available snapshot in
table.");
@@ -195,7 +192,10 @@ public class StreamingMonitorFunction extends
RichSourceFunction<FlinkInputSplit
// Refresh the table to get the latest committed snapshot.
table.refresh();
- Snapshot snapshot = table.currentSnapshot();
+ Snapshot snapshot =
+ scanContext.branch() != null
+ ? table.snapshot(scanContext.branch())
+ : table.currentSnapshot();
if (snapshot != null && snapshot.snapshotId() != lastSnapshotId) {
long snapshotId = snapshot.snapshotId();
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
index 450b649253..e9e3c159b0 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
@@ -104,7 +104,11 @@ public class ContinuousSplitPlannerImpl implements
ContinuousSplitPlanner {
private ContinuousEnumerationResult discoverIncrementalSplits(
IcebergEnumeratorPosition lastPosition) {
- Snapshot currentSnapshot = table.currentSnapshot();
+ Snapshot currentSnapshot =
+ scanContext.branch() != null
+ ? table.snapshot(scanContext.branch())
+ : table.currentSnapshot();
+
if (currentSnapshot == null) {
// empty table
Preconditions.checkArgument(
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
index 31e9733fcd..bfd7fa5758 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -370,6 +370,92 @@ public class TestIcebergSourceContinuous {
}
}
+ @Test
+ public void testReadingFromBranch() throws Exception {
+ String branch = "b1";
+ GenericAppenderHelper dataAppender =
+ new GenericAppenderHelper(tableResource.table(), FileFormat.PARQUET,
TEMPORARY_FOLDER);
+
+ List<Record> batchBase =
+ RandomGenericData.generate(tableResource.table().schema(), 2,
randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batchBase);
+
+ // create branch
+ tableResource
+ .table()
+ .manageSnapshots()
+ .createBranch(branch,
tableResource.table().currentSnapshot().snapshotId())
+ .commit();
+
+ // snapshot1 to branch
+ List<Record> batch1 =
+ RandomGenericData.generate(tableResource.table().schema(), 2,
randomSeed.incrementAndGet());
+ dataAppender.appendToTable(branch, batch1);
+
+ // snapshot2 to branch
+ List<Record> batch2 =
+ RandomGenericData.generate(tableResource.table().schema(), 2,
randomSeed.incrementAndGet());
+ dataAppender.appendToTable(branch, batch2);
+
+ List<Record> branchExpectedRecords = Lists.newArrayList();
+ branchExpectedRecords.addAll(batchBase);
+ branchExpectedRecords.addAll(batch1);
+ branchExpectedRecords.addAll(batch2);
+ // reads from branch: it should contain the first snapshot (before the
branch creation) followed
+ // by the next 2 snapshots added
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(10L))
+
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .useBranch(branch)
+ .build();
+
+ try (CloseableIterator<Row> iter =
+
createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+ List<Row> resultMain = waitForResult(iter, 6);
+ TestHelpers.assertRecords(resultMain, branchExpectedRecords,
tableResource.table().schema());
+
+ // snapshot3 to branch
+ List<Record> batch3 =
+ RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(branch, batch3);
+
+ List<Row> result3 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result3, batch3,
tableResource.table().schema());
+
+ // snapshot4 to branch
+ List<Record> batch4 =
+ RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(branch, batch4);
+
+ List<Row> result4 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result4, batch4,
tableResource.table().schema());
+ }
+
+ // read only from main branch. Should contain only the first snapshot
+ scanContext =
+ ScanContext.builder()
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(10L))
+
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .build();
+ try (CloseableIterator<Row> iter =
+
createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+ List<Row> resultMain = waitForResult(iter, 2);
+ TestHelpers.assertRecords(resultMain, batchBase,
tableResource.table().schema());
+
+ List<Record> batchMain2 =
+ RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batchMain2);
+ resultMain = waitForResult(iter, 2);
+ TestHelpers.assertRecords(resultMain, batchMain2,
tableResource.table().schema());
+ }
+ }
+
private DataStream<Row> createStream(ScanContext scanContext) throws
Exception {
// start the source and collect output
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -384,6 +470,7 @@ public class TestIcebergSourceContinuous {
.startSnapshotTimestamp(scanContext.startSnapshotTimestamp())
.startSnapshotId(scanContext.startSnapshotId())
.monitorInterval(Duration.ofMillis(10L))
+ .branch(scanContext.branch())
.build(),
WatermarkStrategy.noWatermarks(),
"icebergSource",
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index 3499e5fdae..9e043bbbbb 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -98,6 +99,11 @@ public class TestStreamScanSql extends CatalogTestBase {
}
private void insertRows(String partition, Table table, Row... rows) throws
IOException {
+ insertRows(partition, SnapshotRef.MAIN_BRANCH, table, rows);
+ }
+
+ private void insertRows(String partition, String branch, Table table, Row...
rows)
+ throws IOException {
GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT,
temporaryDirectory);
GenericRecord gRecord = GenericRecord.create(table.schema());
@@ -111,12 +117,16 @@ public class TestStreamScanSql extends CatalogTestBase {
}
if (partition != null) {
- appender.appendToTable(TestHelpers.Row.of(partition, 0), records);
+ appender.appendToTable(TestHelpers.Row.of(partition, 0), branch,
records);
} else {
- appender.appendToTable(records);
+ appender.appendToTable(branch, records);
}
}
+ private void insertRowsInBranch(String branch, Table table, Row... rows)
throws IOException {
+ insertRows(null, branch, table, rows);
+ }
+
private void insertRows(Table table, Row... rows) throws IOException {
insertRows(null, table, rows);
}
@@ -205,19 +215,130 @@ public class TestStreamScanSql extends CatalogTestBase {
}
@TestTemplate
- public void testConsumeFilesWithBranch() throws Exception {
+ /**
+ * Insert records on the main branch. Then, insert in a named branch. Read
from the main branch
+ * and assert that only the records from main are returned.
+ */
+ public void testConsumeFilesFromMainBranch() throws Exception {
sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+ // Produce two snapshots on main branch
Row row1 = Row.of(1, "aaa", "2021-01-01");
Row row2 = Row.of(2, "bbb", "2021-01-01");
+
insertRows(table, row1, row2);
- Assertions.assertThatThrownBy(
- () ->
- exec(
- "SELECT * FROM %s /*+ OPTIONS('streaming'='true',
'monitor-interval'='1s', 'branch'='b1')*/",
- TABLE))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot scan table using ref b1 configured for streaming
reader yet");
+ String branchName = "b1";
+ table.manageSnapshots().createBranch(branchName).commit();
+
+ // insert on the 'b1' branch
+ Row row3 = Row.of(3, "ccc", "2021-01-01");
+ Row row4 = Row.of(4, "ddd", "2021-01-01");
+
+ insertRowsInBranch(branchName, table, row3, row4);
+
+ // read from main
+ TableResult result =
+ exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true',
'monitor-interval'='1s')*/", TABLE);
+
+ try (CloseableIterator<Row> iterator = result.collect()) {
+ // the start snapshot(row2) is exclusive.
+ assertRows(ImmutableList.of(row1, row2), iterator);
+
+ Row row5 = Row.of(5, "eee", "2021-01-01");
+ Row row6 = Row.of(6, "fff", "2021-01-01");
+ insertRows(table, row5, row6);
+ assertRows(ImmutableList.of(row5, row6), iterator);
+
+ Row row7 = Row.of(7, "ggg", "2021-01-01");
+ insertRows(table, row7);
+ assertRows(ImmutableList.of(row7), iterator);
+ }
+ result.getJobClient().ifPresent(JobClient::cancel);
+ }
+
+ @TestTemplate
+ /**
+ * Insert records on the main branch. Creates a named branch. Insert record
on named branch. Then
+ * select from the named branch and assert all the records are returned.
+ */
+ public void testConsumeFilesFromBranch() throws Exception {
+ sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+ Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+ // Produce two snapshots on main branch
+ Row row1 = Row.of(1, "aaa", "2021-01-01");
+ Row row2 = Row.of(2, "bbb", "2021-01-01");
+
+ insertRows(table, row1, row2);
+ String branchName = "b1";
+ table.manageSnapshots().createBranch(branchName).commit();
+
+ TableResult result =
+ exec(
+ "SELECT * FROM %s /*+ OPTIONS('streaming'='true',
'monitor-interval'='1s', 'branch'='%s')*/ ",
+ TABLE, branchName);
+
+ try (CloseableIterator<Row> iterator = result.collect()) {
+ assertRows(ImmutableList.of(row1, row2), iterator);
+ // insert on the 'b1' branch
+ Row row3 = Row.of(3, "ccc", "2021-01-01");
+ Row row4 = Row.of(4, "ddd", "2021-01-01");
+ insertRowsInBranch(branchName, table, row3, row4);
+ assertRows(ImmutableList.of(row3, row4), iterator);
+ }
+ result.getJobClient().ifPresent(JobClient::cancel);
+ }
+
+ @TestTemplate
+ /**
+ * Insert records on branch b1. Then insert record on b2. Then select from
each branch and assert
+ * the correct records are returned
+ */
+ public void testConsumeFilesFromTwoBranches() throws Exception {
+ sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+ Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+ String branch1 = "b1";
+ String branch2 = "b2";
+ table.manageSnapshots().createBranch(branch1).commit();
+ table.manageSnapshots().createBranch(branch2).commit();
+
+ // Produce one snapshot on each of the two branches
+ Row row1Branch1 = Row.of(1, "b1", "2021-01-01");
+ Row row2Branch1 = Row.of(2, "b1", "2021-01-01");
+
+ Row row1Branch2 = Row.of(2, "b2", "2021-01-01");
+ Row row2Branch2 = Row.of(3, "b3", "2021-01-01");
+
+ insertRowsInBranch(branch1, table, row1Branch1, row2Branch1);
+ insertRowsInBranch(branch2, table, row1Branch2, row2Branch2);
+
+ TableResult resultBranch1 =
+ exec(
+ "SELECT * FROM %s /*+ OPTIONS('streaming'='true',
'monitor-interval'='1s', 'branch'='%s')*/ ",
+ TABLE, branch1);
+
+ try (CloseableIterator<Row> iterator = resultBranch1.collect()) {
+ assertRows(ImmutableList.of(row1Branch1, row2Branch1), iterator);
+ Row another = Row.of(4, "ccc", "2021-01-01");
+ insertRowsInBranch(branch1, table, another);
+ assertRows(ImmutableList.of(another), iterator);
+ }
+
+ TableResult resultBranch2 =
+ exec(
+ "SELECT * FROM %s /*+ OPTIONS('streaming'='true',
'monitor-interval'='1s', 'branch'='%s')*/ ",
+ TABLE, branch2);
+ try (CloseableIterator<Row> iterator = resultBranch2.collect()) {
+ assertRows(ImmutableList.of(row1Branch2, row2Branch2), iterator);
+ Row another = Row.of(4, "ccc", "2021-01-01");
+ insertRowsInBranch(branch2, table, another);
+ assertRows(ImmutableList.of(another), iterator);
+ }
+
+ resultBranch1.getJobClient().ifPresent(JobClient::cancel);
+ resultBranch2.getJobClient().ifPresent(JobClient::cancel);
}
@TestTemplate
@@ -296,6 +417,7 @@ public class TestStreamScanSql extends CatalogTestBase {
assertRows(ImmutableList.of(row7), iterator);
}
result.getJobClient().ifPresent(JobClient::cancel);
+
Assertions.assertThatThrownBy(
() ->
exec(
diff --git
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 3dce5dd590..cf57a126ae 100644
---
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -152,11 +152,6 @@ public class ScanContext implements Serializable {
"Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID
strategy: not null");
}
- Preconditions.checkArgument(
- branch == null,
- String.format(
- "Cannot scan table using ref %s configured for streaming reader
yet", branch));
-
Preconditions.checkArgument(
tag == null,
String.format("Cannot scan table using ref %s configured for
streaming reader", tag));
diff --git
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
index c27e29613f..a07613aee5 100644
---
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
+++
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
@@ -130,9 +130,6 @@ public class StreamingMonitorFunction extends
RichSourceFunction<FlinkInputSplit
Preconditions.checkArgument(
!(scanContext.startTag() != null && scanContext.startSnapshotId() !=
null),
"START_SNAPSHOT_ID and START_TAG cannot both be set.");
- Preconditions.checkArgument(
- scanContext.branch() == null,
- "Cannot scan table using ref %s configured for streaming reader
yet.");
Preconditions.checkNotNull(
table.currentSnapshot(), "Don't have any available snapshot in
table.");
@@ -195,7 +192,10 @@ public class StreamingMonitorFunction extends
RichSourceFunction<FlinkInputSplit
// Refresh the table to get the latest committed snapshot.
table.refresh();
- Snapshot snapshot = table.currentSnapshot();
+ Snapshot snapshot =
+ scanContext.branch() != null
+ ? table.snapshot(scanContext.branch())
+ : table.currentSnapshot();
if (snapshot != null && snapshot.snapshotId() != lastSnapshotId) {
long snapshotId = snapshot.snapshotId();
diff --git
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
index 450b649253..e9e3c159b0 100644
---
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
+++
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
@@ -104,7 +104,11 @@ public class ContinuousSplitPlannerImpl implements
ContinuousSplitPlanner {
private ContinuousEnumerationResult discoverIncrementalSplits(
IcebergEnumeratorPosition lastPosition) {
- Snapshot currentSnapshot = table.currentSnapshot();
+ Snapshot currentSnapshot =
+ scanContext.branch() != null
+ ? table.snapshot(scanContext.branch())
+ : table.currentSnapshot();
+
if (currentSnapshot == null) {
// empty table
Preconditions.checkArgument(
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
index 31e9733fcd..bfd7fa5758 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -370,6 +370,92 @@ public class TestIcebergSourceContinuous {
}
}
+ @Test
+ public void testReadingFromBranch() throws Exception {
+ String branch = "b1";
+ GenericAppenderHelper dataAppender =
+ new GenericAppenderHelper(tableResource.table(), FileFormat.PARQUET,
TEMPORARY_FOLDER);
+
+ List<Record> batchBase =
+ RandomGenericData.generate(tableResource.table().schema(), 2,
randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batchBase);
+
+ // create branch
+ tableResource
+ .table()
+ .manageSnapshots()
+ .createBranch(branch,
tableResource.table().currentSnapshot().snapshotId())
+ .commit();
+
+ // snapshot1 to branch
+ List<Record> batch1 =
+ RandomGenericData.generate(tableResource.table().schema(), 2,
randomSeed.incrementAndGet());
+ dataAppender.appendToTable(branch, batch1);
+
+ // snapshot2 to branch
+ List<Record> batch2 =
+ RandomGenericData.generate(tableResource.table().schema(), 2,
randomSeed.incrementAndGet());
+ dataAppender.appendToTable(branch, batch2);
+
+ List<Record> branchExpectedRecords = Lists.newArrayList();
+ branchExpectedRecords.addAll(batchBase);
+ branchExpectedRecords.addAll(batch1);
+ branchExpectedRecords.addAll(batch2);
+ // reads from branch: it should contain the first snapshot (before the
branch creation) followed
+ // by the next 2 snapshots added
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(10L))
+
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .useBranch(branch)
+ .build();
+
+ try (CloseableIterator<Row> iter =
+
createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+ List<Row> resultMain = waitForResult(iter, 6);
+ TestHelpers.assertRecords(resultMain, branchExpectedRecords,
tableResource.table().schema());
+
+ // snapshot3 to branch
+ List<Record> batch3 =
+ RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(branch, batch3);
+
+ List<Row> result3 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result3, batch3,
tableResource.table().schema());
+
+ // snapshot4 to branch
+ List<Record> batch4 =
+ RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(branch, batch4);
+
+ List<Row> result4 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result4, batch4,
tableResource.table().schema());
+ }
+
+ // read only from main branch. Should contain only the first snapshot
+ scanContext =
+ ScanContext.builder()
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(10L))
+
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .build();
+ try (CloseableIterator<Row> iter =
+
createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+ List<Row> resultMain = waitForResult(iter, 2);
+ TestHelpers.assertRecords(resultMain, batchBase,
tableResource.table().schema());
+
+ List<Record> batchMain2 =
+ RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batchMain2);
+ resultMain = waitForResult(iter, 2);
+ TestHelpers.assertRecords(resultMain, batchMain2,
tableResource.table().schema());
+ }
+ }
+
private DataStream<Row> createStream(ScanContext scanContext) throws
Exception {
// start the source and collect output
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -384,6 +470,7 @@ public class TestIcebergSourceContinuous {
.startSnapshotTimestamp(scanContext.startSnapshotTimestamp())
.startSnapshotId(scanContext.startSnapshotId())
.monitorInterval(Duration.ofMillis(10L))
+ .branch(scanContext.branch())
.build(),
WatermarkStrategy.noWatermarks(),
"icebergSource",
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index 09d5a5481a..9e043bbbbb 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -98,6 +99,11 @@ public class TestStreamScanSql extends CatalogTestBase {
}
private void insertRows(String partition, Table table, Row... rows) throws
IOException {
+ insertRows(partition, SnapshotRef.MAIN_BRANCH, table, rows);
+ }
+
+ private void insertRows(String partition, String branch, Table table, Row...
rows)
+ throws IOException {
GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT,
temporaryDirectory);
GenericRecord gRecord = GenericRecord.create(table.schema());
@@ -111,12 +117,16 @@ public class TestStreamScanSql extends CatalogTestBase {
}
if (partition != null) {
- appender.appendToTable(TestHelpers.Row.of(partition, 0), records);
+ appender.appendToTable(TestHelpers.Row.of(partition, 0), branch,
records);
} else {
- appender.appendToTable(records);
+ appender.appendToTable(branch, records);
}
}
+ private void insertRowsInBranch(String branch, Table table, Row... rows)
throws IOException {
+ insertRows(null, branch, table, rows);
+ }
+
private void insertRows(Table table, Row... rows) throws IOException {
insertRows(null, table, rows);
}
@@ -205,20 +215,130 @@ public class TestStreamScanSql extends CatalogTestBase {
}
@TestTemplate
- public void testConsumeFilesWithBranch() throws Exception {
+ /**
+ * Insert records on the main branch. Then, insert in a named branch. Read
from the main branch
+ * and assert that only the records from main are returned.
+ */
+ public void testConsumeFilesFromMainBranch() throws Exception {
sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+ // Produce two snapshots on main branch
Row row1 = Row.of(1, "aaa", "2021-01-01");
Row row2 = Row.of(2, "bbb", "2021-01-01");
+
insertRows(table, row1, row2);
+ String branchName = "b1";
+ table.manageSnapshots().createBranch(branchName).commit();
- Assertions.assertThatThrownBy(
- () ->
- exec(
- "SELECT * FROM %s /*+ OPTIONS('streaming'='true',
'monitor-interval'='1s', 'branch'='b1')*/",
- TABLE))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot scan table using ref b1 configured for streaming
reader yet");
+ // insert on the 'b1' branch
+ Row row3 = Row.of(3, "ccc", "2021-01-01");
+ Row row4 = Row.of(4, "ddd", "2021-01-01");
+
+ insertRowsInBranch(branchName, table, row3, row4);
+
+ // read from main
+ TableResult result =
+ exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true',
'monitor-interval'='1s')*/", TABLE);
+
+ try (CloseableIterator<Row> iterator = result.collect()) {
+ // the start snapshot(row2) is exclusive.
+ assertRows(ImmutableList.of(row1, row2), iterator);
+
+ Row row5 = Row.of(5, "eee", "2021-01-01");
+ Row row6 = Row.of(6, "fff", "2021-01-01");
+ insertRows(table, row5, row6);
+ assertRows(ImmutableList.of(row5, row6), iterator);
+
+ Row row7 = Row.of(7, "ggg", "2021-01-01");
+ insertRows(table, row7);
+ assertRows(ImmutableList.of(row7), iterator);
+ }
+ result.getJobClient().ifPresent(JobClient::cancel);
+ }
+
+ @TestTemplate
+ /**
+ * Insert records on the main branch. Creates a named branch. Insert record
on named branch. Then
+ * select from the named branch and assert all the records are returned.
+ */
+ public void testConsumeFilesFromBranch() throws Exception {
+ sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+ Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+ // Produce two snapshots on main branch
+ Row row1 = Row.of(1, "aaa", "2021-01-01");
+ Row row2 = Row.of(2, "bbb", "2021-01-01");
+
+ insertRows(table, row1, row2);
+ String branchName = "b1";
+ table.manageSnapshots().createBranch(branchName).commit();
+
+ TableResult result =
+ exec(
+ "SELECT * FROM %s /*+ OPTIONS('streaming'='true',
'monitor-interval'='1s', 'branch'='%s')*/ ",
+ TABLE, branchName);
+
+ try (CloseableIterator<Row> iterator = result.collect()) {
+ assertRows(ImmutableList.of(row1, row2), iterator);
+ // insert on the 'b1' branch
+ Row row3 = Row.of(3, "ccc", "2021-01-01");
+ Row row4 = Row.of(4, "ddd", "2021-01-01");
+ insertRowsInBranch(branchName, table, row3, row4);
+ assertRows(ImmutableList.of(row3, row4), iterator);
+ }
+ result.getJobClient().ifPresent(JobClient::cancel);
+ }
+
+ @TestTemplate
+ /**
+ * Insert records on branch b1. Then insert record on b2. Then select from
each branch and assert
+ * the correct records are returned
+ */
+ public void testConsumeFilesFromTwoBranches() throws Exception {
+ sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+ Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+ String branch1 = "b1";
+ String branch2 = "b2";
+ table.manageSnapshots().createBranch(branch1).commit();
+ table.manageSnapshots().createBranch(branch2).commit();
+
+ // Produce one snapshot on each of the two branches
+ Row row1Branch1 = Row.of(1, "b1", "2021-01-01");
+ Row row2Branch1 = Row.of(2, "b1", "2021-01-01");
+
+ Row row1Branch2 = Row.of(2, "b2", "2021-01-01");
+ Row row2Branch2 = Row.of(3, "b3", "2021-01-01");
+
+ insertRowsInBranch(branch1, table, row1Branch1, row2Branch1);
+ insertRowsInBranch(branch2, table, row1Branch2, row2Branch2);
+
+ TableResult resultBranch1 =
+ exec(
+ "SELECT * FROM %s /*+ OPTIONS('streaming'='true',
'monitor-interval'='1s', 'branch'='%s')*/ ",
+ TABLE, branch1);
+
+ try (CloseableIterator<Row> iterator = resultBranch1.collect()) {
+ assertRows(ImmutableList.of(row1Branch1, row2Branch1), iterator);
+ Row another = Row.of(4, "ccc", "2021-01-01");
+ insertRowsInBranch(branch1, table, another);
+ assertRows(ImmutableList.of(another), iterator);
+ }
+
+ TableResult resultBranch2 =
+ exec(
+ "SELECT * FROM %s /*+ OPTIONS('streaming'='true',
'monitor-interval'='1s', 'branch'='%s')*/ ",
+ TABLE, branch2);
+ try (CloseableIterator<Row> iterator = resultBranch2.collect()) {
+ assertRows(ImmutableList.of(row1Branch2, row2Branch2), iterator);
+ Row another = Row.of(4, "ccc", "2021-01-01");
+ insertRowsInBranch(branch2, table, another);
+ assertRows(ImmutableList.of(another), iterator);
+ }
+
+ resultBranch1.getJobClient().ifPresent(JobClient::cancel);
+ resultBranch2.getJobClient().ifPresent(JobClient::cancel);
}
@TestTemplate