stevenzwu commented on code in PR #9179:
URL: https://github.com/apache/iceberg/pull/9179#discussion_r1411207388


##########
docs/flink-queries.md:
##########
@@ -277,6 +277,66 @@ DataStream<Row> stream = env.fromSource(source, 
WatermarkStrategy.noWatermarks()
     "Iceberg Source as Avro GenericRecord", new 
GenericRecordAvroTypeInfo(avroSchema));
 ```
 
+### Emitting watermarks
+Emitting watermarks from the source itself could be beneficial for several 
purposes, like harnessing the
+[Flink Watermark 
Alignment](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment),
+or prevent triggering 
[windows](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/)
+too early when reading multiple data files concurrently.
+
+Enable watermark generation for an `IcebergSource` by setting the 
`watermarkColumn`.
+The supported column types are `timestamp`, `timestamptz` and `long`.
+Iceberg `timestamp` or `timestamptz` inherently contains the time precision. 
So there is no need
+to specify the time unit. But `long` type column doesn't contain time unit 
information. Use  
+`watermarkTimeUnit` to configure the conversion for long columns.
+
+The watermarks are generated based on column metrics stored for data files and 
emitted once per split.
+If multiple smaller files with different time ranges are combined into a 
single split, it can increase
+the out-of-orderliness and extra data buffering in the Flink state. The main 
purpose of watermark alignment
+is to reduce out-of-orderliness and excess data buffering in the Flink state. 
Hence it is recommended to
+set `read.split.open-file-cost` to a very large value to prevent combining 
multiple smaller files into a
+single split. Do not forget to consider the additional memory and CPU load 
caused by having multiple
+splits in this case.
+
+By default, the column metrics are collected for the first 100 columns of the 
table.
+Use [write properties](configuration.md#write-properties) starting with 
`write.metadata.metrics` when needed.
+
+```java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
+TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+
+// Ordered data file reads with windowing, using a timestamp column

Review Comment:
   there is no `windowing` in the snippet below. so may not be accurate to say 
it in the comment here.
   
   If I understand this part correctly, it tries to demonstrate emit watermark 
from Iceberg source without enabling watermark alignment.



##########
docs/flink-queries.md:
##########
@@ -277,6 +277,66 @@ DataStream<Row> stream = env.fromSource(source, 
WatermarkStrategy.noWatermarks()
     "Iceberg Source as Avro GenericRecord", new 
GenericRecordAvroTypeInfo(avroSchema));
 ```
 
+### Emitting watermarks
+Emitting watermarks from the source itself could be beneficial for several 
purposes, like harnessing the
+[Flink Watermark 
Alignment](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment),
+or prevent triggering 
[windows](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/)
+too early when reading multiple data files concurrently.
+
+Enable watermark generation for an `IcebergSource` by setting the 
`watermarkColumn`.
+The supported column types are `timestamp`, `timestamptz` and `long`.
+Iceberg `timestamp` or `timestamptz` inherently contains the time precision. 
So there is no need
+to specify the time unit. But `long` type column doesn't contain time unit 
information. Use  
+`watermarkTimeUnit` to configure the conversion for long columns.
+
+The watermarks are generated based on column metrics stored for data files and 
emitted once per split.
+If multiple smaller files with different time ranges are combined into a 
single split, it can increase
+the out-of-orderliness and extra data buffering in the Flink state. The main 
purpose of watermark alignment
+is to reduce out-of-orderliness and excess data buffering in the Flink state. 
Hence it is recommended to
+set `read.split.open-file-cost` to a very large value to prevent combining 
multiple smaller files into a
+single split. Do not forget to consider the additional memory and CPU load 
caused by having multiple
+splits in this case.
+
+By default, the column metrics are collected for the first 100 columns of the 
table.

Review Comment:
   I think we probably need to expand a little bit here to be more clear to 
users. We should emphasize that 
   
   ```
   This feature requires column-level min-max stats. Make sure stats are 
generated for the watermark column during write phase. By default, the column 
metrics are collected for the first 100 columns of the table. If watermark 
column doesn't have stats enabled by default, use ...
   ```



##########
docs/flink-queries.md:
##########
@@ -277,6 +277,66 @@ DataStream<Row> stream = env.fromSource(source, 
WatermarkStrategy.noWatermarks()
     "Iceberg Source as Avro GenericRecord", new 
GenericRecordAvroTypeInfo(avroSchema));
 ```
 
+### Emitting watermarks
+Emitting watermarks from the source itself could be beneficial for several 
purposes, like harnessing the
+[Flink Watermark 
Alignment](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment),
+or prevent triggering 
[windows](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/)
+too early when reading multiple data files concurrently.
+
+Enable watermark generation for an `IcebergSource` by setting the 
`watermarkColumn`.
+The supported column types are `timestamp`, `timestamptz` and `long`.
+Iceberg `timestamp` or `timestamptz` inherently contains the time precision. 
So there is no need
+to specify the time unit. But `long` type column doesn't contain time unit 
information. Use  
+`watermarkTimeUnit` to configure the conversion for long columns.
+
+The watermarks are generated based on column metrics stored for data files and 
emitted once per split.
+If multiple smaller files with different time ranges are combined into a 
single split, it can increase
+the out-of-orderliness and extra data buffering in the Flink state. The main 
purpose of watermark alignment
+is to reduce out-of-orderliness and excess data buffering in the Flink state. 
Hence it is recommended to
+set `read.split.open-file-cost` to a very large value to prevent combining 
multiple smaller files into a
+single split. Do not forget to consider the additional memory and CPU load 
caused by having multiple

Review Comment:
   > Do not forget to consider the additional memory and CPU load caused by 
having multiple
   splits in this case.
   
   I am not sure this is correct. memory increase is probably rather small 
(just a couple of offsets and maybe other small overhead. CPU load is also not 
convincing to me. I would probably rephrase it as 
   ```
   The negative impact (of not combining small files into a single split) is on 
read throughput, especially if there are many small files. In typical stateful 
processing jobs, source read throughput is not the bottleneck. Hence this is 
probably a reasonable tradeoff.
   ```



##########
docs/flink-queries.md:
##########
@@ -277,6 +277,58 @@ DataStream<Row> stream = env.fromSource(source, 
WatermarkStrategy.noWatermarks()
     "Iceberg Source as Avro GenericRecord", new 
GenericRecordAvroTypeInfo(avroSchema));
 ```
 
+### Emitting watermarks
+Emitting watermarks from the source itself could be beneficial for several 
purposes, like harnessing the
+[Flink Watermark 
Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment)
+feature to prevent runaway readers, or providing triggers for [Flink 
windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/).
+
+Enable watermark generation for an `IcebergSource` by setting the 
`watermarkColumn`.
+The supported column types are `timestamp`, `timestamptz` and `long`.
+Timestamp columns are automatically converted to milliseconds since the Java 
epoch of
+1970-01-01T00:00:00Z. Use `watermarkTimeUnit` to configure the conversion for 
long columns.
+
+The watermarks are generated based on column metrics stored for data files and 
emitted once per split.
+When using watermarks for Flink watermark alignment set 
`read.split.open-file-cost` to prevent
+combining multiple files to a single split.
+By default, the column metrics are collected for the first 100 columns of the 
table. Use [write properties](configuration.md#write-properties) starting with 
`write.metadata.metrics` when needed.
+
+```java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
+TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+
+// For windowing
+DataStream<RowData> stream =
+    env.fromSource(
+        IcebergSource.forRowData()
+            .tableLoader(tableLoader)
+            // Watermark using timestamp column
+            .watermarkColumn("timestamp_column")
+            .build(),
+        // Watermarks are generated by the source, no need to generate it 
manually
+        WatermarkStrategy.<RowData>noWatermarks()
+            // Extract event timestamp from records
+            .withTimestampAssigner((record, eventTime) -> 
record.getTimestamp(pos, precision).getMillisecond()),
+        SOURCE_NAME,
+        TypeInformation.of(RowData.class));
+
+// For watermark alignment
+DataStream<RowData> stream =
+    env.fromSource(
+        IcebergSource source = IcebergSource.forRowData()
+            .tableLoader(tableLoader)
+            // Disable combining multiple files to a single split 
+            .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, 
String.valueOf(TableProperties.SPLIT_SIZE_DEFAULT))
+            // Watermark using long column
+            .watermarkColumn("long_column")
+            .watermarkTimeUnit(TimeUnit.MILLI_SCALE)
+            .build(),
+        // Watermarks are generated by the source, no need to generate it 
manually
+        WatermarkStrategy.<RowData>noWatermarks()
+            .withWatermarkAlignment(watermarkGroup, maxAllowedWatermarkDrift),
+        SOURCE_NAME,
+        TypeInformation.of(RowData.class));
+```
+
 ## Options

Review Comment:
   oh. then we would need to add them. cc @mas-chen 



##########
docs/flink-queries.md:
##########
@@ -277,6 +277,58 @@ DataStream<Row> stream = env.fromSource(source, 
WatermarkStrategy.noWatermarks()
     "Iceberg Source as Avro GenericRecord", new 
GenericRecordAvroTypeInfo(avroSchema));
 ```
 
+### Emitting watermarks
+Emitting watermarks from the source itself could be beneficial for several 
purposes, like harnessing the
+[Flink Watermark 
Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment)
+feature to prevent runaway readers, or providing triggers for [Flink 
windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/).
+
+Enable watermark generation for an `IcebergSource` by setting the 
`watermarkColumn`.
+The supported column types are `timestamp`, `timestamptz` and `long`.
+Timestamp columns are automatically converted to milliseconds since the Java 
epoch of
+1970-01-01T00:00:00Z. Use `watermarkTimeUnit` to configure the conversion for 
long columns.
+
+The watermarks are generated based on column metrics stored for data files and 
emitted once per split.
+When using watermarks for Flink watermark alignment set 
`read.split.open-file-cost` to prevent
+combining multiple files to a single split.
+By default, the column metrics are collected for the first 100 columns of the 
table. Use [write properties](configuration.md#write-properties) starting with 
`write.metadata.metrics` when needed.
+
+```java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
+TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+
+// For windowing
+DataStream<RowData> stream =
+    env.fromSource(
+        IcebergSource.forRowData()
+            .tableLoader(tableLoader)
+            // Watermark using timestamp column
+            .watermarkColumn("timestamp_column")
+            .build(),
+        // Watermarks are generated by the source, no need to generate it 
manually
+        WatermarkStrategy.<RowData>noWatermarks()
+            // Extract event timestamp from records
+            .withTimestampAssigner((record, eventTime) -> 
record.getTimestamp(pos, precision).getMillisecond()),
+        SOURCE_NAME,
+        TypeInformation.of(RowData.class));
+
+// For watermark alignment
+DataStream<RowData> stream =
+    env.fromSource(
+        IcebergSource source = IcebergSource.forRowData()
+            .tableLoader(tableLoader)
+            // Disable combining multiple files to a single split 
+            .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, 
String.valueOf(TableProperties.SPLIT_SIZE_DEFAULT))
+            // Watermark using long column
+            .watermarkColumn("long_column")
+            .watermarkTimeUnit(TimeUnit.MILLI_SCALE)

Review Comment:
    @mas-chen can you clarify your comment? I am not quite following. 
   
   @pvary it might be good to separate this into two code snippets. we can 
remove the two lines in the beginning.
   ```
   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
   TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to