1996fanrui commented on PR #21690: URL: https://github.com/apache/flink/pull/21690#issuecomment-1399208093
Hi @pnowojski, after an offline discussion with @lindong28, I'm not sure it makes sense to add a return value to `DataOutput#emitRecord` and let that return value determine whether the caller can continue to emit records.

`DataOutput#emitRecord` is an interface method, so it should obey these conditions:

- It shouldn't care about the logic of the caller.
- It can be called repeatedly by the caller (most methods can be called repeatedly, except for some special ones such as `open()` and `close()`).
- It shouldn't care what the caller does between successive calls.

Simplified code for our discussion looks like this:

```java
MailboxProcessor#runMailboxLoop() {
    while (true) {
        // do some logic
        NetworkInput/SourceOperator#emitNext();
        // do some logic
    }
}

// Before change: just emit one record.
NetworkInput/SourceOperator#emitNext() {
    DataOutput#emitRecord();
}

// After change: can emit multiple records.
NetworkInput/SourceOperator#emitNext() {
    while (true) {
        DataOutput#emitRecord();
    }
}
```

We want to add the loop inside `emitNext` to reduce the call stack and improve performance. That loop belongs to `NetworkInput/SourceOperator`, not to `DataOutput`.

As mentioned above, `DataOutput#emitRecord` can be called repeatedly by the caller. If it returns a boolean that determines whether the caller can continue to emit records, then `DataOutput#emitRecord` has to know about the caller's two loops (the mailbox loop and the loop inside `emitNext`). However, it shouldn't care about the logic of the caller, and it shouldn't know anything about `canEmitBatchOfRecords`. It should only be responsible for emitting records.

So I think the logic related to `canEmitBatchOfRecords` should be maintained inside `NetworkInput` and `SourceOperator`, and I have a POC branch that does this [1].

Also, since Flink 1.17 is approaching the feature freeze, could I create a new PR to fix the bug and let this PR focus on the refactoring? We can continue discussing the refactoring here. What do you think?
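To make the proposal concrete, here is a minimal, self-contained sketch of a caller-owned batch loop. It is not the code in the POC branch and does not use the real Flink classes: `SimpleOutput`, `SimpleInput` and `BatchEmitSketch` are hypothetical stand-ins, and modelling `canEmitBatchOfRecords` as a `BooleanSupplier` is an assumption made only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.BooleanSupplier;

// Hypothetical driver standing in for one iteration of the mailbox loop.
class BatchEmitSketch {
    public static void main(String[] args) {
        Queue<Integer> records = new ArrayDeque<>(List.of(1, 2, 3, 4, 5));
        List<Integer> emitted = new ArrayList<>();
        // Assumption: pretend the mailbox has pending mail once 3 records were emitted.
        SimpleInput<Integer> input = new SimpleInput<>(records, () -> emitted.size() < 3);
        boolean moreAvailable = input.emitNext(emitted::add);
        System.out.println(emitted + " moreAvailable=" + moreAvailable);
    }
}

// Hypothetical stand-in for DataOutput: it only emits a record, returns nothing,
// and knows nothing about the loops of its caller.
interface SimpleOutput<T> {
    void emitRecord(T record);
}

// Hypothetical stand-in for NetworkInput/SourceOperator: it owns the batch loop
// and is the only place that consults canEmitBatchOfRecords.
final class SimpleInput<T> {
    private final Queue<T> buffered;
    private final BooleanSupplier canEmitBatchOfRecords;

    SimpleInput(Queue<T> buffered, BooleanSupplier canEmitBatchOfRecords) {
        this.buffered = buffered;
        this.canEmitBatchOfRecords = canEmitBatchOfRecords;
    }

    // Emits at least one record if available, then keeps emitting while the
    // checker allows it. Returns true if records are still buffered afterwards.
    boolean emitNext(SimpleOutput<T> output) {
        do {
            T record = buffered.poll();
            if (record == null) {
                return false; // nothing left to emit
            }
            output.emitRecord(record);
        } while (canEmitBatchOfRecords.getAsBoolean());
        return !buffered.isEmpty();
    }
}
```

In this sketch the output keeps a `void` signature, and the decision to stop batching lives entirely in the input, which hands control back to the outer loop as soon as the checker returns `false`.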
[1] https://github.com/1996fanrui/flink/commits/30623/canEmitBatchOfRecords-POC