CrynetLogistics commented on a change in pull request #18651: URL: https://github.com/apache/flink/pull/18651#discussion_r814230648
########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java ########## @@ -290,17 +290,33 @@ private void registerCallback() { @Override public void write(InputT element, Context context) throws IOException, InterruptedException { while (bufferedRequestEntries.size() >= maxBufferedRequests) { - mailboxExecutor.tryYield(); + flush(); } addEntryToBuffer(elementConverter.apply(element, context), false); - flushIfAble(); + nonBlockingFlush(); } - private void flushIfAble() { - while (bufferedRequestEntries.size() >= maxBatchSize - || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes) { + /** + * Determines if a call to flush will be non-blocking (i.e. {@code inFlightRequestsCount} is + * strictly smaller than {@code maxInFlightRequests}). Also requires one of the following + * requirements to be met: + * + * <ul> + * <li>The number of elements buffered is greater than or equal to the {@code maxBatchSize} + * <li>The sum of the size in bytes of all records in the buffer is greater than or equal to + * {@code maxBatchSizeInBytes} + * </ul> + */ + private void nonBlockingFlush() { + boolean uncompletedInFlightResponses = true; + while (uncompletedInFlightResponses) { + uncompletedInFlightResponses = mailboxExecutor.tryYield(); + } Review comment: Just one last question: You previously mentioned that: "Any mails if present, should have been executed before any write() call has been made." Does this mean that there is some mechanism in the sink operator that will yield to the mailbox executor to finish off completed requests in addition to `mailboxExecutor.yield();` in the `flush()` method? Specifically, does it call `mailboxExecutor.tryYield();` repeatedly similar to what I was doing, before executing `write()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org