pvary commented on code in PR #8553:
URL: https://github.com/apache/iceberg/pull/8553#discussion_r1401243005
########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java: ##########
@@ -453,6 +492,18 @@ public IcebergSource<T> build() {
       contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
     }
+    SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+    if (watermarkColumn != null) {

Review Comment:
@dchristle: The code line is mostly irrelevant to the conversation 😄

The Iceberg planner collects the files that should be scanned by the execution engine and creates `ScanTask`s from them. If a file is big, the planner creates multiple splits for it, so multiple readers can read that file in parallel. On the other hand, if there are many small files, it combines them into a `CombinedScanTask`, which is read by a single reader. This reduces the number of splits, split assignments, etc.

We generate a single watermark for every `ScanTask`. One `CombinedScanTask` can group multiple data files with a wide range of timestamps, so generating a single watermark for it can be suboptimal. Setting `read.split.open-file-cost` high enough (for example, to the value of `read.split.target-size`) can prevent the creation of such `CombinedScanTask`s, which can result in better-ordered input and finer-grained watermarks.
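To make the trade-off above concrete, here is a simplified, self-contained sketch. It is not Iceberg's planner: `SplitWatermarkSketch`, `DataFile`, `planSplits` and `splitWatermark` are hypothetical names, the packing is a plain sequential fill without Iceberg's lookback, and all files are assumed to be smaller than the target split size. It assumes each file weighs `max(size, open-file cost)` when packed into a split (my understanding of how the planner uses `read.split.open-file-cost`), and that a split gets one watermark: the minimum lower bound of the watermark column across its files.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitWatermarkSketch {

  // Hypothetical stand-in for a data file: its size and the lower bound of the watermark column.
  record DataFile(String name, long sizeBytes, long minTimestamp) {}

  // Simplified sequential packing (no lookback, no splitting of large files).
  // Each file weighs max(sizeBytes, openFileCost), so a large open-file cost makes small files "heavy".
  static List<List<DataFile>> planSplits(List<DataFile> files, long targetSize, long openFileCost) {
    List<List<DataFile>> splits = new ArrayList<>();
    List<DataFile> current = new ArrayList<>();
    long currentWeight = 0;
    for (DataFile file : files) {
      long weight = Math.max(file.sizeBytes(), openFileCost);
      // Close the current split if this file would push it over the target size.
      if (!current.isEmpty() && currentWeight + weight > targetSize) {
        splits.add(current);
        current = new ArrayList<>();
        currentWeight = 0;
      }
      current.add(file);
      currentWeight += weight;
    }
    if (!current.isEmpty()) {
      splits.add(current);
    }
    return splits;
  }

  // One watermark per split: the smallest lower bound of the watermark column among its files.
  static long splitWatermark(List<DataFile> split) {
    return split.stream().mapToLong(DataFile::minTimestamp).min().orElseThrow();
  }

  static List<Long> watermarks(List<List<DataFile>> splits) {
    List<Long> result = new ArrayList<>();
    for (List<DataFile> split : splits) {
      result.add(splitWatermark(split));
    }
    return result;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    long targetSize = 128 * mb;
    // Four small files whose timestamps are far apart.
    List<DataFile> files = List.of(
        new DataFile("a", 10 * mb, 0),
        new DataFile("b", 10 * mb, 1_000),
        new DataFile("c", 10 * mb, 2_000),
        new DataFile("d", 10 * mb, 3_000));

    // Open-file cost of 4 MB: the four 10 MB files fit into one 128 MB split, so one coarse watermark.
    System.out.println(watermarks(planSplits(files, targetSize, 4 * mb))); // [0]
    // Open-file cost equal to the target size: one file per split, so one watermark per file.
    System.out.println(watermarks(planSplits(files, targetSize, targetSize))); // [0, 1000, 2000, 3000]
  }
}
```

With the default 4 MB open-file cost, the combined split can only advance the watermark to 0 even though it holds data up to timestamp 3000; raising the cost to the target split size trades more splits and split assignments for one watermark per file.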