ahmedabu98 commented on code in PR #33504:
URL: https://github.com/apache/beam/pull/33504#discussion_r1951278320


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -413,29 +438,56 @@ abstract static class Builder {
 
       abstract Builder setTableIdentifier(TableIdentifier identifier);
 
+      abstract Builder setFromSnapshotExclusive(@Nullable Long fromSnapshotExclusive);
+
+      abstract Builder setToSnapshot(@Nullable Long toSnapshot);
+
+      abstract Builder setTriggeringFrequency(Duration triggeringFrequency);
+
       abstract ReadRows build();
     }
 
     public ReadRows from(TableIdentifier tableIdentifier) {
       return toBuilder().setTableIdentifier(tableIdentifier).build();
     }
 
+    public ReadRows fromSnapshotExclusive(@Nullable Long fromSnapshotExclusive) {
+      return toBuilder().setFromSnapshotExclusive(fromSnapshotExclusive).build();
+    }
+
+    public ReadRows toSnapshot(@Nullable Long toSnapshot) {
+      return toBuilder().setToSnapshot(toSnapshot).build();
+    }
+
+    public ReadRows withTriggeringFrequency(Duration triggeringFrequency) {
+      return toBuilder().setTriggeringFrequency(triggeringFrequency).build();
+    }
+
     @Override
     public PCollection<Row> expand(PBegin input) {
       TableIdentifier tableId =
           checkStateNotNull(getTableIdentifier(), "Must set a table to read from.");
 
       Table table = getCatalogConfig().catalog().loadTable(tableId);
 
-      return input.apply(
-          Read.from(
-              new ScanSource(
-                  IcebergScanConfig.builder()
-                      .setCatalogConfig(getCatalogConfig())
-                      .setScanType(IcebergScanConfig.ScanType.TABLE)
-                      .setTableIdentifier(tableId)
-                      .setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
-                      .build())));
+      IcebergScanConfig scanConfig =
+          IcebergScanConfig.builder()
+              .setCatalogConfig(getCatalogConfig())
+              .setScanType(IcebergScanConfig.ScanType.TABLE)
+              .setTableIdentifier(tableId)
+              .setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
+              .setFromSnapshotExclusive(getFromSnapshotExclusive())
+              .setToSnapshot(getToSnapshot())
+              .build();
+      if (getTriggeringFrequency() != null

Review Comment:
   I could be wrong about this 🤷🏽‍♂️. I'll mention the `streaming` param in the doc and see what others think.
   
   Yep, I've only just started learning about CDC in Iceberg. We have plenty of time until the next release cut, so I'll put some more thought into it before pushing this in.


