mas-chen commented on code in PR #9179:
URL: https://github.com/apache/iceberg/pull/9179#discussion_r1411286325
##########
docs/flink-queries.md:
##########
@@ -277,6 +277,58 @@ DataStream<Row> stream = env.fromSource(source,
     WatermarkStrategy.noWatermarks(),
     "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema));
 ```
+### Emitting watermarks
+Emitting watermarks from the source itself could be beneficial for several purposes, like harnessing the
+[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment)
+feature to prevent runaway readers, or providing triggers for [Flink windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/).
+
+Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`.
+The supported column types are `timestamp`, `timestamptz` and `long`.
+Timestamp columns are automatically converted to milliseconds since the Java epoch of
+1970-01-01T00:00:00Z. Use `watermarkTimeUnit` to configure the conversion for long columns.
+
+The watermarks are generated based on column metrics stored for data files and emitted once per split.
+When using watermarks for Flink watermark alignment, set `read.split.open-file-cost` to prevent
+combining multiple files into a single split.
+By default, the column metrics are collected for the first 100 columns of the table.
+Use [write properties](configuration.md#write-properties) starting with `write.metadata.metrics` when needed.
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+
+// For windowing
+DataStream<RowData> stream =
+    env.fromSource(
+        IcebergSource.forRowData()
+            .tableLoader(tableLoader)
+            // Watermark using timestamp column
+            .watermarkColumn("timestamp_column")
+            .build(),
+        // Watermarks are generated by the source, no need to generate it manually
+        WatermarkStrategy.<RowData>noWatermarks()
+            // Extract event timestamp from records
+            .withTimestampAssigner((record, eventTime) -> record.getTimestamp(pos, precision).getMillisecond()),
+        SOURCE_NAME,
+        TypeInformation.of(RowData.class));
+
+// For watermark alignment
+DataStream<RowData> stream =
+    env.fromSource(
+        IcebergSource.forRowData()
+            .tableLoader(tableLoader)
+            // Disable combining multiple files to a single split
+            .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, String.valueOf(TableProperties.SPLIT_SIZE_DEFAULT))
+            // Watermark using long column
+            .watermarkColumn("long_column")
+            .watermarkTimeUnit(TimeUnit.MILLI_SCALE)
Review Comment:
Oh I just think
```
.watermarkTimeUnit(TimeUnit.MILLI_SCALE)
```
should be advertised in the "basic" example. I think most people would just configure this, rather than the custom Timestamp assigner. This reduces code in the first example and keeps it simpler.
The 2nd example I consider a more "advanced" example, where we can show how to do the custom Timestamp assigner. (Furthermore, watermark alignment from the Flink perspective is an advanced feature: it requires lots of tuning and an understanding of how it interacts with the watermark strategy, i.e. out-of-orderness/idleness/etc.)
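
As a side note for readers of this thread: the unit conversion that `watermarkTimeUnit` configures for long columns can be sketched with plain `java.util.concurrent.TimeUnit`. This is a self-contained illustration only, not Iceberg internals; the class name and sample values are made up for the example.

```java
import java.util.concurrent.TimeUnit;

public class WatermarkUnitSketch {
    // A long watermark column holds a raw number; the configured time unit
    // tells the source how to interpret it. The value is then normalized to
    // milliseconds since the epoch before it is used as a watermark.
    public static long toEpochMillis(long columnValue, TimeUnit unit) {
        return unit.toMillis(columnValue);
    }

    public static void main(String[] args) {
        // Hypothetical long column storing microseconds since the epoch
        long micros = 1_700_000_000_000_000L;
        System.out.println(toEpochMillis(micros, TimeUnit.MICROSECONDS)); // prints 1700000000000
    }
}
```

A column already stored in milliseconds would use `TimeUnit.MILLISECONDS`, which leaves the value unchanged.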
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]