zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327954701
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java ########## @@ -143,88 +92,31 @@ public int size() { } @Override - public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); - - try { - while (queue.size() >= capacity) { - notFull.await(); - } + public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) { + if (queue.size() < capacity) { + StreamElementQueueEntry queueEntry = createEntry(streamElement); - addEntry(streamElementQueueEntry); - } finally { - lock.unlock(); - } - } + queue.add(queueEntry); - @Override - public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); - - try { - if (queue.size() < capacity) { - addEntry(streamElementQueueEntry); - - LOG.debug("Put element into ordered stream element queue. New filling degree " + - "({}/{}).", queue.size(), capacity); + LOG.debug("Put element into ordered stream element queue. New filling degree " + + "({}/{}).", queue.size(), capacity); - return true; - } else { - LOG.debug("Failed to put element into ordered stream element queue because it " + - "was full ({}/{}).", queue.size(), capacity); + return Optional.of(queueEntry); + } else { + LOG.debug("Failed to put element into ordered stream element queue because it " + + "was full ({}/{}).", queue.size(), capacity); - return false; - } - } finally { - lock.unlock(); + return Optional.empty(); } } - /** - * Add the given {@link StreamElementQueueEntry} to the queue. Additionally, this method - * registers a onComplete callback which is triggered once the given queue entry is completed. - * - * @param streamElementQueueEntry to be inserted - * @param <T> Type of the stream element queue entry's result - */ - private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) { - assert(lock.isHeldByCurrentThread()); - - queue.addLast(streamElementQueueEntry); - - streamElementQueueEntry.onComplete( - (StreamElementQueueEntry<T> value) -> { - try { - onCompleteHandler(value); - } catch (InterruptedException e) { - // we got interrupted. This indicates a shutdown of the executor - LOG.debug("AsyncBufferEntry could not be properly completed because the " + - "executor thread has been interrupted.", e); - } catch (Throwable t) { - operatorActions.failOperator(new Exception("Could not complete the " + - "stream element queue entry: " + value + '.', t)); - } - }, - executor); - } - - /** - * Check if the completed {@link StreamElementQueueEntry} is the current head. If this is the - * case, then notify the consumer thread about a new consumable entry. - * - * @param streamElementQueueEntry which has been completed - * @throws InterruptedException if the current thread is interrupted - */ - private void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); - - try { - if (!queue.isEmpty() && queue.peek().isDone()) { - LOG.debug("Signal ordered stream element queue has completed head element."); - headIsCompleted.signalAll(); - } - } finally { - lock.unlock(); + private StreamElementQueueEntry createEntry(StreamElement streamElement) { + if (streamElement.isRecord()) { + return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement); + } + if (streamElement.isWatermark()) { + return new WatermarkQueueEntry((Watermark) streamElement); Review comment: ditto: `new WatermarkQueueEntry<>` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services