pnowojski commented on PR #21690:
URL: https://github.com/apache/flink/pull/21690#issuecomment-1400094213

   Hey, again I think your arguments are not fully correct:
   
   > It can be called repeatedly by the caller (Most methods can be called 
repeatedly by the caller, except for some special methods, such as: open(), 
close(), etc.).
   > It shouldn't care what the caller does between calling itself multiple 
times.
   
   Both are not true. There are multiple places where methods number of calls 
and order of calls matter a lot. The most similar are our non blocking outputs. 
Callers of `RecordWriter.emit(...)` should check for the output availability, 
for example. The same applies to availability of the statebackends (currently 
only changelog). 
   
   Also keep in mind, that braking out of the inner loop, is not an 
optimisation, **but a hard requirement**. Before FLINK-30533  
`DataOutput#emitNext` had actually an implicit contract, forbidding the callers 
(`SourceOperator` or `NetworkInput`) to loop over `DataOutput#emitNext`, as 
otherwise that would cause starvation, braking checkpointing mechanism among 
many other issues. Even with your current implementation, multiple/two input  
are still forbidden to loop.
   
   > It shouldn't care about the logic of the caller.
   
   So that's not true as well. Regardless how you implement it, the caller has 
to care about how many times the `DataOutput#emitRecord()` can be called in a 
row. If you hide this logic in a method/interface that passed separately, it 
doesn't make it disconnected in any way. Instead of passing one interface with 
both `emitRecord()` and something that returns `canEmitBatchOfRecords()` 
(either via return value or separate method in the `DataOutput` interface), you 
have to pass two interfaces. One that controls how the other should be used. I 
don't see any benefit of this approach:
   1. caller still needs both of the the things passed
   2. even for testing and dependency injection, you still need to implement 
the same mock logic, having them separate doesn't simplify anything here
   
   And as I mentioned before, passing two separate interface would make bugs 
much easier to appear, when someone forgets about 
`CanEmitBatchOfRecordsChecker`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to