pvary commented on code in PR #8553:
URL: https://github.com/apache/iceberg/pull/8553#discussion_r1401243005
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -453,6 +492,18 @@ public IcebergSource<T> build() {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema,
projectedFlinkSchema));
}
+ SerializableRecordEmitter<T> emitter =
SerializableRecordEmitter.defaultEmitter();
+ if (watermarkColumn != null) {
Review Comment:
@dchristle: The code line is mostly irrelevant for the conversation 😄
The Iceberg planner collects the files which should be scanned by the execution
engine, and creates `ScanTask`s from them. If a file is big, then it creates
multiple splits for the given file, so multiple readers can read the file in
parallel. OTOH, if there are multiple small files, then it combines them into a
`CombinedScanTask`, which is read by a single reader. This decreases the
number of splits, split assignments, etc.
We generate a single watermark for every `ScanTask`. One `CombinedScanTask`
could group multiple data files with a wide range of timestamps, so generating a
single watermark for it could be suboptimal. Setting
`read.split.open-file-cost` to a large enough value could prevent the creation
of such `CombinedScanTask`s, and could result in better ordered input and finer
grained watermarks.
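To illustrate why the open-file cost matters, here is a rough sketch of weight-based packing. It is not Iceberg's actual planner code: the class `SplitPackingSketch`, the method `pack`, and the numbers are all made up for the example, and the packing is a plain greedy pass in input order. The only idea taken from the explanation above is that each file is weighed as at least the open-file cost, so a cost close to the target split size leaves no room to combine small files.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of combining small files into one split.
// This is NOT the Iceberg implementation, only a sketch of the weighting idea.
public class SplitPackingSketch {

  // Groups file lengths into bins whose total weight stays within targetSize.
  // Each file weighs max(length, openFileCost), so a large openFileCost makes
  // every small file "look" big and prevents it from sharing a bin.
  static List<List<Long>> pack(List<Long> fileLengths, long targetSize, long openFileCost) {
    List<List<Long>> bins = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentWeight = 0;
    for (long length : fileLengths) {
      long weight = Math.max(length, openFileCost);
      // Close the current bin if adding this file would exceed the target.
      if (!current.isEmpty() && currentWeight + weight > targetSize) {
        bins.add(current);
        current = new ArrayList<>();
        currentWeight = 0;
      }
      current.add(length);
      currentWeight += weight;
    }
    if (!current.isEmpty()) {
      bins.add(current);
    }
    return bins;
  }

  public static void main(String[] args) {
    // Illustrative sizes in arbitrary units.
    List<Long> files = List.of(10L, 20L, 30L);
    // Low open-file cost: all three small files fit into one combined group.
    System.out.println(pack(files, 128L, 4L));
    // Open-file cost equal to the target size: each file gets its own group.
    System.out.println(pack(files, 128L, 128L));
  }
}
```

With one file per group, each watermark covers the timestamp range of a single data file instead of the range of everything that happened to be packed together. The trade-off is more splits and more split assignments.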
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]