This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit f3f5c729502d333b0410ea84689234831e9b3a2b Author: Igal Shilman <[email protected]> AuthorDate: Thu Feb 20 22:26:45 2020 +0100 [FLINK-16063][core] Add address blocking to ReusableContext This commit introduces an internal interface to be used by internal built-in functions (like HttpFunction) to signal to the runtime that it cannot take any more input. --- .../flink/core/backpressure/AsyncWaiter.java | 39 ++++++++++++++++++++++ .../flink/core/functions/ReusableContext.java | 8 ++++- 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/AsyncWaiter.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/AsyncWaiter.java new file mode 100644 index 0000000..ddbe247 --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/AsyncWaiter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.statefun.flink.core.backpressure; + +import org.apache.flink.annotation.Internal; + +@Internal +public interface AsyncWaiter { + + /** + * Signals the runtime to stop invoking the currently executing function with new input until at + * least one {@link org.apache.flink.statefun.sdk.AsyncOperationResult} belonging to this function + * would be delivered. + * + * <p>NOTE: If a function would request to block without actually registering any async operations + * either previously or during its current invocation, then it would remain blocked. Since this is + * an internal API to be used by the remote functions we don't do anything to prevent that. + * + * <p>If we would like it to be a part of the SDK then we would have to make sure that we track + * every async operation registered per each address. + */ + void awaitAsyncOperationComplete(); +} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java index 1408d8b..55c646b 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java @@ -20,6 +20,7 @@ package org.apache.flink.statefun.flink.core.functions; import java.time.Duration; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import org.apache.flink.statefun.flink.core.backpressure.AsyncWaiter; import org.apache.flink.statefun.flink.core.di.Inject; import org.apache.flink.statefun.flink.core.di.Label; import org.apache.flink.statefun.flink.core.message.Message; @@ -28,7 +29,7 @@ import org.apache.flink.statefun.flink.core.state.State; import org.apache.flink.statefun.sdk.Address; import 
org.apache.flink.statefun.sdk.io.EgressIdentifier; -final class ReusableContext implements ApplyingContext { +final class ReusableContext implements ApplyingContext, AsyncWaiter { private final Partition thisPartition; private final LocalSink localSink; private final RemoteSink remoteSink; @@ -116,6 +117,11 @@ final class ReusableContext implements ApplyingContext { } @Override + public void awaitAsyncOperationComplete() { + asyncSink.blockAddress(self()); + } + + @Override public Address caller() { return in.source(); }
