zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327954701
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
 ##########
 @@ -143,88 +92,31 @@ public int size() {
        }
 
        @Override
-       public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) 
throws InterruptedException {
-               lock.lockInterruptibly();
-
-               try {
-                       while (queue.size() >= capacity) {
-                               notFull.await();
-                       }
+       public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
+               if (queue.size() < capacity) {
+                       StreamElementQueueEntry queueEntry = 
createEntry(streamElement);
 
-                       addEntry(streamElementQueueEntry);
-               } finally {
-                       lock.unlock();
-               }
-       }
+                       queue.add(queueEntry);
 
-       @Override
-       public <T> boolean tryPut(StreamElementQueueEntry<T> 
streamElementQueueEntry) throws InterruptedException {
-               lock.lockInterruptibly();
-
-               try {
-                       if (queue.size() < capacity) {
-                               addEntry(streamElementQueueEntry);
-
-                               LOG.debug("Put element into ordered stream 
element queue. New filling degree " +
-                                       "({}/{}).", queue.size(), capacity);
+                       LOG.debug("Put element into ordered stream element 
queue. New filling degree " +
+                               "({}/{}).", queue.size(), capacity);
 
-                               return true;
-                       } else {
-                               LOG.debug("Failed to put element into ordered 
stream element queue because it " +
-                                       "was full ({}/{}).", queue.size(), 
capacity);
+                       return Optional.of(queueEntry);
+               } else {
+                       LOG.debug("Failed to put element into ordered stream 
element queue because it " +
+                               "was full ({}/{}).", queue.size(), capacity);
 
-                               return false;
-                       }
-               } finally {
-                       lock.unlock();
+                       return Optional.empty();
                }
        }
 
-       /**
-        * Add the given {@link StreamElementQueueEntry} to the queue. 
Additionally, this method
-        * registers a onComplete callback which is triggered once the given 
queue entry is completed.
-        *
-        * @param streamElementQueueEntry to be inserted
-        * @param <T> Type of the stream element queue entry's result
-        */
-       private <T> void addEntry(StreamElementQueueEntry<T> 
streamElementQueueEntry) {
-               assert(lock.isHeldByCurrentThread());
-
-               queue.addLast(streamElementQueueEntry);
-
-               streamElementQueueEntry.onComplete(
-                       (StreamElementQueueEntry<T> value) -> {
-                               try {
-                                       onCompleteHandler(value);
-                               } catch (InterruptedException e) {
-                                       // we got interrupted. This indicates a 
shutdown of the executor
-                                       LOG.debug("AsyncBufferEntry could not 
be properly completed because the " +
-                                               "executor thread has been 
interrupted.", e);
-                               } catch (Throwable t) {
-                                       operatorActions.failOperator(new 
Exception("Could not complete the " +
-                                               "stream element queue entry: " 
+ value + '.', t));
-                               }
-                       },
-                       executor);
-       }
-
-       /**
-        * Check if the completed {@link StreamElementQueueEntry} is the 
current head. If this is the
-        * case, then notify the consumer thread about a new consumable entry.
-        *
-        * @param streamElementQueueEntry which has been completed
-        * @throws InterruptedException if the current thread is interrupted
-        */
-       private void onCompleteHandler(StreamElementQueueEntry<?> 
streamElementQueueEntry) throws InterruptedException {
-               lock.lockInterruptibly();
-
-               try {
-                       if (!queue.isEmpty() && queue.peek().isDone()) {
-                               LOG.debug("Signal ordered stream element queue 
has completed head element.");
-                               headIsCompleted.signalAll();
-                       }
-               } finally {
-                       lock.unlock();
+       private StreamElementQueueEntry createEntry(StreamElement 
streamElement) {
+               if (streamElement.isRecord()) {
+                       return new StreamRecordQueueEntry<>((StreamRecord<?>) 
streamElement);
+               }
+               if (streamElement.isWatermark()) {
+                       return new WatermarkQueueEntry((Watermark) 
streamElement);
 
 Review comment:
   ditto:  new WatermarkQueueEntry<>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to