gaoyunhaii commented on a change in pull request #15055:
URL: https://github.com/apache/flink/pull/15055#discussion_r657646552



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
##########
@@ -164,6 +192,35 @@ public void flush(int targetSubpartition) {
         flushSubpartition(targetSubpartition, false);
     }
 
+    @Override
+    public CompletableFuture<Void> getAllRecordsProcessedFuture() throws IOException {
+        synchronized (lock) {
+            if (allRecordsProcessedFuture == null) {
+                allRecordsProcessedFuture = new CompletableFuture<>();
+                broadcastEvent(EndOfUserRecordsEvent.INSTANCE, false);

Review comment:
   Thanks a lot for the review and suggestions! The initial idea was to
   prevent callers of `ResultPartition` from emitting `EndOfUserRecordsEvent`
   multiple times, so `allRecordsProcessedFuture` also served as a boolean
   flag indicating whether `EndOfUserRecordsEvent` had already been emitted.
   But I agree that this is not very clear.
   
   I have now split broadcasting the event from retrieving the future:
   
   ```java
   interface ResultPartitionWriter {
       ...

       // Emits EndOfUserRecordsEvent; repeated calls are no-ops.
       void notifyEndOfUserRecords() throws IOException;

       // Only returns the future; no longer emits anything as a side effect.
       CompletableFuture<Void> getAllRecordsProcessedFuture();
   }

   class PipelinedResultPartition {
       // Created eagerly, so the getter needs no null check and no lock.
       private final CompletableFuture<Void> allRecordsProcessedFuture =
               new CompletableFuture<>();

       @GuardedBy("lock")
       private boolean notifiedNoMoreRecords;

       @Override
       public CompletableFuture<Void> getAllRecordsProcessedFuture() {
           return allRecordsProcessedFuture;
       }

       @Override
       public void notifyEndOfUserRecords() throws IOException {
           synchronized (lock) {
               if (!notifiedNoMoreRecords) {
                   broadcastEvent(EndOfUserRecordsEvent.INSTANCE, false);
                   notifiedNoMoreRecords = true;
               }
           }
       }
   }
   ```
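
   To make the intended behavior easier to check, here is a minimal,
   self-contained sketch of the same split. It is not the code in this PR:
   `EndOfRecordsSketch`, `onAllRecordsProcessed()` and `emittedEventCount()`
   are hypothetical names, and a plain list stands in for broadcasting the
   event to the subpartitions.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical stand-in for a result partition; not Flink's actual class. */
   class EndOfRecordsSketch {
       private final Object lock = new Object();

       // Created eagerly, so the getter is a plain accessor without side effects.
       private final CompletableFuture<Void> allRecordsProcessedFuture =
               new CompletableFuture<>();

       // Guarded by lock: true once the end-of-records event has been emitted.
       private boolean notifiedEndOfUserRecords;

       // Guarded by lock: stand-in for the broadcast; records each emitted event.
       private final List<String> emittedEvents = new ArrayList<>();

       CompletableFuture<Void> getAllRecordsProcessedFuture() {
           return allRecordsProcessedFuture;
       }

       // Idempotent: only the first call emits the event.
       void notifyEndOfUserRecords() {
           synchronized (lock) {
               if (!notifiedEndOfUserRecords) {
                   emittedEvents.add("EndOfUserRecordsEvent");
                   notifiedEndOfUserRecords = true;
               }
           }
       }

       // Assumed callback, invoked once downstream has processed all records.
       void onAllRecordsProcessed() {
           allRecordsProcessedFuture.complete(null);
       }

       int emittedEventCount() {
           synchronized (lock) {
               return emittedEvents.size();
           }
       }

       public static void main(String[] args) {
           EndOfRecordsSketch partition = new EndOfRecordsSketch();
           CompletableFuture<Void> future = partition.getAllRecordsProcessedFuture();

           partition.notifyEndOfUserRecords();
           partition.notifyEndOfUserRecords(); // second call is a no-op

           System.out.println("events emitted: " + partition.emittedEventCount());
           System.out.println("done before ack: " + future.isDone());
           partition.onAllRecordsProcessed();
           System.out.println("done after ack: " + future.isDone());
       }
   }
   ```

   With this shape, fetching the future never emits anything, and calling
   `notifyEndOfUserRecords()` several times still produces a single event.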




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

