This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit af6b87c29b6b9d07444ed2c10bfc0f3b12810484
Author: Igal Shilman <[email protected]>
AuthorDate: Thu Feb 20 22:14:28 2020 +0100

    [FLINK-16063][core] Use BackpressureValve in AsyncSink
---
 .../statefun/flink/core/functions/AsyncSink.java      | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
index 6ba6d51..aa8497b 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
@@ -23,17 +23,20 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve;
 import org.apache.flink.statefun.flink.core.di.Inject;
 import org.apache.flink.statefun.flink.core.di.Label;
 import org.apache.flink.statefun.flink.core.di.Lazy;
 import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.flink.core.queue.Locks;
 import org.apache.flink.statefun.flink.core.queue.MpscQueue;
+import org.apache.flink.statefun.sdk.Address;
 
 final class AsyncSink {
   private final MapState<Long, Message> pendingAsyncOperations;
   private final Lazy<Reductions> reductions;
   private final Executor operatorMailbox;
+  private final BackPressureValve backPressureValve;
 
   private final MpscQueue<Message> completed = new MpscQueue<>(32768, 
Locks.jdkReentrantLock());
 
@@ -41,10 +44,12 @@ final class AsyncSink {
   AsyncSink(
       @Label("async-operations") MapState<Long, Message> 
pendingAsyncOperations,
       @Label("mailbox-executor") Executor operatorMailbox,
-      @Label("reductions") Lazy<Reductions> reductions) {
+      @Label("reductions") Lazy<Reductions> reductions,
+      @Label("backpressure-valve") BackPressureValve backPressureValve) {
     this.pendingAsyncOperations = 
Objects.requireNonNull(pendingAsyncOperations);
     this.reductions = Objects.requireNonNull(reductions);
     this.operatorMailbox = Objects.requireNonNull(operatorMailbox);
+    this.backPressureValve = Objects.requireNonNull(backPressureValve);
   }
 
   <T> void accept(Message metadata, CompletableFuture<T> future) {
@@ -60,9 +65,20 @@ final class AsyncSink {
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
+    backPressureValve.notifyAsyncOperationRegistered();
     future.whenComplete((result, throwable) -> enqueue(metadata, futureId, 
result, throwable));
   }
 
+  /**
+   * Requests to stop processing any further input for that address, as long 
as there is an
+   * uncompleted async operation (owned by @address).
+   *
+   * @param address the address
+   */
+  void blockAddress(Address address) {
+    backPressureValve.blockAddress(address);
+  }
+
   private <T> void enqueue(Message message, long futureId, T result, Throwable 
throwable) {
     AsyncMessageDecorator<T> decoratedMessage =
         new AsyncMessageDecorator<>(pendingAsyncOperations, futureId, message, 
result, throwable);
@@ -79,6 +95,7 @@ final class AsyncSink {
     Reductions reductions = this.reductions.get();
     Message message;
     while ((message = batchOfCompletedFutures.poll()) != null) {
+      backPressureValve.notifyAsyncOperationCompleted(message.target());
       reductions.enqueue(message);
     }
     reductions.processEnvelopes();

Reply via email to