1u0 commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r329973564
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##########
 @@ -293,140 +252,138 @@ public void close() throws Exception {
                        waitInFlightInputsFinished();
                }
                finally {
-                       Exception exception = null;
-
-                       try {
-                               super.close();
-                       } catch (InterruptedException interrupted) {
-                               exception = interrupted;
-
-                               Thread.currentThread().interrupt();
-                       } catch (Exception e) {
-                               exception = e;
-                       }
-
-                       try {
-                               // terminate the emitter, the emitter thread 
and the executor
-                               stopResources(true);
-                       } catch (InterruptedException interrupted) {
-                               exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
-
-                               Thread.currentThread().interrupt();
-                       } catch (Exception e) {
-                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-                       }
-
-                       if (exception != null) {
-                               LOG.warn("Errors occurred while closing the 
AsyncWaitOperator.", exception);
-                       }
+                       super.close();
                }
        }
 
-       @Override
-       public void dispose() throws Exception {
-               Exception exception = null;
+       /**
+        * Add the given stream element to the operator's stream element queue. 
This operation blocks until the element
+        * has been added.
+        *
+        * <p>Between two insertion attempts, this method yields the execution 
to the mailbox, such that events as well
+        * as asynchronous results can be processed.
+        *
+        * @param streamElement to add to the operator's queue
+        * @throws InterruptedException if the current thread has been 
interrupted while yielding to mailbox
+        * @return a handle that allows to set the result of the async 
computation for the given element.
+        */
+       private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement) 
throws InterruptedException {
+               assert(Thread.holdsLock(checkpointingLock));
 
-               try {
-                       super.dispose();
-               } catch (InterruptedException interrupted) {
-                       exception = interrupted;
+               pendingStreamElement = streamElement;
 
-                       Thread.currentThread().interrupt();
-               } catch (Exception e) {
-                       exception = e;
+               Optional<ResultFuture<OUT>> queueEntry;
+               while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) 
{
+                       mailboxExecutor.yield();
                }
 
-               try {
-                       stopResources(false);
-               } catch (InterruptedException interrupted) {
-                       exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
+               pendingStreamElement = null;
 
-                       Thread.currentThread().interrupt();
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-               }
+               return queueEntry.get();
+       }
 
-               if (exception != null) {
-                       throw exception;
+       private void waitInFlightInputsFinished() throws InterruptedException {
+               assert(Thread.holdsLock(checkpointingLock));
+
+               while (!queue.isEmpty()) {
+                       mailboxExecutor.yield();
                }
        }
 
        /**
-        * Close the operator's resources. They include the emitter thread and 
the executor to run
-        * the queue's complete operation.
+        * Batch output of all completed elements. Watermarks are always 
completed if it's their turn to be processed.
         *
-        * @param waitForShutdown is true if the method should wait for the 
resources to be freed;
-        *                           otherwise false.
-        * @throws InterruptedException if current thread has been interrupted
+        * <p>This method will be called from {@link 
#processWatermark(Watermark)} and from a mail processing the result
+        * of an async function call.
         */
-       private void stopResources(boolean waitForShutdown) throws 
InterruptedException {
-               emitter.stop();
-               emitterThread.interrupt();
-
-               executor.shutdown();
-
-               if (waitForShutdown) {
-                       try {
-                               if (!executor.awaitTermination(365L, 
TimeUnit.DAYS)) {
-                                       executor.shutdownNow();
-                               }
-                       } catch (InterruptedException e) {
-                               executor.shutdownNow();
-
-                               Thread.currentThread().interrupt();
+       private void outputCompletedElements() {
+               if (queue.hasCompletedElements()) {
+                       // emit only one element to not block the mailbox 
thread unnecessarily
+                       synchronized (checkpointingLock) {
+                               
queue.emitCompletedElement(timestampedCollector);
                        }
-
-                       /*
-                        * FLINK-5638: If we have the checkpoint lock we might 
have to free it for a while so
-                        * that the emitter thread can complete/react to the 
interrupt signal.
-                        */
-                       if (Thread.holdsLock(checkpointingLock)) {
-                               while (emitterThread.isAlive()) {
-                                       checkpointingLock.wait(100L);
-                               }
+                       // if there are more completed elements, emit them with 
subsequent mails
+                       if (queue.hasCompletedElements()) {
+                               
mailboxExecutor.execute(this::outputCompletedElements);
                        }
-
-                       emitterThread.join();
-               } else {
-                       executor.shutdownNow();
                }
        }
 
        /**
-        * Add the given stream element queue entry to the operator's stream 
element queue. This
-        * operation blocks until the element has been added.
-        *
-        * <p>For that it tries to put the element into the queue and if not 
successful then it waits on
-        * the checkpointing lock. The checkpointing lock is also used by the 
{@link Emitter} to output
-        * elements. The emitter is also responsible for notifying this method 
if the queue has capacity
-        * left again, by calling notifyAll on the checkpointing lock.
-        *
-        * @param streamElementQueueEntry to add to the operator's queue
-        * @param <T> Type of the stream element queue entry's result
-        * @throws InterruptedException if the current thread has been 
interrupted
+        * A handler for the results of a specific input record.
         */
-       private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> 
streamElementQueueEntry) throws InterruptedException {
-               assert(Thread.holdsLock(checkpointingLock));
-
-               pendingStreamElementQueueEntry = streamElementQueueEntry;
+       private class ResultHandler implements ResultFuture<OUT> {
+               /**
+                * Optional timeout timer used to signal the timeout to the 
AsyncFunction.
+                */
+               private ScheduledFuture<?> timeoutTimer;
+               /**
+                * Record for which this result handler exists. Used only to 
report errors.
+                */
+               private final StreamRecord<IN> inputRecord;
+               /**
+                * The handle received from the queue to update the entry. 
Should only be used to inject the result;
+                * exceptions are handled here.
+                */
+               private final ResultFuture<OUT> resultFuture;
+               /**
+                * A guard against ill-written AsyncFunction. Additional 
(parallel) invokations of
+                * {@link #complete(Collection)} or {@link 
#completeExceptionally(Throwable)} will be ignored. This guard
+                * also helps for cases where proper results and timeouts 
happen at the same time.
+                */
+               private volatile boolean completed = false;
+
+               ResultHandler(StreamRecord<IN> inputRecord, ResultFuture<OUT> 
resultFuture) {
+                       this.inputRecord = inputRecord;
+                       this.resultFuture = resultFuture;
+               }
 
-               while (!queue.tryPut(streamElementQueueEntry)) {
-                       mailboxExecutor.yield();
+               void setTimeoutTimer(ScheduledFuture<?> timeoutTimer) {
+                       this.timeoutTimer = timeoutTimer;
                }
 
-               pendingStreamElementQueueEntry = null;
-       }
+               @Override
+               public void complete(Collection<OUT> results) {
+                       Preconditions.checkNotNull(results, "Results must not 
be null, use empty collection to emit nothing");
 
-       private void waitInFlightInputsFinished() throws InterruptedException {
-               assert(Thread.holdsLock(checkpointingLock));
+                       // already completed, so ignore exception
+                       if(completed) {
+                               return;
+                       }
 
-               while (!queue.isEmpty()) {
-                       mailboxExecutor.yield();
+                       completed = true;
+
+                       // move further processing into the mailbox thread
+                       mailboxExecutor.execute(() -> {
+                               // Cancel the timer once we've completed the 
stream record buffer entry. This will remove the registered
+                               // timer task
+                               if (timeoutTimer != null) {
+                                       // canceling in mailbox thread avoids 
https://issues.apache.org/jira/browse/FLINK-13635
+                                       timeoutTimer.cancel(true);
+                               }
+
+                               // update the queue entry with the result
+                               resultFuture.complete(results);
+                               // now output all elements from the queue that 
have been completed (in the correct order)
+                               outputCompletedElements();
+                       });
                }
-       }
 
-       @Override
-       public void failOperator(Throwable throwable) {
-               getContainingTask().getEnvironment().failExternally(throwable);
+               @Override
+               public void completeExceptionally(Throwable error) {
+                       // already completed, so ignore exception
+                       if(completed) {
+                               return;
+                       }
 
 Review comment:
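   Should `completeExceptionally()` also set `completed = true`? Or is the flag 
only meant to guard against multiple `complete()` calls?
   Regarding its use in `complete()`: strictly speaking, shouldn't the check and 
the update be a single atomic operation (e.g. `AtomicBoolean#compareAndSet`)? 
A `volatile` read followed by a separate write still lets two concurrent callers 
both observe `completed == false` and both proceed.
   
   Nitpick: code formatting, you need a space after the `if` keyword.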

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to