gaoyunhaii commented on a change in pull request #15055: URL: https://github.com/apache/flink/pull/15055#discussion_r657646552
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java ########## @@ -164,6 +192,35 @@ public void flush(int targetSubpartition) { flushSubpartition(targetSubpartition, false); } + @Override + public CompletableFuture<Void> getAllRecordsProcessedFuture() throws IOException { + synchronized (lock) { + if (allRecordsProcessedFuture == null) { + allRecordsProcessedFuture = new CompletableFuture<>(); + broadcastEvent(EndOfUserRecordsEvent.INSTANCE, false); Review comment: Very thanks for the review and suggestions! The initial thought is trying to disable callers of `ResultPartition` from emitting `EndOfUserRecordsEvent` multiple times, thus the `allRecordsProcessedFuture` also serve as a boolean flag indicating whether we have ever emitted `EndOfUserRecordsEvent`. But it is indeed not very clear now. I split the broadcast event and retrieving the future now: ```java interface getAllRecordsProcessedFuture { ... void notifyEndOfUserRecords() throws IOException; CompletableFuture<Void> getAllRecordsProcessedFuture(); } class PipelinedResultPartition { @GuardedBy("lock") private final CompletableFuture<Void> allRecordsProcessedFuture = new CompletableFuture<Void>(); private boolean notifiedNoMoreRecords; @Override public CompletableFuture<Void> getAllRecordsProcessedFuture() throws IOException { return allRecordsProcessedFuture; } @Override public void notifyEndOfUserRecords() { if (!notifiedNoMoreRecords) { emits the EndOfUserRecordsEvent; notifiedNoMoreRecords = true; } } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org