chamikaramj commented on code in PR #33504:
URL: https://github.com/apache/beam/pull/33504#discussion_r1984039645
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -310,7 +409,155 @@
* </tr>
* </table>
*
- * <p>For internal use only; no backwards compatibility guarantees
+ * <br>
+ * <br>
+ *
+ * <h2>Reading from Tables</h2>
+ *
+ * With the following configuration,
+ *
+ * <pre>{@code
+ * Map<String, Object> config = Map.of(
+ * "table", table,
+ * "catalog_name", name,
+ * "catalog_properties", Map.of(...),
+ * "config_properties", Map.of(...));
+ * }</pre>
+ *
+ * Example of a simple batch read:
+ *
+ * <pre>{@code
+ * PCollection<Row> rows = pipeline
+ * .apply(Managed.read(ICEBERG).withConfig(config))
+ * .getSinglePCollection();
+ * }</pre>
+ *
+ * Example of a simple CDC read:
+ *
+ * <pre>{@code
+ * PCollection<Row> output = pipeline
+ * .apply(Managed.read(ICEBERG_CDC).withConfig(config))
+ * .getSinglePCollection();
+ *
+ * PCollection<Row> rows = output
+ * .apply(ReadUtils.extractRecords());
+ * }</pre>
+ *
+ * <p><b>Note</b>: This reads <b>append-only</b> snapshots. Full CDC is not supported yet.
+ *
+ * <p>The CDC <b>streaming</b> source (enabled with {@code streaming=true}) continuously polls the
Review Comment:
We should validate (and fail) somewhere if the "streaming" flag is set here
and the streaming PipelineOption [1] is not set.
[1]
https://github.com/apache/beam/blob/c1d0fa4f850f70a7dd5817227127fd9beadcc73e/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java#L38
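A minimal sketch of the suggested check, written as plain Java so it stands alone. The class name `StreamingFlagCheck`, the method `validateStreaming`, and the error message are hypothetical. Inside Beam, `pipelineIsStreaming` would presumably come from `options.as(StreamingOptions.class).isStreaming()`, and `streamingRequested` from the source's `streaming` config value. A batch source in a streaming pipeline is deliberately left allowed, since the comment only asks to fail in the opposite case.

```java
public class StreamingFlagCheck {
  /**
   * Hypothetical validation: rejects a source configured with streaming=true
   * when the pipeline itself is not running in streaming mode.
   * In Beam, pipelineIsStreaming would be read from StreamingOptions.isStreaming().
   */
  static void validateStreaming(boolean streamingRequested, boolean pipelineIsStreaming) {
    if (streamingRequested && !pipelineIsStreaming) {
      throw new IllegalArgumentException(
          "Source is configured with streaming=true, but the pipeline is not "
              + "running in streaming mode. Set --streaming=true in the pipeline options.");
    }
  }

  public static void main(String[] args) {
    validateStreaming(false, false); // batch source, batch pipeline: accepted
    validateStreaming(true, true); // streaming source, streaming pipeline: accepted
    try {
      validateStreaming(true, false); // streaming source, batch pipeline: rejected
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Failing at pipeline construction time like this surfaces the misconfiguration before any job is submitted, rather than leaving an unbounded source to run under batch semantics.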
##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -108,6 +110,7 @@ public class Managed {
*
* <ul>
* <li>{@link Managed#ICEBERG} : Read from Apache Iceberg tables
+ * <li>{@link Managed#ICEBERG_CDC} : CDC Read from Apache Iceberg tables
Review Comment:
We should link to locations where users can find additional Javadocs related
to each of these options (also for write).
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -310,7 +409,155 @@
* </tr>
* </table>
*
- * <p>For internal use only; no backwards compatibility guarantees
+ * <br>
+ * <br>
+ *
+ * <h2>Reading from Tables</h2>
+ *
+ * With the following configuration,
+ *
+ * <pre>{@code
+ * Map<String, Object> config = Map.of(
+ * "table", table,
+ * "catalog_name", name,
+ * "catalog_properties", Map.of(...),
+ * "config_properties", Map.of(...));
+ * }</pre>
+ *
+ * Example of a simple batch read:
+ *
+ * <pre>{@code
+ * PCollection<Row> rows = pipeline
+ * .apply(Managed.read(ICEBERG).withConfig(config))
+ * .getSinglePCollection();
+ * }</pre>
+ *
+ * Example of a simple CDC read:
+ *
+ * <pre>{@code
+ * PCollection<Row> output = pipeline
+ * .apply(Managed.read(ICEBERG_CDC).withConfig(config))
+ * .getSinglePCollection();
+ *
+ * PCollection<Row> rows = output
+ * .apply(ReadUtils.extractRecords());
+ * }</pre>
+ *
+ * <p><b>Note</b>: This reads <b>append-only</b> snapshots. Full CDC is not supported yet.
+ *
+ * <p>The CDC <b>streaming</b> source (enabled with {@code streaming=true}) continuously polls the
+ * table for new snapshots, with a default interval of 60 seconds. This can be overridden using
+ * <b>{@code poll_interval_seconds}</b>:
+ *
+ * <pre>{@code
+ * config.put("streaming", true);
+ * config.put("poll_interval_seconds", 10);
+ * }</pre>
+ *
+ * <h3>Output Schema</h3>
+ *
+ * <p>Reading with <b>{@code Managed.read(ICEBERG)}</b> produces a <b>{@code PCollection<Row>}</b>
+ * containing data records that conform to the table schema.
+ *
+ * <p>Reading with <b>{@code Managed.read(ICEBERG_CDC)}</b> produces a <b>{@code
+ * PCollection<Row>}</b> with the following schema:
+ *
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Field</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code record} </td>
+ * <td> {@code Beam Row} </td>
+ * <td>
+ * The data record.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code operation} </td>
+ * <td> {@code string} </td>
+ * <td>
+ * The snapshot <a href="https://iceberg.apache.org/javadoc/0.11.0/org/apache/iceberg/DataOperations">operation</a> associated with this record. For now, only "append" is supported.
Review Comment:
Maybe change to "APPEND" to be consistent with Iceberg.
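Separately, on the Javadoc examples in this hunk: `config` is built with `Map.of(...)`, which returns an unmodifiable map, so the later `config.put("streaming", true)` call would throw `UnsupportedOperationException` if copied as written. A minimal sketch, assuming the examples are meant to be copy-pasteable, that copies the base config into a `HashMap` before adding the streaming keys. `CdcConfigSketch`, `streamingConfig`, and the table and catalog values are hypothetical placeholders; the real catalog and config properties are elided in the Javadoc.

```java
import java.util.HashMap;
import java.util.Map;

public class CdcConfigSketch {
  // Returns a mutable copy of the base read config with the streaming keys added.
  // Map.of(...) is unmodifiable, so put() must target a copy such as a HashMap.
  static Map<String, Object> streamingConfig(Map<String, Object> base, int pollIntervalSeconds) {
    Map<String, Object> config = new HashMap<>(base);
    config.put("streaming", true);
    config.put("poll_interval_seconds", pollIntervalSeconds);
    return config;
  }

  public static void main(String[] args) {
    // Placeholder values for illustration only.
    Map<String, Object> base = Map.of(
        "table", "db.events",
        "catalog_name", "my_catalog",
        "catalog_properties", Map.of(),
        "config_properties", Map.of());
    Map<String, Object> config = streamingConfig(base, 10);
    System.out.println(config.get("streaming") + " " + config.get("poll_interval_seconds"));
  }
}
```

The resulting map could then be passed to `Managed.read(ICEBERG_CDC).withConfig(config)` exactly as in the CDC read example.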