pvary commented on code in PR #8553:
URL: https://github.com/apache/iceberg/pull/8553#discussion_r1401243005


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -453,6 +492,18 @@ public IcebergSource<T> build() {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
       }
 
+      SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+      if (watermarkColumn != null) {

Review Comment:
   @dchristle: The code line is mostly irrelevant for the conversation 😄
   
   The Iceberg planner collects the files that the execution engine should 
scan and creates `ScanTask`s from them. If a file is big, the planner creates 
multiple splits for it, so multiple readers can read the file in parallel. 
On the other hand, if there are multiple small files, it combines them into a 
`CombinedScanTask`, which is read by a single reader. This decreases the 
number of splits, split assignments, etc.
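
   To make the two cases concrete, here is a minimal, simplified model of the 
planning behavior described above. It is a sketch, not Iceberg's planner code: 
the class `SplitPlanningSketch` and its methods `splitFile` and `combine` are 
hypothetical names, and the in-order greedy grouping is an assumption chosen 
for readability.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of split planning; hypothetical, not Iceberg's actual planner. */
public class SplitPlanningSketch {

  /** Cuts one file into {offset, length} ranges of at most targetSplitSize bytes. */
  public static List<long[]> splitFile(long fileLength, long targetSplitSize) {
    List<long[]> ranges = new ArrayList<>();
    for (long offset = 0; offset < fileLength; offset += targetSplitSize) {
      ranges.add(new long[] {offset, Math.min(targetSplitSize, fileLength - offset)});
    }
    return ranges;
  }

  /** Greedily groups file sizes, in order, into tasks of at most targetSplitSize bytes. */
  public static List<List<Long>> combine(List<Long> fileSizes, long targetSplitSize) {
    List<List<Long>> tasks = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentBytes = 0;
    for (long size : fileSizes) {
      // Close the current task when adding this file would exceed the target.
      if (!current.isEmpty() && currentBytes + size > targetSplitSize) {
        tasks.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(size);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      tasks.add(current);
    }
    return tasks;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    // One 300 MB file, 128 MB target: three ranges that different readers can take.
    System.out.println(splitFile(300 * mb, 128 * mb).size()); // prints 3
    // Four 10 MB files, 128 MB target: one combined task for a single reader.
    System.out.println(combine(List.of(10 * mb, 10 * mb, 10 * mb, 10 * mb), 128 * mb).size()); // prints 1
  }
}
```

   The second call is the case that matters for watermarks: four separate 
files end up behind a single task.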
   
   We generate a single watermark for every `ScanTask`. One `CombinedScanTask` 
can group multiple data files with a wide range of timestamps, so generating a 
single watermark for it can be suboptimal. Setting 
`read.split.open-file-cost` high enough can prevent the creation of such 
`CombinedScanTask`s, which can result in better-ordered input and 
finer-grained watermarks.
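
   A small sketch of why the open-file cost changes watermark granularity. 
It rests on two assumptions that are not spelled out in this comment: each 
file is weighted as `max(fileLength, openFileCost)` when files are packed into 
a task, and a task's watermark is the smallest timestamp lower bound among its 
files. `WatermarkGranularitySketch`, `FileInfo`, `pack`, and `watermarks` are 
hypothetical names, not Iceberg or Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of task packing and per-task watermarks; hypothetical names throughout. */
public class WatermarkGranularitySketch {

  /** Stand-in for a data file: its size and the lower bound of its timestamp column. */
  public static final class FileInfo {
    final long lengthBytes;
    final long minTimestampMs;

    public FileInfo(long lengthBytes, long minTimestampMs) {
      this.lengthBytes = lengthBytes;
      this.minTimestampMs = minTimestampMs;
    }
  }

  /** Packs files in order; each file weighs max(length, openFileCost) (assumption). */
  public static List<List<FileInfo>> pack(
      List<FileInfo> files, long targetSplitSize, long openFileCost) {
    List<List<FileInfo>> tasks = new ArrayList<>();
    List<FileInfo> current = new ArrayList<>();
    long currentWeight = 0;
    for (FileInfo file : files) {
      long weight = Math.max(file.lengthBytes, openFileCost);
      if (!current.isEmpty() && currentWeight + weight > targetSplitSize) {
        tasks.add(current);
        current = new ArrayList<>();
        currentWeight = 0;
      }
      current.add(file);
      currentWeight += weight;
    }
    if (!current.isEmpty()) {
      tasks.add(current);
    }
    return tasks;
  }

  /** One watermark per task: the smallest timestamp lower bound of its files (assumption). */
  public static List<Long> watermarks(
      List<FileInfo> files, long targetSplitSize, long openFileCost) {
    List<Long> result = new ArrayList<>();
    for (List<FileInfo> task : pack(files, targetSplitSize, openFileCost)) {
      result.add(task.stream().mapToLong(f -> f.minTimestampMs).min().getAsLong());
    }
    return result;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    List<FileInfo> files = List.of(
        new FileInfo(10 * mb, 1000L), new FileInfo(10 * mb, 5000L), new FileInfo(10 * mb, 9000L));
    // Small open-file cost: all three files share one task and one coarse watermark.
    System.out.println(watermarks(files, 128 * mb, 4 * mb)); // prints [1000]
    // Open-file cost equal to the target size: one task, and one watermark, per file.
    System.out.println(watermarks(files, 128 * mb, 128 * mb)); // prints [1000, 5000, 9000]
  }
}
```

   In this model, raising the cost to the target split size makes every file 
fill a task on its own, so the watermark can advance file by file instead of 
staying at the oldest timestamp in the group.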



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

