1996fanrui commented on PR #21690:
URL: https://github.com/apache/flink/pull/21690#issuecomment-1399208093

   Hi @pnowojski, after an offline discussion with @lindong28, I'm not sure it 
makes sense to add a return value to `DataOutput#emitRecord` and let that 
return value determine whether the caller can continue emitting records.
   
   `DataOutput#emitRecord` is an interface method, so it should satisfy these 
conditions:
   - It shouldn't care about the caller's logic.
   - The caller can invoke it repeatedly (most methods can be invoked 
repeatedly, apart from special lifecycle methods such as `open()` and 
`close()`).
   - It shouldn't care what the caller does between invocations.
   
   Simplified code for our discussion looks like this:
   
   ```java
   MailboxProcessor#runMailboxLoop() {
     while (true) {
       // do some other work
   
       NetworkInput/SourceOperator#emitNext();
   
       // do some other work
     }
   }
   
   // Before change: emits just one record per call.
   NetworkInput/SourceOperator#emitNext() {
     DataOutput#emitRecord();
   }
   
   // After change: can emit multiple records per call.
   NetworkInput/SourceOperator#emitNext() {
     while (true) {
       DataOutput#emitRecord();
     }
   }
   ```
   
   We want to add the loop inside `emitNext` to reduce call-stack overhead and 
improve performance. That loop belongs to `NetworkInput`/`SourceOperator`, not 
to `DataOutput`.
   
   As mentioned above, `DataOutput#emitRecord` can be called repeatedly by the 
caller. If it returned a boolean telling the caller whether it may continue 
emitting records, `DataOutput#emitRecord` would have to know about the caller's 
two loops. But it shouldn't care about the caller's logic, and it shouldn't 
know anything about `canEmitBatchOfRecords`. It should only be responsible for 
emitting records.
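   
   To make the coupling concern concrete, here is a rough sketch of what a 
return-value variant might look like. It is only an illustration under 
assumptions: `ReturningDataOutput`, `MailAwareOutput`, and the 
`mailboxHasMail` signal are hypothetical names invented here, not Flink APIs.
   
   ```java
   import java.util.function.BooleanSupplier;
   
   // Hypothetical variant of DataOutput: emitRecord also tells the caller
   // whether it may keep emitting.
   interface ReturningDataOutput<T> {
       boolean emitRecord(T record);
   }
   
   // To compute that answer, the output has to consult state that really
   // belongs to the caller's loops (assumed here: "does the mailbox have mail?").
   class MailAwareOutput<T> implements ReturningDataOutput<T> {
       private final BooleanSupplier mailboxHasMail; // caller-side state leaking in
       int emittedCount = 0;
   
       MailAwareOutput(BooleanSupplier mailboxHasMail) {
           this.mailboxHasMail = mailboxHasMail;
       }
   
       @Override
       public boolean emitRecord(T record) {
           emittedCount++;                        // its actual job: emit the record
           return !mailboxHasMail.getAsBoolean(); // extra duty: steer the caller's loop
       }
   }
   ```
   
   In this shape every implementation of the interface would need access to 
the caller's scheduling state, which is the dependency described above.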
   
   So I think the logic related to `canEmitBatchOfRecords` should be maintained 
inside `NetworkInput` and `SourceOperator`, and I have a POC branch that does 
this [1].
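   
   A minimal sketch of that split, not the POC's actual code: `DataOutput` 
below is a simplified stand-in for Flink's interface, while `BatchChecker`, 
`SketchInput`, the queue-backed input, and the `int` return value are 
hypothetical and exist only for illustration.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Queue;
   
   // Simplified stand-in for DataOutput: it only emits and returns nothing.
   interface DataOutput<T> {
       void emitRecord(T record);
   }
   
   // Hypothetical hook owned by the caller side: "may I keep emitting in this call?"
   interface BatchChecker {
       boolean canEmitBatchOfRecords();
   }
   
   // Hypothetical input playing the role of NetworkInput/SourceOperator.
   class SketchInput<T> {
       private final Queue<T> pending;
       private final BatchChecker checker;
   
       SketchInput(Queue<T> pending, BatchChecker checker) {
           this.pending = pending;
           this.checker = checker;
       }
   
       // Emits one record if available, then keeps going only while the
       // checker allows it. Returns how many records were emitted.
       int emitNext(DataOutput<T> output) {
           int emitted = 0;
           T record;
           while ((record = pending.poll()) != null) {
               output.emitRecord(record);
               emitted++;
               if (!checker.canEmitBatchOfRecords()) {
                   break; // hand control back to the outer mailbox loop
               }
           }
           return emitted;
       }
   }
   ```
   
   Here the batching decision stays entirely in the input, and `DataOutput` 
keeps a `void` method that knows nothing about either loop.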
   
   Also, since Flink 1.17 is approaching the feature freeze, could I create a 
new PR to fix the bug and keep this PR focused on the refactoring? We can 
continue discussing the refactoring here. What do you think?
   
   [1] 
https://github.com/1996fanrui/flink/commits/30623/canEmitBatchOfRecords-POC
   


