chamikaramj commented on code in PR #33504:
URL: https://github.com/apache/beam/pull/33504#discussion_r1980518478
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -103,22 +108,119 @@
* </tr>
* </table>
*
- * <p><b>Additional configuration options are provided in the `Pre-filtering
Options` section below,
- * for Iceberg writes.</b>
+ * <h3>Sink-only Options</h3>
*
- * <h3>Creating Tables</h3>
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td>
<b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code triggering_frequency_seconds} </td>
+ * <td> {@code int} </td>
+ * <td>Required for streaming writes. Roughly every
+ * {@code triggering_frequency_seconds} duration, the sink will write
records to data files and produce a table snapshot.
+ * Generally, a higher value will produce fewer, larger data files.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td>{@code drop}</td> <td>{@code list<str>}</td> <td>A list of fields
to drop before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code keep}</td> <td>{@code list<str>}</td> <td>A list of fields
to keep, dropping the rest before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code only}</td> <td>{@code str}</td> <td>A nested record field
that should be the only thing written to table(s).</td>
+ * </tr>
+ * </table>
*
- * <p>If an Iceberg table does not exist at the time of writing, this
connector will automatically
- * create one with the data's schema.
+ * <h3>Source-only Options</h3>
*
- * <p>Note that this is a best-effort operation that depends on the {@link
Catalog} implementation.
- * Some implementations may not support creating a table using the Iceberg API.
+ * <h4>CDC Source options</h4>
+ *
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td>
<b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from this snapshot ID (inclusive).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Reads up to this snapshot ID (inclusive). If unset and the source
is bounded, it will read
+ * up to the current snapshot (inclusive). If unset and source is
unbounded, it will continue polling for new snapshots forever.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_timestamp} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from the earliest snapshot (inclusive) created
after this timestamp (in milliseconds).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_timestamp} </td>
+ * <td> {@code long} </td>
+ * <td> Reads up to the latest snapshot (inclusive) created before this
timestamp (in milliseconds).
Review Comment:
Is this also optional (similar to to_snapshot)?
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.iceberg.util.SnapshotUtil.ancestorsOf;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.iceberg.IcebergIO.ReadRows.StartingStrategy;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.IdentityPartitionConverters;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.parquet.ParquetReader;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Helper class for source operations. */
+public class ReadUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(ReadUtils.class);
+
+ // default is 8MB. keep this low to avoid overwhelming memory
+ static final int MAX_FILE_BUFFER_SIZE = 1 << 20; // 1MB
+ private static final Collection<String> READ_PROPERTIES_TO_REMOVE =
+ Sets.newHashSet(
+ "parquet.read.filter",
+ "parquet.private.read.filter.predicate",
+ "parquet.read.support.class",
+ "parquet.crypto.factory.class");
+ static final String OPERATION = "operation";
+ static final String RECORD = "record";
+ static final String DEFAULT_WATERMARK_TIME_UNIT = TimeUnit.MICROSECONDS.name();
+
+ /** Extracts {@link Row}s after a CDC streaming read. */
+ public static PTransform<PCollection<Row>, PCollection<Row>> extractRecords() {
+ return new ExtractRecords();
+ }
+
+ public static Schema outputCdcSchema(Schema tableSchema) {
+ return Schema.builder()
+ .addRowField(RECORD, tableSchema)
+ .addNullableStringField(OPERATION)
+ .build();
+ }
+
+ public static Schema outputCdcSchema(org.apache.iceberg.Schema tableSchema) {
+ return outputCdcSchema(IcebergUtils.icebergSchemaToBeamSchema(tableSchema));
+ }
+
+ static ParquetReader<Record> createReader(FileScanTask task, Table table) {
+ String filePath = task.file().path().toString();
+ InputFile inputFile;
+ try (FileIO io = table.io()) {
+ EncryptedInputFile encryptedInput =
+ EncryptedFiles.encryptedInput(io.newInputFile(filePath), task.file().keyMetadata());
+ inputFile = table.encryption().decrypt(encryptedInput);
+ }
+ Map<Integer, ?> idToConstants =
+ ReadUtils.constantsMap(task, IdentityPartitionConverters::convertConstant, table.schema());
+
+ ParquetReadOptions.Builder optionsBuilder;
+ if (inputFile instanceof HadoopInputFile) {
+ // remove read properties already set that may conflict with this read
+ Configuration conf = new Configuration(((HadoopInputFile) inputFile).getConf());
+ for (String property : READ_PROPERTIES_TO_REMOVE) {
+ conf.unset(property);
+ }
+ optionsBuilder = HadoopReadOptions.builder(conf);
+ } else {
+ optionsBuilder = ParquetReadOptions.builder();
+ }
+ optionsBuilder =
+ optionsBuilder
+ .withRange(task.start(), task.start() + task.length())
+ .withMaxAllocationInBytes(MAX_FILE_BUFFER_SIZE);
+
+ @Nullable String nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+ NameMapping mapping =
+ nameMapping != null ? NameMappingParser.fromJson(nameMapping) : NameMapping.empty();
+
+ return new ParquetReader<>(
+ inputFile,
+ table.schema(),
+ optionsBuilder.build(),
+ fileSchema -> GenericParquetReaders.buildReader(table.schema(), fileSchema, idToConstants),
+ mapping,
+ task.residual(),
+ false,
+ true);
+ }
+
+ static Map<Integer, ?> constantsMap(
+ FileScanTask task,
+ BiFunction<Type, Object, Object> converter,
+ org.apache.iceberg.Schema schema) {
+ PartitionSpec spec = task.spec();
+ Set<Integer> idColumns = spec.identitySourceIds();
+ org.apache.iceberg.Schema partitionSchema = TypeUtil.select(schema, idColumns);
+ boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
+
+ if (projectsIdentityPartitionColumns) {
+ return PartitionUtil.constantsMap(task, converter);
+ } else {
+ return Collections.emptyMap();
+ }
+ }
+
+ static @Nullable Long getFromSnapshotExclusive(Table table, IcebergScanConfig scanConfig) {
+ @Nullable StartingStrategy startingStrategy = scanConfig.getStartingStrategy();
+ boolean isStreaming = MoreObjects.firstNonNull(scanConfig.getStreaming(), false);
+ if (startingStrategy == null) {
+ startingStrategy = isStreaming ? StartingStrategy.LATEST : StartingStrategy.EARLIEST;
+ }
+
+ // 1. fetch from from_snapshot
+ @Nullable Long fromSnapshot = scanConfig.getFromSnapshotInclusive();
+ // 2. fetch from from_timestamp
+ @Nullable Long fromTimestamp = scanConfig.getFromTimestamp();
+ if (fromTimestamp != null) {
+ fromSnapshot = SnapshotUtil.oldestAncestorAfter(table, fromTimestamp).snapshotId();
+ }
+ // 3. get current snapshot if starting_strategy is LATEST
+ if (fromSnapshot == null && startingStrategy.equals(StartingStrategy.LATEST)) {
+ fromSnapshot = table.currentSnapshot().snapshotId();
+ }
Review Comment:
Let's make sure we have thorough unit tests that cover all of these cases.
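As a starting point, here is a minimal sketch of the cases such tests could pin down. It models only the three steps visible in this hunk (explicit from_snapshot, then from_timestamp overriding it, then the current snapshot when the strategy is LATEST) against a plain in-memory list instead of an Iceberg Table. `StartingSnapshotSketch`, `Snap`, `resolveFromSnapshot`, and `firstAtOrAfter` are hypothetical names, not part of the PR, and `firstAtOrAfter` is only a simplified stand-in for `SnapshotUtil.oldestAncestorAfter`.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of the start-snapshot precedence in the hunk above. */
final class StartingSnapshotSketch {
  enum Strategy { EARLIEST, LATEST }

  /** Stand-in for an Iceberg snapshot: an ID plus a commit time in milliseconds. */
  static final class Snap {
    final long id;
    final long timestampMillis;

    Snap(long id, long timestampMillis) {
      this.id = id;
      this.timestampMillis = timestampMillis;
    }
  }

  /**
   * Returns the inclusive starting snapshot ID, or null when nothing selects one
   * (interpreted here as "read from the beginning").
   *
   * @param history snapshots ordered oldest first; a linear history is assumed
   */
  static Long resolveFromSnapshot(
      List<Snap> history,
      Long fromSnapshot,
      Long fromTimestamp,
      Strategy strategy,
      boolean streaming) {
    // Default strategy: LATEST for streaming reads, EARLIEST otherwise.
    if (strategy == null) {
      strategy = streaming ? Strategy.LATEST : Strategy.EARLIEST;
    }
    // 1. Start from the explicit snapshot ID, if any.
    Long result = fromSnapshot;
    // 2. A timestamp, when present, replaces the explicit snapshot ID.
    if (fromTimestamp != null) {
      result = firstAtOrAfter(history, fromTimestamp);
    }
    // 3. Fall back to the newest snapshot only when the strategy is LATEST.
    if (result == null && strategy == Strategy.LATEST && !history.isEmpty()) {
      result = history.get(history.size() - 1).id;
    }
    return result;
  }

  /** Simplified stand-in: first snapshot committed at or after the timestamp, else null. */
  static Long firstAtOrAfter(List<Snap> history, long timestampMillis) {
    for (Snap s : history) {
      if (s.timestampMillis >= timestampMillis) {
        return s.id;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    List<Snap> history =
        Arrays.asList(new Snap(1L, 100L), new Snap(2L, 200L), new Snap(3L, 300L));
    // Timestamp and snapshot both set: the timestamp decides.
    System.out.println(resolveFromSnapshot(history, 1L, 150L, null, false));
    // Nothing set, batch read: no starting snapshot is selected.
    System.out.println(resolveFromSnapshot(history, null, null, null, false));
    // Nothing set, streaming read: starts at the newest snapshot.
    System.out.println(resolveFromSnapshot(history, null, null, null, true));
  }
}
```

The real tests would of course run against an actual table; the point is only that each combination of from_snapshot, from_timestamp, starting_strategy, and streaming gets its own assertion.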
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -103,22 +108,119 @@
* </tr>
* </table>
*
- * <p><b>Additional configuration options are provided in the `Pre-filtering
Options` section below,
- * for Iceberg writes.</b>
+ * <h3>Sink-only Options</h3>
*
- * <h3>Creating Tables</h3>
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td>
<b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code triggering_frequency_seconds} </td>
+ * <td> {@code int} </td>
+ * <td>Required for streaming writes. Roughly every
+ * {@code triggering_frequency_seconds} duration, the sink will write
records to data files and produce a table snapshot.
+ * Generally, a higher value will produce fewer, larger data files.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td>{@code drop}</td> <td>{@code list<str>}</td> <td>A list of fields
to drop before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code keep}</td> <td>{@code list<str>}</td> <td>A list of fields
to keep, dropping the rest before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code only}</td> <td>{@code str}</td> <td>A nested record field
that should be the only thing written to table(s).</td>
+ * </tr>
+ * </table>
*
- * <p>If an Iceberg table does not exist at the time of writing, this
connector will automatically
- * create one with the data's schema.
+ * <h3>Source-only Options</h3>
*
- * <p>Note that this is a best-effort operation that depends on the {@link
Catalog} implementation.
- * Some implementations may not support creating a table using the Iceberg API.
+ * <h4>CDC Source options</h4>
+ *
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td>
<b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from this snapshot ID (inclusive).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Reads up to this snapshot ID (inclusive). If unset and the source
is bounded, it will read
+ * up to the current snapshot (inclusive). If unset and source is
unbounded, it will continue polling for new snapshots forever.
Review Comment:
For my own knowledge, is "current snapshot" a well-known concept in Iceberg?
If so, we should probably add a link.
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -103,22 +108,119 @@
* </tr>
* </table>
*
- * <p><b>Additional configuration options are provided in the `Pre-filtering
Options` section below,
- * for Iceberg writes.</b>
+ * <h3>Sink-only Options</h3>
*
- * <h3>Creating Tables</h3>
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td>
<b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code triggering_frequency_seconds} </td>
+ * <td> {@code int} </td>
+ * <td>Required for streaming writes. Roughly every
+ * {@code triggering_frequency_seconds} duration, the sink will write
records to data files and produce a table snapshot.
+ * Generally, a higher value will produce fewer, larger data files.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td>{@code drop}</td> <td>{@code list<str>}</td> <td>A list of fields
to drop before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code keep}</td> <td>{@code list<str>}</td> <td>A list of fields
to keep, dropping the rest before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code only}</td> <td>{@code str}</td> <td>A nested record field
that should be the only thing written to table(s).</td>
+ * </tr>
+ * </table>
*
- * <p>If an Iceberg table does not exist at the time of writing, this
connector will automatically
- * create one with the data's schema.
+ * <h3>Source-only Options</h3>
*
- * <p>Note that this is a best-effort operation that depends on the {@link
Catalog} implementation.
- * Some implementations may not support creating a table using the Iceberg API.
+ * <h4>CDC Source options</h4>
+ *
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td>
<b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from this snapshot ID (inclusive).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Reads up to this snapshot ID (inclusive). If unset and the source
is bounded, it will read
+ * up to the current snapshot (inclusive). If unset and source is
unbounded, it will continue polling for new snapshots forever.
Review Comment:
By "source is unbounded", do you mean that the "streaming" flag is set? If so,
we should mention that instead.
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -103,22 +108,119 @@
* </tr>
* </table>
*
- * <p><b>Additional configuration options are provided in the `Pre-filtering
Options` section below,
- * for Iceberg writes.</b>
+ * <h3>Sink-only Options</h3>
*
- * <h3>Creating Tables</h3>
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td>
<b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code triggering_frequency_seconds} </td>
+ * <td> {@code int} </td>
+ * <td>Required for streaming writes. Roughly every
+ * {@code triggering_frequency_seconds} duration, the sink will write
records to data files and produce a table snapshot.
+ * Generally, a higher value will produce fewer, larger data files.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td>{@code drop}</td> <td>{@code list<str>}</td> <td>A list of fields
to drop before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code keep}</td> <td>{@code list<str>}</td> <td>A list of fields
to keep, dropping the rest before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code only}</td> <td>{@code str}</td> <td>A nested record field
that should be the only thing written to table(s).</td>
+ * </tr>
+ * </table>
*
- * <p>If an Iceberg table does not exist at the time of writing, this
connector will automatically
- * create one with the data's schema.
+ * <h3>Source-only Options</h3>
*
- * <p>Note that this is a best-effort operation that depends on the {@link
Catalog} implementation.
- * Some implementations may not support creating a table using the Iceberg API.
+ * <h4>CDC Source options</h4>
+ *
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td>
<b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from this snapshot ID (inclusive).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Reads up to this snapshot ID (inclusive). If unset and the source
is bounded, it will read
+ * up to the current snapshot (inclusive). If unset and source is
unbounded, it will continue polling for new snapshots forever.
Review Comment:
Also for "source is unbounded"
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -310,7 +408,154 @@
* </tr>
* </table>
*
- * <p>For internal use only; no backwards compatibility guarantees
+ * <br>
+ * <br>
+ *
+ * <h2>Reading from Tables</h2>
+ *
+ * With the following configuration,
+ *
+ * <pre>{@code
+ * Map<String, Object> config = Map.of(
+ * "table", table,
+ * "catalog_name", name,
+ * "catalog_properties", Map.of(...),
+ * "config_properties", Map.of(...));
+ * }</pre>
+ *
+ * Example of a simple batch read:
+ *
+ * <pre>{@code
+ * PCollection<Row> rows = pipeline
+ * .apply(Managed.read(ICEBERG).withConfig(config))
+ * .getSinglePCollection();
+ * }</pre>
+ *
+ * Example of a simple CDC read:
+ *
+ * <pre>{@code
+ * PCollection<Row> output = pipeline
+ * .apply(Managed.read(ICEBERG_CDC).withConfig(config))
+ * .getSinglePCollection();
+ *
+ * PCollection<Row> rows = output
+ * .apply(ReadUtils.extractRecords());
+ * }</pre>
+ *
+ * <p><b>Note</b>: This reads <b>append-only</b> snapshots. Full CDC is not
supported yet.
+ *
+ * <p>The CDC <b>streaming</b> source (enabled with {@code streaming=true})
continuously polls the
+ * table for new snapshots, with a default interval of 60 seconds. This can be
overridden using
+ * <b>{@code poll_interval_seconds}</b>:
+ *
+ * <pre>{@code
+ * config.put("streaming", true);
+ * config.put("poll_interval_seconds", 10);
+ * }</pre>
+ *
+ * <h3>Output Schema</h3>
+ *
+ * <p>Reading with <b>{@code Managed.read(ICEBERG)}</b> produces a <b>{@code
PCollection<Row>}</b>
+ * containing data records that conform to the table schema.
+ *
+ * <p>Reading with <b>{@code Managed.read(ICEBERG_CDC)}</b> produces a
<b>{@code
+ * PCollection<Row>}</b> with the following schema:
+ *
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Field</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b>
</td>
+ * </tr>
+ * <tr>
+ * <td> {@code record} </td>
+ * <td> {@code Beam Row} </td>
+ * <td>
+ * The data record.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code operation} </td>
+ * <td> {@code string} </td>
+ * <td>
+ * The snapshot operation associated with this record (e.g. "append",
"replace", "delete")
Review Comment:
We should probably give the exact list of operations that we produce today
instead of leaving this open-ended (and add to the list when we update the
source).
##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -85,13 +85,15 @@ public class Managed {
// TODO: Dynamically generate a list of supported transforms
public static final String ICEBERG = "iceberg";
+ public static final String ICEBERG_CDC = "iceberg_cdc";
public static final String KAFKA = "kafka";
public static final String BIGQUERY = "bigquery";
// Supported SchemaTransforms
public static final Map<String, String> READ_TRANSFORMS =
ImmutableMap.<String, String>builder()
.put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ))
+ .put(ICEBERG_CDC, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_CDC_READ))
Review Comment:
We should update the Javadocs to include ICEBERG_CDC.
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -103,22 +108,119 @@
* </tr>
* </table>
*
- * <p><b>Additional configuration options are provided in the `Pre-filtering
Options` section below,
- * for Iceberg writes.</b>
+ * <h3>Sink-only Options</h3>
*
- * <h3>Creating Tables</h3>
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td>
<b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code triggering_frequency_seconds} </td>
+ * <td> {@code int} </td>
+ * <td>Required for streaming writes. Roughly every
+ * {@code triggering_frequency_seconds} duration, the sink will write
records to data files and produce a table snapshot.
+ * Generally, a higher value will produce fewer, larger data files.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td>{@code drop}</td> <td>{@code list<str>}</td> <td>A list of fields
to drop before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code keep}</td> <td>{@code list<str>}</td> <td>A list of fields
to keep, dropping the rest before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code only}</td> <td>{@code str}</td> <td>A nested record field
that should be the only thing written to table(s).</td>
+ * </tr>
+ * </table>
*
- * <p>If an Iceberg table does not exist at the time of writing, this
connector will automatically
- * create one with the data's schema.
+ * <h3>Source-only Options</h3>
*
- * <p>Note that this is a best-effort operation that depends on the {@link
Catalog} implementation.
- * Some implementations may not support creating a table using the Iceberg API.
+ * <h4>CDC Source options</h4>
+ *
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td>
<b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from this snapshot ID (inclusive).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Reads up to this snapshot ID (inclusive). If unset and the source
is bounded, it will read
+ * up to the current snapshot (inclusive). If unset and source is
unbounded, it will continue polling for new snapshots forever.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_timestamp} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from the earliest snapshot (inclusive) created
after this timestamp (in milliseconds).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_timestamp} </td>
+ * <td> {@code long} </td>
+ * <td> Reads up to the latest snapshot (inclusive) created before this
timestamp (in milliseconds).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code starting_strategy} </td>
+ * <td> {@code str} </td>
+ * <td>
+ * The source's starting strategy. Valid options are:
+ * <ul>
+ * <li>{@code earliest}: starts reading from the earliest
snapshot</li>
+ * <li>{@code latest}: starts reading from the latest snapshot</li>
+ * </ul>
+ * <p>Defaults to {@code earliest} for batch, and {@code latest} for
streaming.
Review Comment:
By "streaming", do you mean the PipelineOption [1], the Iceberg config
(defined below), or both?
[1]
https://github.com/apache/beam/blob/b76e45acf4c9dcf35cec2a12414887071e683c48/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java#L36
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -103,22 +108,119 @@
* </tr>
* </table>
*
- * <p><b>Additional configuration options are provided in the `Pre-filtering
Options` section below,
- * for Iceberg writes.</b>
+ * <h3>Sink-only Options</h3>
*
- * <h3>Creating Tables</h3>
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td>
<b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code triggering_frequency_seconds} </td>
+ * <td> {@code int} </td>
+ * <td>Required for streaming writes. Roughly every
+ * {@code triggering_frequency_seconds} duration, the sink will write
records to data files and produce a table snapshot.
+ * Generally, a higher value will produce fewer, larger data files.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td>{@code drop}</td> <td>{@code list<str>}</td> <td>A list of fields
to drop before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code keep}</td> <td>{@code list<str>}</td> <td>A list of fields
to keep, dropping the rest before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code only}</td> <td>{@code str}</td> <td>A nested record field
that should be the only thing written to table(s).</td>
+ * </tr>
+ * </table>
*
- * <p>If an Iceberg table does not exist at the time of writing, this
connector will automatically
- * create one with the data's schema.
+ * <h3>Source-only Options</h3>
*
- * <p>Note that this is a best-effort operation that depends on the {@link
Catalog} implementation.
- * Some implementations may not support creating a table using the Iceberg API.
+ * <h4>CDC Source options</h4>
+ *
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td>
<b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from this snapshot ID (inclusive).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Reads up to this snapshot ID (inclusive). If unset and the source
is bounded, it will read
+ * up to the current snapshot (inclusive). If unset and source is
unbounded, it will continue polling for new snapshots forever.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_timestamp} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from the earliest snapshot (inclusive) created
after this timestamp (in milliseconds).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_timestamp} </td>
+ * <td> {@code long} </td>
+ * <td> Reads up to the latest snapshot (inclusive) created before this
timestamp (in milliseconds).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code starting_strategy} </td>
+ * <td> {@code str} </td>
+ * <td>
+ * The source's starting strategy. Valid options are:
+ * <ul>
+ * <li>{@code earliest}: starts reading from the earliest
snapshot</li>
+ * <li>{@code latest}: starts reading from the latest snapshot</li>
+ * </ul>
+ * <p>Defaults to {@code earliest} for batch, and {@code latest} for
streaming.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code watermark_column} </td>
+ * <td> {@code str} </td>
+ * <td>
+ * The column used to derive event time to track progress. Must be of
type:
+ * <ul>
+ * <li>{@code timestamp}</li>
Review Comment:
Could you elaborate on what you mean here by the timestamp and timestamptz types?
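For context, and assuming these names refer to the Iceberg primitive types (timestamp: microsecond precision without a time zone; timestamptz: microsecond precision with values normalized to UTC), the sketch below shows how each accepted column type could be reduced to a single epoch-microsecond value. It uses only `java.time` and `java.util.concurrent.TimeUnit`. `WatermarkMicrosSketch` and its methods are hypothetical and are not taken from the PR; the `long` case follows the `watermark_time_unit` option described in the Javadoc under review.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: reduces each watermark column type to epoch microseconds. */
final class WatermarkMicrosSketch {
  private static final LocalDateTime EPOCH_LOCAL = LocalDateTime.of(1970, 1, 1, 0, 0);
  private static final OffsetDateTime EPOCH_UTC = EPOCH_LOCAL.atOffset(ZoneOffset.UTC);

  /** timestamp: no zone information, so the wall-clock value is measured as-is. */
  static long fromTimestamp(LocalDateTime value) {
    return ChronoUnit.MICROS.between(EPOCH_LOCAL, value);
  }

  /** timestamptz: the offset is taken into account, giving a UTC-based instant. */
  static long fromTimestamptz(OffsetDateTime value) {
    return ChronoUnit.MICROS.between(EPOCH_UTC, value);
  }

  /** long: interpreted in the unit named by watermark_time_unit (a TimeUnit name). */
  static long fromLong(long value, String timeUnitName) {
    return TimeUnit.valueOf(timeUnitName.toUpperCase(Locale.ROOT)).toMicros(value);
  }

  public static void main(String[] args) {
    System.out.println(fromTimestamp(LocalDateTime.of(1970, 1, 1, 0, 0, 1)));
    System.out.println(
        fromTimestamptz(OffsetDateTime.of(1970, 1, 1, 1, 0, 0, 0, ZoneOffset.ofHours(1))));
    System.out.println(fromLong(5L, "milliseconds"));
  }
}
```

If this matches the intent, a short sentence in the Javadoc naming the Iceberg types explicitly would probably be enough.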
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -103,22 +108,119 @@
* </tr>
* </table>
*
- * <p><b>Additional configuration options are provided in the `Pre-filtering
Options` section below,
- * for Iceberg writes.</b>
+ * <h3>Sink-only Options</h3>
*
- * <h3>Creating Tables</h3>
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td>
<b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code triggering_frequency_seconds} </td>
+ * <td> {@code int} </td>
+ * <td>Required for streaming writes. Roughly every
+ * {@code triggering_frequency_seconds} duration, the sink will write
records to data files and produce a table snapshot.
+ * Generally, a higher value will produce fewer, larger data files.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td>{@code drop}</td> <td>{@code list<str>}</td> <td>A list of fields
to drop before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code keep}</td> <td>{@code list<str>}</td> <td>A list of fields
to keep, dropping the rest before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ * <td>{@code only}</td> <td>{@code str}</td> <td>A nested record field
that should be the only thing written to table(s).</td>
+ * </tr>
+ * </table>
*
- * <p>If an Iceberg table does not exist at the time of writing, this
connector will automatically
- * create one with the data's schema.
+ * <h3>Source-only Options</h3>
*
- * <p>Note that this is a best-effort operation that depends on the {@link
Catalog} implementation.
- * Some implementations may not support creating a table using the Iceberg API.
+ * <h4>CDC Source options</h4>
+ *
+ * <table border="1" cellspacing="1">
+ * <tr>
+ * <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td>
<b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from this snapshot ID (inclusive).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Reads up to this snapshot ID (inclusive). If unset and the source
is bounded, it will read
+ * up to the current snapshot (inclusive). If unset and source is
unbounded, it will continue polling for new snapshots forever.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_timestamp} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from the earliest snapshot (inclusive) created
after this timestamp (in milliseconds).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_timestamp} </td>
+ * <td> {@code long} </td>
+ * <td> Reads up to the latest snapshot (inclusive) created before this
timestamp (in milliseconds).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code starting_strategy} </td>
+ * <td> {@code str} </td>
+ * <td>
+ * The source's starting strategy. Valid options are:
+ * <ul>
+ * <li>{@code earliest}: starts reading from the earliest
snapshot</li>
+ * <li>{@code latest}: starts reading from the latest snapshot</li>
+ * </ul>
+ * <p>Defaults to {@code earliest} for batch, and {@code latest} for
streaming.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code watermark_column} </td>
+ * <td> {@code str} </td>
+ * <td>
+ * The column used to derive event time to track progress. Must be of
type:
+ * <ul>
+ * <li>{@code timestamp}</li>
+ * <li>{@code timestamptz}</li>
+ * <li>{@code long} (micros)</li>
+ * </ul>
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code watermark_time_unit} </td>
+ * <td> {@code str} </td>
+ * <td>
+ * Use only when {@code watermark_column} is set to a column of type
Long. Specifies the TimeUnit represented by the watermark column.
+ * Default is {@code "microseconds"}.
+ *
+ * <p>Check {@link TimeUnit} for possible values.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code streaming} </td>
Review Comment:
What happens if both this and to_snapshot or to_timestamp are set?
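To make the question concrete, here is a small sketch that enumerates the combinations. The first three outcomes restate what the Javadoc above already says, under the assumption that "bounded" means streaming is unset or false; the fourth is deliberately left as unspecified, since that is the case being asked about. `EndBehaviorSketch`, `EndBehavior`, and `describe` are hypothetical names, not part of the PR.

```java
/** Hypothetical enumeration of how a read ends for each option combination. */
final class EndBehaviorSketch {
  enum EndBehavior {
    READ_TO_CURRENT_SNAPSHOT, // batch, no upper bound (documented)
    READ_TO_UPPER_BOUND,      // batch, to_snapshot or to_timestamp set (documented)
    POLL_FOREVER,             // streaming, no upper bound (documented)
    UNSPECIFIED               // streaming with an upper bound: the open question
  }

  static EndBehavior describe(boolean streaming, Long toSnapshot, Long toTimestamp) {
    boolean hasUpperBound = toSnapshot != null || toTimestamp != null;
    if (!streaming) {
      return hasUpperBound
          ? EndBehavior.READ_TO_UPPER_BOUND
          : EndBehavior.READ_TO_CURRENT_SNAPSHOT;
    }
    // Assumption: the Javadoc only covers the streaming case without an upper bound.
    return hasUpperBound ? EndBehavior.UNSPECIFIED : EndBehavior.POLL_FOREVER;
  }

  public static void main(String[] args) {
    System.out.println(describe(false, null, null));
    System.out.println(describe(false, 42L, null));
    System.out.println(describe(true, null, null));
    System.out.println(describe(true, 42L, null));
  }
}
```

Whatever the intended behavior for the last case is (stop at the bound, or reject the configuration), it would help to state it in the table.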