dchristle commented on code in PR #8553: URL: https://github.com/apache/iceberg/pull/8553#discussion_r1401083561
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -453,6 +492,18 @@ public IcebergSource<T> build() {
       contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
     }

+    SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+    if (watermarkColumn != null) {

Review Comment:
   At least when we use event-time alignment, the only bound we expect to be respected is the main "max out-of-orderness" constraint; we don't rely on the ordering of rows before the watermark is advanced. Our custom operators that use event time won't emit results until the watermark advances beyond our timers anyway. So even if the rows were received in perfect order, they'd still be buffered into state, which suggests to me there is little speed or memory benefit. Am I missing anything here?

   We do use some custom processing-time operators where, in theory, less out-of-orderness would give more accurate results. But we discard the results emitted during the Iceberg backfill phase anyway: our data are partitioned by day, and the out-of-orderness allowed by our ~25-hour constraint is too high for accurate-enough results. (We set it slightly above 24 hours as a precaution, to avoid the aligner getting stuck with daily partitioned files.)

   I'm a bit confused about how `read.split.open-file-cost` relates to the code line this discussion is tagged at, and I may not be fully understanding it.

   1) Does the current code try to respect `read.split.open-file-cost` when selecting files to include in a split?

   2) The only other case I can think of where the ordering of row reads is key is in minimizing "straggler" files that would hold up reads. For example, if all files within a perfectly sorted daily date partition have been read except for the one with the earliest timestamp, a max out-of-orderness of ~24 hours would leave most SplitReaders idle, since they cannot read more than 24 hours ahead of the min timestamp. But AFAIK, the current PR's enumerator sorts the files by min timestamp and assigns them in order for this exact reason, right?
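
   A minimal sketch of the straggler scenario from point 2, as a toy model in plain Java. It is not the PR's enumerator or Flink's watermark alignment code: `SplitAlignmentSketch`, `orderByMinTimestamp`, and `countReadable` are hypothetical names, and each split is reduced to its minimum event timestamp. The only rule modeled is the assumption that a reader may not open a split whose min timestamp is more than the max out-of-orderness ahead of the slowest in-flight timestamp.

```java
import java.util.Arrays;

public class SplitAlignmentSketch {
  static final long HOUR_MS = 3_600_000L;

  /** Returns a sorted copy, standing in for "sort files by min timestamp and assign in order". */
  static long[] orderByMinTimestamp(long[] minTimestampsMs) {
    long[] sorted = Arrays.copyOf(minTimestampsMs, minTimestampsMs.length);
    Arrays.sort(sorted);
    return sorted;
  }

  /**
   * Counts splits a reader could open under the assumed alignment rule:
   * a split is readable if its min timestamp is at most maxDriftMs ahead
   * of the slowest in-flight timestamp.
   */
  static int countReadable(long[] minTimestampsMs, long slowestMs, long maxDriftMs) {
    int readable = 0;
    for (long minTs : minTimestampsMs) {
      if (minTs - slowestMs <= maxDriftMs) {
        readable++;
      }
    }
    return readable;
  }

  public static void main(String[] args) {
    // Illustrative min timestamps for six splits spread over two daily partitions.
    long[] splits = {24 * HOUR_MS, 0L, 18 * HOUR_MS, 6 * HOUR_MS, 30 * HOUR_MS, 12 * HOUR_MS};
    long[] ordered = orderByMinTimestamp(splits);
    long maxDriftMs = 25 * HOUR_MS; // the ~25-hour constraint mentioned above

    // While the earliest split is still unread, everything past the drift bound is blocked.
    System.out.println("readable while straggler pending: "
        + countReadable(ordered, 0L, maxDriftMs));
    // Once the earliest split is done and the slowest timestamp moves up, more splits open.
    System.out.println("readable after straggler done: "
        + countReadable(ordered, 6 * HOUR_MS, maxDriftMs));
  }
}
```

   In this model, assigning splits in min-timestamp order means the slowest timestamp advances as early as possible, so the blocked set shrinks quickly. If the earliest split were assigned last, the slowest timestamp would stay pinned and readers holding later splits would sit idle.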