ahmedabu98 commented on code in PR #33504:
URL: https://github.com/apache/beam/pull/33504#discussion_r1951278320
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -413,29 +438,56 @@ abstract static class Builder {
abstract Builder setTableIdentifier(TableIdentifier identifier);
+ abstract Builder setFromSnapshotExclusive(@Nullable Long
fromSnapshotExclusive);
+
+ abstract Builder setToSnapshot(@Nullable Long toSnapshot);
+
+ abstract Builder setTriggeringFrequency(Duration triggeringFrequency);
+
abstract ReadRows build();
}
public ReadRows from(TableIdentifier tableIdentifier) {
return toBuilder().setTableIdentifier(tableIdentifier).build();
}
+ public ReadRows fromSnapshotExclusive(@Nullable Long
fromSnapshotExclusive) {
+ return
toBuilder().setFromSnapshotExclusive(fromSnapshotExclusive).build();
+ }
+
+ public ReadRows toSnapshot(@Nullable Long toSnapshot) {
+ return toBuilder().setToSnapshot(toSnapshot).build();
+ }
+
+ public ReadRows withTriggeringFrequency(Duration triggeringFrequency) {
+ return toBuilder().setTriggeringFrequency(triggeringFrequency).build();
+ }
+
@Override
public PCollection<Row> expand(PBegin input) {
TableIdentifier tableId =
checkStateNotNull(getTableIdentifier(), "Must set a table to read
from.");
Table table = getCatalogConfig().catalog().loadTable(tableId);
- return input.apply(
- Read.from(
- new ScanSource(
- IcebergScanConfig.builder()
- .setCatalogConfig(getCatalogConfig())
- .setScanType(IcebergScanConfig.ScanType.TABLE)
- .setTableIdentifier(tableId)
-
.setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
- .build())));
+ IcebergScanConfig scanConfig =
+ IcebergScanConfig.builder()
+ .setCatalogConfig(getCatalogConfig())
+ .setScanType(IcebergScanConfig.ScanType.TABLE)
+ .setTableIdentifier(tableId)
+
.setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
+ .setFromSnapshotExclusive(getFromSnapshotExclusive())
+ .setToSnapshot(getToSnapshot())
+ .build();
+ if (getTriggeringFrequency() != null
Review Comment:
I could be wrong about this 🤷🏽‍♂️. I'll mention the `streaming` param in the
doc and see what others think.
Yep, I just started learning a little about CDC in Iceberg. We have plenty of
time until the next release cut, so I'll put some more thought into it before
pushing this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]