[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-28 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r816109257



##
File path: 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##
@@ -925,6 +964,18 @@ private AsyncSinkWriterImpl(
 }
 
 public void write(String val) throws IOException, InterruptedException {
+boolean canYield = true;
+while (canYield) {
+canYield = sinkInitContext.getMailboxExecutor().tryYield();
+}
+canYield = true;
+while (canYield) {
+canYield = sinkInitContextAnyThreadMailbox.getMailboxExecutor().tryYield();
+}
+write(val, null);
+}
+
+public void writeWithNonMailboxThread(String val) throws IOException, InterruptedException {

Review comment:
   Agreed. How about `writeAsNonMailboxThread`, to indicate that the caller is not the mailbox thread and that the callee will not switch to a different thread?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-28 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r816067234



##
File path: 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##
@@ -862,6 +867,40 @@ private void writeTwoElementsAndInterleaveTheNextTwoElements(
 "Executor Service stuck at termination, not terminated after 500ms!");
 }
 
+@Test
+public void ifTheNumberOfUncompletedInFlightRequestsIsTooManyThenBlockInFlushMethod()
+throws Exception {
+CountDownLatch blockedWriteLatch = new CountDownLatch(1);
+CountDownLatch delayedStartLatch = new CountDownLatch(1);
+AsyncSinkWriterImpl sink =
+new AsyncSinkReleaseAndBlockWriterImpl(
+sinkInitContextAnyThreadMailbox,
+1,
+blockedWriteLatch,
+delayedStartLatch,
+false);
+
+Thread t =
+new Thread(
+() -> {
+try {
+sink.writeWithNonMailboxThread("1");
+sink.writeWithNonMailboxThread("2");
+sink.writeWithNonMailboxThread("3");
+} catch (IOException | InterruptedException e) {
+e.printStackTrace();
+}
+});
+t.start();
+
+delayedStartLatch.await();

Review comment:
   Definitely. Let me add this.








[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-28 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r816064365



##
File path: 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##
@@ -547,20 +560,22 @@ public void prepareCommitFlushesInflightElementsIfFlushIsSetToFalse() throws Exc
 AsyncSinkWriterImpl sink =
 new AsyncSinkWriterImplBuilder()
 .context(sinkInitContext)
-.maxBatchSize(3)
+.maxBatchSize(4)
 .maxBufferedRequests(10)
 .simulateFailures(true)
 .build();
-sink.write(String.valueOf(225)); // buffer :[225]
-sink.write(String.valueOf(0)); // buffer [225,0]
-sink.write(String.valueOf(1)); // buffer [225,0,1] -- flushing
-sink.write(String.valueOf(2)); // flushing -- request should have [225,0,1], [225] fails,
-// buffer has [2]
-assertEquals(2, res.size());
+sink.write(String.valueOf(225)); // buffer: [225]
+sink.write(String.valueOf(0)); // buffer: [225, 0]
+sink.write(String.valueOf(1)); // buffer: [225, 0, 1]
+sink.write(String.valueOf(2)); // buffer: [225, 0, 1, 2] // flushing next round
+sink.write(String.valueOf(3)); // flushing, request is [225, 0, 1, 2], [225] fails
+sink.write(String.valueOf(4)); // buffer: [225, 3, 4]
+
+assertEquals(4, res.size());

Review comment:
   Thanks Fabian, I have now reworked this test case so that it more accurately reflects what is being tested.








[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-25 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r814874120



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -465,7 +489,7 @@ private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
  * To this end, all in-flight requests need to completed before proceeding with the commit.
  */
 @Override
-public void flush(boolean flush) {
+public void flush(boolean flush) throws InterruptedException {
 while (inFlightRequestsCount > 0 || (bufferedRequestEntries.size() > 0 && flush)) {
 mailboxExecutor.tryYield();

Review comment:
   One small difference I might suggest here is to replace the tryYield line with
   ```
   if(inFlightRequestsCount>0){
   mailboxExecutor.yield();
   }
   ```
   
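   This is only because it is possible for the code to reach here with `inFlightRequestsCount=0`. Since `mailboxExecutor.yield()` blocks until a mail becomes available, and with no in-flight requests there may be no completion mail to wake it up, we'd be in a real spot of bother ;)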








[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-25 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r814874120



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -465,7 +489,7 @@ private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
  * To this end, all in-flight requests need to completed before proceeding with the commit.
  */
 @Override
-public void flush(boolean flush) {
+public void flush(boolean flush) throws InterruptedException {
 while (inFlightRequestsCount > 0 || (bufferedRequestEntries.size() > 0 && flush)) {
 mailboxExecutor.tryYield();

Review comment:
   One small difference I might suggest here is to replace the tryYield line with
   ```
   while(inFlightRequestsCount>0){
   mailboxExecutor.yield();
   }
   ```
   
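   This is only because it is possible for the code to reach here with `inFlightRequestsCount=0`. Since `mailboxExecutor.yield()` blocks until a mail becomes available, and with no in-flight requests there may be no completion mail to wake it up, we'd be in a real spot of bother ;)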








[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-25 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r814835723



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -465,7 +489,7 @@ private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
  * To this end, all in-flight requests need to completed before proceeding with the commit.
  */
 @Override
-public void flush(boolean flush) {
+public void flush(boolean flush) throws InterruptedException {
 while (inFlightRequestsCount > 0 || (bufferedRequestEntries.size() > 0 && flush)) {
 mailboxExecutor.tryYield();

Review comment:
   I completely agree. Thank you so much.








[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-24 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r814230648



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -290,17 +290,33 @@ private void registerCallback() {
 @Override
 public void write(InputT element, Context context) throws IOException, InterruptedException {
 while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-mailboxExecutor.tryYield();
+flush();
 }
 
 addEntryToBuffer(elementConverter.apply(element, context), false);
 
-flushIfAble();
+nonBlockingFlush();
 }
 
-private void flushIfAble() {
-while (bufferedRequestEntries.size() >= maxBatchSize
-|| bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes) {
+/**
+ * Determines if a call to flush will be non-blocking (i.e. {@code inFlightRequestsCount} is
+ * strictly smaller than {@code maxInFlightRequests}). Also requires one of the following
+ * requirements to be met:
+ *
+ * 
+ *   The number of elements buffered is greater than or equal to the {@code maxBatchSize}
+ *   The sum of the size in bytes of all records in the buffer is greater than or equal to
+ *   {@code maxBatchSizeInBytes}
+ * 
+ */
+private void nonBlockingFlush() {
+boolean uncompletedInFlightResponses = true;
+while (uncompletedInFlightResponses) {
+uncompletedInFlightResponses = mailboxExecutor.tryYield();
+}

Review comment:
   Just one last question:
   
   You previously mentioned that:
   
   "Any mails if present, should have been executed before any write() call has 
been made."
   
   Does this mean that there is some mechanism in the sink operator that will 
yield to the mailbox executor to finish off completed requests in addition to 
`mailboxExecutor.yield();` in the `flush()` method? 
   
   Specifically, does it repeatedly call `mailboxExecutor.tryYield()`, similar to what I was doing, before executing `write()`?








[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-24 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r814210933



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -290,17 +290,33 @@ private void registerCallback() {
 @Override
 public void write(InputT element, Context context) throws IOException, InterruptedException {
 while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-mailboxExecutor.tryYield();
+flush();
 }
 
 addEntryToBuffer(elementConverter.apply(element, context), false);
 
-flushIfAble();
+nonBlockingFlush();
 }
 
-private void flushIfAble() {
-while (bufferedRequestEntries.size() >= maxBatchSize
-|| bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes) {
+/**
+ * Determines if a call to flush will be non-blocking (i.e. {@code inFlightRequestsCount} is
+ * strictly smaller than {@code maxInFlightRequests}). Also requires one of the following
+ * requirements to be met:
+ *
+ * 
+ *   The number of elements buffered is greater than or equal to the {@code maxBatchSize}
+ *   The sum of the size in bytes of all records in the buffer is greater than or equal to
+ *   {@code maxBatchSizeInBytes}
+ * 
+ */
+private void nonBlockingFlush() {
+boolean uncompletedInFlightResponses = true;
+while (uncompletedInFlightResponses) {
+uncompletedInFlightResponses = mailboxExecutor.tryYield();
+}

Review comment:
   I agree. Thank you. This is wonderful.








[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-24 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813875078



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -290,17 +290,33 @@ private void registerCallback() {
 @Override
 public void write(InputT element, Context context) throws IOException, InterruptedException {
 while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-mailboxExecutor.tryYield();
+flush();
 }
 
 addEntryToBuffer(elementConverter.apply(element, context), false);
 
-flushIfAble();
+nonBlockingFlush();
 }
 
-private void flushIfAble() {
-while (bufferedRequestEntries.size() >= maxBatchSize
-|| bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes) {
+/**
+ * Determines if a call to flush will be non-blocking (i.e. {@code inFlightRequestsCount} is
+ * strictly smaller than {@code maxInFlightRequests}). Also requires one of the following
+ * requirements to be met:
+ *
+ * 
+ *   The number of elements buffered is greater than or equal to the {@code maxBatchSize}
+ *   The sum of the size in bytes of all records in the buffer is greater than or equal to
+ *   {@code maxBatchSizeInBytes}
+ * 
+ */
+private void nonBlockingFlush() {
+boolean uncompletedInFlightResponses = true;
+while (uncompletedInFlightResponses) {
+uncompletedInFlightResponses = mailboxExecutor.tryYield();
+}

Review comment:
   I think this is completely wonderful. I believe the code for `completeRequest()` should then be:
   
   ```
   private void completeRequest(List<RequestEntryT> failedRequestEntries, long requestStartTime) {
       // do completeRequest stuff including reducing the inFlightRequestsCount, etc.
       mailboxExecutor.tryYield();
       nonBlockingFlush();
   }
   ```
   
   We would get all the previously alluded to benefits.
   
   My only question (and maybe worry) is what happens if we have a very large number of in-flight requests that have all completed (say 100+, since the limit is customisable by the user). Once one `completeRequest` is triggered, its `mailboxExecutor.tryYield()` would yield to the next completed request, which would in turn yield to the next, forming a daisy chain of nested calls whose length equals the number of completed in-flight requests.
   
   I imagine the state of every unfinished `completeRequest` call on the mailbox thread would have to live alongside the others during that time, and I was wondering: would that risk using more stack/heap than otherwise, and thereby risk an overflow?








[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-24 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813767413



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -313,7 +329,14 @@ private void flushIfAble() {
  */
 private void flush() {
 while (inFlightRequestsCount >= maxInFlightRequests) {
-mailboxExecutor.tryYield();
+try {
+mailboxExecutor.yield();
+} catch (InterruptedException e) {

Review comment:
   Implemented! :-)








[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-24 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813746213



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -313,7 +329,14 @@ private void flushIfAble() {
  */
 private void flush() {
 while (inFlightRequestsCount >= maxInFlightRequests) {
-mailboxExecutor.tryYield();
+try {
+mailboxExecutor.yield();
+} catch (InterruptedException e) {

Review comment:
   Makes sense; it was a tough decision for me whether to handle it here or in the 4 places that call `flush()`. 3 of those 4 places already throw an `InterruptedException` while executing on the mailbox thread, so it has the same effect. I will therefore move this block to the fourth and final place: the `flush(boolean flush)` method.








[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-23 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813374887



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -290,17 +290,33 @@ private void registerCallback() {
 @Override
 public void write(InputT element, Context context) throws IOException, InterruptedException {
 while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-mailboxExecutor.tryYield();
+flush();

Review comment:
   9. Additionally, to align with @pnowojski's example, in this sink we:
   a) start flushing if `sizeOfBuffer >= maxBatchSize` or `sizeOfBufferBytes >= maxBatchSizeInBytes`
   b) block new flushes once `inFlightRequestsCount >= maxInFlightRequests`
      - unblock as soon as the condition no longer holds
   c) block new writes if the buffer is full
      - unblock as soon as the condition no longer holds
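Rules a) to c) can be read as simple predicates over the writer's counters. A sketch, with parameter names mirroring the identifiers above; the `SinkRules` class itself is hypothetical and not part of the connector.

```java
// Hypothetical helper expressing rules a), b) and c) as predicates.
final class SinkRules {
    private SinkRules() {}

    // a) a batch is ready once either the count or the byte threshold is reached
    static boolean shouldStartFlush(int sizeOfBuffer, long sizeOfBufferBytes,
                                    int maxBatchSize, long maxBatchSizeInBytes) {
        return sizeOfBuffer >= maxBatchSize || sizeOfBufferBytes >= maxBatchSizeInBytes;
    }

    // b) new flushes are held back while the in-flight limit is reached
    static boolean flushBlocked(int inFlightRequestsCount, int maxInFlightRequests) {
        return inFlightRequestsCount >= maxInFlightRequests;
    }

    // c) new entries cannot be added while the buffer is full
    static boolean bufferFull(int sizeOfBuffer, int maxBufferedRequests) {
        return sizeOfBuffer >= maxBufferedRequests;
    }
}
```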








[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-23 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813094772



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -290,17 +290,33 @@ private void registerCallback() {
 @Override
 public void write(InputT element, Context context) throws IOException, InterruptedException {
 while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-mailboxExecutor.tryYield();
+flush();
 }
 
 addEntryToBuffer(elementConverter.apply(element, context), false);
 
-flushIfAble();
+nonBlockingFlush();
 }
 
-private void flushIfAble() {
-while (bufferedRequestEntries.size() >= maxBatchSize
-|| bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes) {
+/**
+ * Determines if a call to flush will be non-blocking (i.e. {@code inFlightRequestsCount} is
+ * strictly smaller than {@code maxInFlightRequests}). Also requires one of the following
+ * requirements to be met:
+ *
+ * 
+ *   The number of elements buffered is greater than or equal to the {@code maxBatchSize}
+ *   The sum of the size in bytes of all records in the buffer is greater than or equal to
+ *   {@code maxBatchSizeInBytes}
+ * 
+ */
+private void nonBlockingFlush() {
+boolean uncompletedInFlightResponses = true;
+while (uncompletedInFlightResponses) {
+uncompletedInFlightResponses = mailboxExecutor.tryYield();
+}
+while (inFlightRequestsCount < maxInFlightRequests
+&& (bufferedRequestEntries.size() >= maxBatchSize
+|| bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {

Review comment:
   The resulting behaviour is: "If we have enough elements in the buffer to make a batch, and we can flush that batch without blocking, then we should do so at the earliest possible opportunity."
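A count-only Java sketch of that loop condition, using a hypothetical `EagerFlushModel`. It assumes each iteration sends exactly `maxBatchSize` entries and ignores the byte threshold, which simplifies how the real writer builds batches.

```java
// Hypothetical count-only model of the nonBlockingFlush() loop condition.
class EagerFlushModel {
    final int maxBatchSize;
    final int maxInFlightRequests;
    int bufferedEntries;
    int inFlightRequestsCount;

    EagerFlushModel(int maxBatchSize, int maxInFlightRequests) {
        this.maxBatchSize = maxBatchSize;
        this.maxInFlightRequests = maxInFlightRequests;
    }

    // Send full batches only while a request slot is free; never wait for one.
    void nonBlockingFlush() {
        while (inFlightRequestsCount < maxInFlightRequests
                && bufferedEntries >= maxBatchSize) {
            bufferedEntries -= maxBatchSize;
            inFlightRequestsCount++;
        }
    }
}
```

With `maxBatchSize = 3`, `maxInFlightRequests = 2` and 10 buffered entries, two batches are sent and the loop stops with 4 entries still buffered, rather than waiting for a free slot.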








[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-23 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813051820



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -290,17 +290,33 @@ private void registerCallback() {
 @Override
 public void write(InputT element, Context context) throws IOException, InterruptedException {
 while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-mailboxExecutor.tryYield();
+flush();
 }
 
 addEntryToBuffer(elementConverter.apply(element, context), false);
 
-flushIfAble();
+nonBlockingFlush();
 }
 
-private void flushIfAble() {
-while (bufferedRequestEntries.size() >= maxBatchSize
-|| bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes) {
+/**
+ * Determines if a call to flush will be non-blocking (i.e. {@code inFlightRequestsCount} is
+ * strictly smaller than {@code maxInFlightRequests}). Also requires one of the following
+ * requirements to be met:
+ *
+ * 
+ *   The number of elements buffered is greater than or equal to the {@code maxBatchSize}
+ *   The sum of the size in bytes of all records in the buffer is greater than or equal to
+ *   {@code maxBatchSizeInBytes}
+ * 
+ */
+private void nonBlockingFlush() {
+boolean uncompletedInFlightResponses = true;
+while (uncompletedInFlightResponses) {
+uncompletedInFlightResponses = mailboxExecutor.tryYield();
+}

Review comment:
   6. To that end, these lines simply run the completion of any in-flight requests that have finished asynchronously and whose `completeRequest` calls are waiting in the mailbox, thereby reducing the in-flight request count. This loop does not block, and it returns immediately if there is nothing to complete.
   
   7. This then allows the check that follows to accurately determine whether the call to flush will block.
   
   If it will not block, we benefit from:
   * eagerly flushing a batch of records that has reached our desired size (by number or by bytes), and
   * retrying failed elements sooner than otherwise, due to the eager completion of in-flight requests.
   
   If it will block, we benefit from:
   * returning immediately without calling flush, and therefore not blocking unnecessarily, since the element has already been added to the buffer.
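The draining loop relies on `tryYield()` returning `false` as soon as there is nothing to run. A sketch with a hypothetical queue-backed `DrainableMailbox` standing in for `MailboxExecutor`, where each queued mail represents a pending `completeRequest` call:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical non-blocking mailbox: tryYield() runs one queued mail if there is one.
class DrainableMailbox {
    final Queue<Runnable> mails = new ArrayDeque<>();

    boolean tryYield() {
        Runnable mail = mails.poll();
        if (mail == null) {
            return false; // nothing to run: return immediately instead of blocking
        }
        mail.run();
        return true;
    }
}

class DrainDemo {
    int inFlightRequestsCount;
    final DrainableMailbox mailbox = new DrainableMailbox();

    // Models an in-flight request whose response has arrived and whose
    // completion callback is now waiting in the mailbox.
    void completeAsynchronously() {
        mailbox.mails.add(() -> inFlightRequestsCount--);
    }

    // Same shape as the loop at the top of nonBlockingFlush().
    void drainCompletedRequests() {
        boolean uncompletedInFlightResponses = true;
        while (uncompletedInFlightResponses) {
            uncompletedInFlightResponses = mailbox.tryYield();
        }
    }
}
```

With three in-flight requests of which two have pending completions, draining leaves `inFlightRequestsCount` at 1 and an empty mailbox; calling it again returns at once without changing anything.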








[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-23 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813056834



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -290,17 +290,33 @@ private void registerCallback() {
 @Override
 public void write(InputT element, Context context) throws IOException, InterruptedException {
 while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-mailboxExecutor.tryYield();
+flush();

Review comment:
   8. Note that this is the only place in the write method where we call flush without first checking whether it will block. This happens precisely when our buffer is full. In this case, we have no choice but to take the only action that can reduce the number of elements in the buffer, which is to flush. If there is a free in-flight request slot, then flush will be non-blocking and will return after firing off that request. If there are NO free slots, then this call will block until the number of in-flight requests drops below the allowed maximum.
   
   Therefore an important corollary follows: "calls to write will block if and only if the buffer is full and the number of in-flight requests is equal to or greater than the maximum allowed".
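The corollary can be checked exhaustively on a small count-only model. This sketch assumes, as the comment does, that `flush()` waits exactly when `inFlightRequestsCount >= maxInFlightRequests` and otherwise sends one batch; it also assumes the buffer never holds more than `maxBufferedRequests` entries when `write()` is called. `WriteModel` is hypothetical.

```java
// Hypothetical model: does write() have to wait for an in-flight request to complete?
final class WriteModel {
    private WriteModel() {}

    // Mirrors `while (buffer >= maxBufferedRequests) flush();` where a flush with a
    // free slot sends one batch and a flush without a free slot must wait.
    static boolean writeWouldWait(int buffered, int maxBufferedRequests,
                                  int inFlight, int maxInFlightRequests,
                                  int maxBatchSize) {
        while (buffered >= maxBufferedRequests) {
            if (inFlight >= maxInFlightRequests) {
                return true; // flush() would have to yield until a request completes
            }
            buffered -= Math.min(maxBatchSize, buffered);
            inFlight++;
        }
        return false;
    }
}
```

The test enumerates small configurations and compares the model against the stated corollary.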








[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-23 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813043737



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -290,17 +290,33 @@ private void registerCallback() {
 @Override
 public void write(InputT element, Context context) throws IOException, InterruptedException {
 while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-mailboxExecutor.tryYield();
+flush();
 }
 
 addEntryToBuffer(elementConverter.apply(element, context), false);
 
-flushIfAble();
+nonBlockingFlush();
 }
 
-private void flushIfAble() {
-while (bufferedRequestEntries.size() >= maxBatchSize
-|| bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes) {
+/**
+ * Determines if a call to flush will be non-blocking (i.e. {@code inFlightRequestsCount} is
+ * strictly smaller than {@code maxInFlightRequests}). Also requires one of the following
+ * requirements to be met:
+ *
+ * 
+ *   The number of elements buffered is greater than or equal to the {@code maxBatchSize}
+ *   The sum of the size in bytes of all records in the buffer is greater than or equal to
+ *   {@code maxBatchSizeInBytes}
+ * 
+ */
+private void nonBlockingFlush() {
+boolean uncompletedInFlightResponses = true;
+while (uncompletedInFlightResponses) {
+uncompletedInFlightResponses = mailboxExecutor.tryYield();
+}
+while (inFlightRequestsCount < maxInFlightRequests
+&& (bufferedRequestEntries.size() >= maxBatchSize
+|| bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {

Review comment:
   4. This raises an interesting point. We know `flush()` will block if and only if there are too many in-flight requests (`inFlightRequestsCount >= maxInFlightRequests`) AND at least one of those in-flight requests has not yet called the `completeRequest` method.
   
   5. If we can determine whether at least one of those in-flight requests has not yet called the `completeRequest` method, then we can accurately tell whether a call to `flush()` will block.




[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-23 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813037580



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -290,17 +290,33 @@ private void registerCallback() {
 @Override
 public void write(InputT element, Context context) throws IOException, InterruptedException {
 while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-mailboxExecutor.tryYield();
+flush();
 }
 
 addEntryToBuffer(elementConverter.apply(element, context), false);
 
-flushIfAble();
+nonBlockingFlush();
 }
 
-private void flushIfAble() {
-while (bufferedRequestEntries.size() >= maxBatchSize
-|| bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes) {
+/**
+ * Determines if a call to flush will be non-blocking (i.e. {@code inFlightRequestsCount} is
+ * strictly smaller than {@code maxInFlightRequests}). Also requires one of the following
+ * requirements to be met:
+ *
+ * 
+ *   The number of elements buffered is greater than or equal to the {@code maxBatchSize}
+ *   The sum of the size in bytes of all records in the buffer is greater than or equal to
+ *   {@code maxBatchSizeInBytes}
+ * 
+ */
+private void nonBlockingFlush() {
+boolean uncompletedInFlightResponses = true;
+while (uncompletedInFlightResponses) {
+uncompletedInFlightResponses = mailboxExecutor.tryYield();
+}
+while (inFlightRequestsCount < maxInFlightRequests
+&& (bufferedRequestEntries.size() >= maxBatchSize
+|| bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {

Review comment:
   3. I have added a condition here to only call `flush()` if we know beforehand that it will not block. This is desirable because the element being written has already been added to the buffer, so we should not proceed to flush unless we are certain that the operation will not block.




[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-23 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813032727



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -290,17 +290,33 @@ private void registerCallback() {
 @Override
 public void write(InputT element, Context context) throws IOException, InterruptedException {
 while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-mailboxExecutor.tryYield();
+flush();

Review comment:
   2. I have changed the `tryYield()` to a `flush()`. Consider the case where the number of requests in the buffer is greater than or equal to `maxBufferedRequests`. Calling `tryYield()` is not guaranteed to reduce the number of elements in the buffer, e.g. if there are no mailbox messages scheduled or ready to run.




[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-23 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813029451



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -313,7 +329,14 @@ private void flushIfAble() {
  */
 private void flush() {
 while (inFlightRequestsCount >= maxInFlightRequests) {
-mailboxExecutor.tryYield();
+try {
+mailboxExecutor.yield();
+} catch (InterruptedException e) {
+getFatalExceptionCons()
+.accept(
+new InterruptedException(
+"The mailbox thread was interrupted while waiting for asynchronous write operations to complete."));
+}

Review comment:
   1. You mentioned that I am busy-waiting in the `flush()` method, and I completely agree. I have changed it to use `yield()` while there are too many in-flight requests, so it now waits for one of the in-flight requests to complete before flushing and thereby creating an in-flight request of its own.




[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-17 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r809011341



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -267,18 +267,31 @@ private void registerCallback() {
 
 @Override
 public void write(InputT element, Context context) throws IOException, InterruptedException {
+while (mailboxExecutor.tryYield()) {}

Review comment:
   Thanks @pnowojski, I will start work on it next week.




[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-10 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r803592814



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -267,18 +267,31 @@ private void registerCallback() {
 
 @Override
 public void write(InputT element, Context context) throws IOException, InterruptedException {
+while (mailboxExecutor.tryYield()) {}

Review comment:
   @dmvk Would you mind letting us know what technical issues you see here? I would be happy to address them and make fixes if necessary.
   
   To answer your previous question, the semantics are: don't buffer or write anything while there are failed requests waiting to be re-queued or fatal exceptions waiting to fail the application. If the user has very frequent checkpointing, the async threads will take care of writing to the destination, and buffering will proceed as normal without blocking here.
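The semantics described above can be sketched as follows (hypothetical `DrainBeforeWriteSketch`; fatal errors are modelled as mails that throw when run, which is an assumption about the mechanism rather than something stated in this thread). Because every queued mail runs before the new element is added, a pending fatal error prevents the element from being buffered at all.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model of "run pending mails before buffering".
class DrainBeforeWriteSketch {
    final Deque<Runnable> mailbox = new ArrayDeque<>();
    final List<String> buffer = new ArrayList<>();

    boolean tryYield() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            return false;
        }
        mail.run();
        return true;
    }

    // Assumption for illustration: a fatal error arrives as a mail that throws when run.
    void reportFatal(String message) {
        mailbox.add(() -> {
            throw new IllegalStateException(message);
        });
    }

    // Same shape as `while (mailboxExecutor.tryYield()) {}`: every queued mail
    // runs first, so a pending fatal error stops the element from being buffered.
    void write(String element) {
        while (tryYield()) {
            // keep running queued mails
        }
        buffer.add(element);
    }
}
```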




[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-10 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r803581562



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -267,18 +267,31 @@ private void registerCallback() {
 
 @Override
 public void write(InputT element, Context context) throws IOException, InterruptedException {
+while (mailboxExecutor.tryYield()) {}

Review comment:
   Just wanted to back up my view that we don't have a busy loop on 
[tryYield()](https://nightlies.apache.org/flink/flink-docs-master/api/java//org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.html#tryYield--):
   given a fixed number of mails in the mailbox, each call to `tryYield()` runs the first one and returns true if it ran; once there are no more, it returns false.
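A minimal model of the `tryYield()` semantics described above (hypothetical `TryYieldLoopSketch`; a plain `ArrayDeque<Runnable>` stands in for the mailbox): each call runs the mail at the head of the queue and returns true, or returns false immediately when the queue is empty, so `while (tryYield()) {}` runs once per queued mail and then exits.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy model of tryYield(): run the mail at the head of the
// queue and return true, or return false immediately if the queue is empty.
class TryYieldLoopSketch {
    final Deque<Runnable> mailbox = new ArrayDeque<>();
    int mailsRun = 0;

    boolean tryYield() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            return false;
        }
        mail.run();
        mailsRun++;
        return true;
    }

    // Returns how many times `while (tryYield()) {}` went round.
    int drain() {
        int iterations = 0;
        while (tryYield()) {
            iterations++;
        }
        return iterations;
    }
}
```

In this model the loop is bounded by the mails already queued plus any that those mails enqueue themselves; it never waits for new mails to arrive.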




[GitHub] [flink] CrynetLogistics commented on a change in pull request #18651: [FLINK-25792][connectors] Only flushing the async sink base if it is …

2022-02-10 Thread GitBox


CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r803573163



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##
@@ -267,18 +267,31 @@ private void registerCallback() {
 
 @Override
 public void write(InputT element, Context context) throws IOException, InterruptedException {
+while (mailboxExecutor.tryYield()) {}

Review comment:
   Hi @pnowojski, thanks. On your questions:
   1. There are two scenarios in which we enqueue to the mailbox: (1) to handle fatal exceptions and (2) to add any failed request entries back to the buffer. I believe these should take priority over flushing new items?
   2. I do agree the line is dubious; perhaps this is more appropriate (the behaviour would remain identical):
   ```
   while (inFlightRequestsCount > 0) {
   mailboxExecutor.yield();
   }
   ```
   3. Perhaps I'm mistaken, but I don't believe we have a busy loop here: if `mailboxExecutor.tryYield()` returns true, there was some work to be done elsewhere in the mailbox, and we have just performed it. Otherwise, it returns false and the loop ends. I can't see where CPU resources are being wasted.
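Point 1 (mailbox work taking priority over new items) can be illustrated with a toy model in which a partially failed request re-queues its entries via a mail. The class `RequeuePrioritySketch` is hypothetical, and putting re-queued entries at the head of the buffer is an assumption suggested by the boolean flag passed to `addEntryToBuffer` in the diff, not something spelled out in this thread. Draining the mailbox at the start of `write` puts the failed entries back in the buffer, ahead of the new element.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model: re-queued failed entries are handled before new items.
class RequeuePrioritySketch {
    final Deque<Runnable> mailbox = new ArrayDeque<>();
    final Deque<String> buffer = new ArrayDeque<>();

    boolean tryYield() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            return false;
        }
        mail.run();
        return true;
    }

    // Called when some entries of a request fail; the re-queue happens later,
    // on the mailbox thread. Assumption: failed entries go back to the head.
    void onPartialFailure(List<String> failedEntries) {
        mailbox.add(() -> {
            // Insert in reverse so the failed entries keep their original order.
            for (int i = failedEntries.size() - 1; i >= 0; i--) {
                buffer.addFirst(failedEntries.get(i));
            }
        });
    }

    void write(String element) {
        while (tryYield()) {
            // re-queue failed entries before buffering anything new
        }
        buffer.addLast(element);
    }
}
```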



