dchristle commented on code in PR #8553:
URL: https://github.com/apache/iceberg/pull/8553#discussion_r1401083561


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -453,6 +492,18 @@ public IcebergSource<T> build() {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
       }
 
+      SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+      if (watermarkColumn != null) {

Review Comment:
   At least when we use event-time alignment, the only bound we expect to be 
respected is the main "max out-of-orderness" constraint; we don't rely on the 
ordering of rows before the watermark is advanced. Our custom operators that 
use event time won't emit results until the watermark has advanced beyond our 
timers anyway. So, even if the rows were received in perfect order, they'd 
still be buffered into state, which suggests to me there is little speed or 
memory benefit. Am I missing anything here?
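
   To make the buffering point concrete, here is a toy model (plain Java, not 
the Flink API; `EventTimeBuffer` and its methods are hypothetical names). It 
assumes an operator that stores rows under a timer timestamp and only releases 
them once the watermark reaches that timer, so arrival order does not change 
what sits in state:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy model, not Flink API: rows are held per timer until the watermark reaches it. */
public class EventTimeBuffer {
    // Timer timestamp -> rows waiting for that timer (hypothetical stand-in for keyed state).
    private final TreeMap<Long, List<String>> state = new TreeMap<>();

    /** Buffers a row under the timer timestamp at which it may be emitted. */
    public void add(long timerTs, String row) {
        state.computeIfAbsent(timerTs, k -> new ArrayList<>()).add(row);
    }

    /** Emits every row whose timer is at or below the new watermark, in timer order. */
    public List<String> advanceWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!state.isEmpty() && state.firstKey() <= watermark) {
            out.addAll(state.pollFirstEntry().getValue());
        }
        return out;
    }

    /** Number of rows still held in state. */
    public int bufferedRows() {
        return state.values().stream().mapToInt(List::size).sum();
    }
}
```

   Whether rows are added in or out of timestamp order, `bufferedRows()` is the 
same until `advanceWatermark` is called with a large enough value.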
   
   We do use some custom process-time operators, where, in theory, less 
out-of-orderness would give more accurate results. But we discard the results 
emitted during the Iceberg backfill phase anyway: our data are partitioned by 
day, and the out-of-orderness allowed by our ~25-hour constraint (we set it 
slightly above 24 hours as a precaution, to avoid the aligner getting stuck 
with daily partitioned files) is too high for accurate-enough results.
   
   I'm a bit confused about how `read.split.open-file-cost` relates to the code 
line this discussion is tagged at, so I may not be fully understanding.
   
   1) Does the current code try to respect `read.split.open-file-cost` when 
selecting files to include in a split? 
   
   2) The only other case I can think of where the ordering of row reads is key 
is in minimizing "straggler" files that would hold up reads. For example, if 
all files within a perfectly sorted daily partition have been read except for 
the one with the earliest timestamp, a max out-of-orderness of ~24 hours would 
mean most SplitReaders are idle, since they cannot read more than 24 hours 
ahead of the min timestamp. But AFAIK, the current PR's enumerator sorts the 
files by min timestamp and assigns them in order for this exact reason, right?
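
   A minimal sketch of the straggler scenario in 2), as I understand it. This 
is not the PR's enumerator code: `SplitOrdering`, `Split`, and `readable` are 
hypothetical names, and the alignment rule is simplified to "a split may be 
read only if its min timestamp is within the max out-of-orderness of the 
slowest watermark":

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustrative only: hypothetical names, not the Iceberg or Flink API. */
public class SplitOrdering {
    /** Hypothetical split descriptor: a file name plus its minimum event timestamp. */
    public static final class Split {
        public final String file;
        public final long minTimestampMs;

        public Split(String file, long minTimestampMs) {
            this.file = file;
            this.minTimestampMs = minTimestampMs;
        }
    }

    /** Returns a copy of the splits in hand-out order: ascending by minimum timestamp. */
    public static List<Split> assignmentOrder(List<Split> splits) {
        List<Split> sorted = new ArrayList<>(splits);
        sorted.sort(Comparator.comparingLong((Split s) -> s.minTimestampMs));
        return sorted;
    }

    /** Simplified alignment check: readers may not run further ahead than the allowed drift. */
    public static boolean readable(Split split, long minWatermarkMs, long maxOutOfOrdernessMs) {
        return split.minTimestampMs <= minWatermarkMs + maxOutOfOrdernessMs;
    }
}
```

   Handing out splits in `assignmentOrder` means the earliest file is never 
left for last, so the slowest watermark keeps moving and `readable` stays true 
for the next splits in line.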
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

