ahmedabu98 commented on code in PR #33504: URL: https://github.com/apache/beam/pull/33504#discussion_r1981690809
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -103,22 +108,119 @@
  * </tr>
  * </table>
  *
- * <p><b>Additional configuration options are provided in the `Pre-filtering Options` section below,
- * for Iceberg writes.</b>
+ * <h3>Sink-only Options</h3>
  *
- * <h3>Creating Tables</h3>
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code triggering_frequency_seconds} </td>
+ * <td> {@code int} </td>
+ * <td>Required for streaming writes. Roughly every
+ * {@code triggering_frequency_seconds} duration, the sink will write records to data files and produce a table snapshot.
+ * Generally, a higher value will produce fewer, larger data files.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td>{@code drop}</td> <td>{@code list<str>}</td> <td>A list of fields to drop before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code keep}</td> <td>{@code list<str>}</td> <td>A list of fields to keep, dropping the rest before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code only}</td> <td>{@code str}</td> <td>A nested record field that should be the only thing written to table(s).</td>
+ * </tr>
+ * </table>
  *
- * <p>If an Iceberg table does not exist at the time of writing, this connector will automatically
- * create one with the data's schema.
+ * <h3>Source-only Options</h3>
  *
- * <p>Note that this is a best-effort operation that depends on the {@link Catalog} implementation.
- * Some implementations may not support creating a table using the Iceberg API.
+ * <h4>CDC Source options</h4>
+ *
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from this snapshot ID (inclusive).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Reads up to this snapshot ID (inclusive). If unset and the source is bounded, it will read
+ * up to the current snapshot (inclusive). If unset and source is unbounded, it will continue polling for new snapshots forever.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_timestamp} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from the earliest snapshot (inclusive) created after this timestamp (in milliseconds).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_timestamp} </td>
+ * <td> {@code long} </td>
+ * <td> Reads up to the latest snapshot (inclusive) created before this timestamp (in milliseconds).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code starting_strategy} </td>
+ * <td> {@code str} </td>
+ * <td>
+ * The source's starting strategy. Valid options are:
+ * <ul>
+ * <li>{@code earliest}: starts reading from the earliest snapshot</li>
+ * <li>{@code latest}: starts reading from the latest snapshot</li>
+ * </ul>
+ * <p>Defaults to {@code earliest} for batch, and {@code latest} for streaming.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code watermark_column} </td>
+ * <td> {@code str} </td>
+ * <td>
+ * The column used to derive event time to track progress. Must be of type:
+ * <ul>
+ * <li>{@code timestamp}</li>

Review Comment:
These are Iceberg types: https://iceberg.apache.org/spec/#primitive-types. Will include this link

##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -103,22 +108,119 @@
  * </tr>
  * </table>
  *
- * <p><b>Additional configuration options are provided in the `Pre-filtering Options` section below,
- * for Iceberg writes.</b>
+ * <h3>Sink-only Options</h3>
  *
- * <h3>Creating Tables</h3>
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code triggering_frequency_seconds} </td>
+ * <td> {@code int} </td>
+ * <td>Required for streaming writes. Roughly every
+ * {@code triggering_frequency_seconds} duration, the sink will write records to data files and produce a table snapshot.
+ * Generally, a higher value will produce fewer, larger data files.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td>{@code drop}</td> <td>{@code list<str>}</td> <td>A list of fields to drop before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code keep}</td> <td>{@code list<str>}</td> <td>A list of fields to keep, dropping the rest before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code only}</td> <td>{@code str}</td> <td>A nested record field that should be the only thing written to table(s).</td>
+ * </tr>
+ * </table>
  *
- * <p>If an Iceberg table does not exist at the time of writing, this connector will automatically
- * create one with the data's schema.
+ * <h3>Source-only Options</h3>
  *
- * <p>Note that this is a best-effort operation that depends on the {@link Catalog} implementation.
- * Some implementations may not support creating a table using the Iceberg API.
+ * <h4>CDC Source options</h4>
+ *
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from this snapshot ID (inclusive).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Reads up to this snapshot ID (inclusive). If unset and the source is bounded, it will read
+ * up to the current snapshot (inclusive). If unset and source is unbounded, it will continue polling for new snapshots forever.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_timestamp} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from the earliest snapshot (inclusive) created after this timestamp (in milliseconds).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_timestamp} </td>
+ * <td> {@code long} </td>
+ * <td> Reads up to the latest snapshot (inclusive) created before this timestamp (in milliseconds).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code starting_strategy} </td>
+ * <td> {@code str} </td>
+ * <td>
+ * The source's starting strategy. Valid options are:
+ * <ul>
+ * <li>{@code earliest}: starts reading from the earliest snapshot</li>
+ * <li>{@code latest}: starts reading from the latest snapshot</li>
+ * </ul>
+ * <p>Defaults to {@code earliest} for batch, and {@code latest} for streaming.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code watermark_column} </td>
+ * <td> {@code str} </td>
+ * <td>
+ * The column used to derive event time to track progress. Must be of type:
+ * <ul>
+ * <li>{@code timestamp}</li>
+ * <li>{@code timestamptz}</li>
+ * <li>{@code long} (micros)</li>
+ * </ul>
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code watermark_time_unit} </td>
+ * <td> {@code str} </td>
+ * <td>
+ * Use only when {@code watermark_column} is set to a column of type Long. Specifies the TimeUnit represented by the watermark column.
+ * Default is {@code "microseconds"}.
+ *
+ * <p>Check {@link TimeUnit} for possible values.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code streaming} </td>

Review Comment:
Mentioned below in the "Choosing an End Point (CDC only)" section.
It will still be a streaming pipeline, but it will stop on its own after processing the end snapshot, similar to how PeriodicImpulse behaves.