xuyangzhong commented on code in PR #26708:
URL: https://github.com/apache/flink/pull/26708#discussion_r2162802560
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/keyordered/Epoch.java:
##########
@@ -72,6 +74,7 @@ public void setOutput(Consumer<StreamElementQueueEntry<OUT>> outputConsumer) {
     public void decrementCount() {
         ongoingRecordCount--;
+        Preconditions.checkState(ongoingRecordCount >= 0);
Review Comment:
The inconsistency originates in the `submitRecord` method of the
`TableAsyncExecutionController`: it places a record into an epoch, but the
epoch's `ongoingRecordCount` is not incremented for that record.
> "I wonder why the caller cannot check that the decrementCount should not
be called if there is nothing to decrement."
I'm not the developer of the TableAsyncExecutionController, but I suspect
this design is intended to minimize the exposure of the `ongoingRecordCount`
counter to external components.
> "Check before the decrement."
Done.
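For illustration, checking before the decrement could look roughly like the sketch below. It is a simplified stand-in, not the actual `Epoch` class: `EpochSketch`, `incrementCount()` and `getOngoingRecordCount()` are hypothetical names, and a plain `IllegalStateException` is thrown instead of calling `Preconditions.checkState` so the snippet stays self-contained.

```java
// Hypothetical, simplified stand-in for Epoch; not the real Flink class.
class EpochSketch {
    // Number of records in this epoch that have not completed yet.
    private int ongoingRecordCount;

    void incrementCount() {
        ongoingRecordCount++;
    }

    // Validate first, so a failed check leaves the counter untouched
    // instead of leaving it at a negative value.
    void decrementCount() {
        if (ongoingRecordCount <= 0) {
            throw new IllegalStateException(
                    "decrementCount() called with no ongoing records");
        }
        ongoingRecordCount--;
    }

    int getOngoingRecordCount() {
        return ongoingRecordCount;
    }
}
```

With this ordering, an unmatched `decrementCount()` call fails fast and the counter still reads 0 afterwards.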
> "The logic should be made thread-safe."
Currently, this logic runs only on the mailbox thread, so the counter is
never accessed concurrently.
According to FLINK-37921, the `TableAsyncExecutionController` will be removed,
so I'd prefer not to invest much effort in refactoring this code, to avoid
blocking 2.1 (this has already caused too many CI failures). WDYT? @davidradl
@xishuaidelin