This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit f33669ce04ff9437f68e91fbbc5f99c75c513cfe Author: Igal Shilman <[email protected]> AuthorDate: Thu Feb 20 22:02:06 2020 +0100 [FLINK-16063][core] Add a BackPressureValve This commit adds a mechanism to express back pressure management along with a simple threshold based implementation. --- .../flink/core/backpressure/BackPressureValve.java | 56 ++++++++++++ .../backpressure/ThresholdBackPressureValve.java | 99 ++++++++++++++++++++++ .../ThresholdBackPressureValveTest.java | 92 ++++++++++++++++++++ 3 files changed, 247 insertions(+) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BackPressureValve.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BackPressureValve.java new file mode 100644 index 0000000..29f1c19 --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BackPressureValve.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.core.backpressure; + +import java.util.concurrent.CompletableFuture; +import org.apache.flink.statefun.sdk.Address; + +public interface BackPressureValve { + + /** + * Indicates rather a back pressure is needed. + * + * @return true if a back pressure should be applied. + */ + boolean shouldBackPressure(); + + /** + * Notifies the back pressure mechanism that a async operation was registered via {@link + * org.apache.flink.statefun.sdk.Context#registerAsyncOperation(Object, CompletableFuture)}. + */ + void notifyAsyncOperationRegistered(); + + /** + * Notifies when a async operation, registered by @owningAddress was completed. + * + * @param owningAddress the owner of the completed async operation. + */ + void notifyAsyncOperationCompleted(Address owningAddress); + + /** + * Requests to stop processing any further input for that address, as long as there is an + * uncompleted async operation (registered by @address). + * + * <p>NOTE: The address would unblocked as soon as some (one) async operation registered by that + * address completes. + * + * @param address the address + */ + void blockAddress(Address address); +} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java new file mode 100644 index 0000000..1ce0b00 --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.core.backpressure; + +import it.unimi.dsi.fastutil.objects.ObjectOpenHashMap; +import java.util.Objects; +import org.apache.flink.statefun.sdk.Address; + +/** + * A simple Threshold based {@link BackPressureValve}. + * + * <p>There are two cases where a backpressure would be triggered: + * + * <ul> + * <li>The total number of in-flight async operations in a StreamTask exceeds a predefined + * threshold. This is tracked by {@link + * ThresholdBackPressureValve#pendingAsynchronousOperationsCount}, it is incremented when an + * async operation is registered, and decremented when it is completed. + * <li>A specific address has requested to stop processing new inputs, this is tracked by the + * {@link ThresholdBackPressureValve#blockedAddressSet}. The method {@link + * ThresholdBackPressureValve#notifyAsyncOperationCompleted(Address)} is meant to be called + * when ANY async operation has been completed. + * </ul> + */ +public final class ThresholdBackPressureValve implements BackPressureValve { + private final int maximumPendingAsynchronousOperations; + + /** + * a set of address that had explicitly requested to stop processing any new inputs (via {@link + * AsyncWaiter#awaitAsyncOperationComplete()}. Note that this is a set implemented on top of a + * map, and the value (Boolean) has no meaning. + */ + private final ObjectOpenHashMap<Address, Boolean> blockedAddressSet = + new ObjectOpenHashMap<>(1024); + + private int pendingAsynchronousOperationsCount; + + /** + * Constructs a ThresholdBackPressureValve. + * + * @param maximumPendingAsynchronousOperations the total allowed async operations to be inflight + * per StreamTask, or {@code -1} to disable back pressure. + */ + public ThresholdBackPressureValve(int maximumPendingAsynchronousOperations) { + this.maximumPendingAsynchronousOperations = maximumPendingAsynchronousOperations; + } + + /** {@inheritDoc} */ + @Override + public boolean shouldBackPressure() { + return totalPendingAsyncOperationsAtCapacity() || hasBlockedAddress(); + } + + /** {@inheritDoc} */ + @Override + public void blockAddress(Address address) { + Objects.requireNonNull(address); + blockedAddressSet.put(address, Boolean.TRUE); + } + + /** {@inheritDoc} */ + @Override + public void notifyAsyncOperationRegistered() { + pendingAsynchronousOperationsCount++; + } + + /** {@inheritDoc} */ + @Override + public void notifyAsyncOperationCompleted(Address owningAddress) { + Objects.requireNonNull(owningAddress); + pendingAsynchronousOperationsCount = Math.max(0, pendingAsynchronousOperationsCount - 1); + blockedAddressSet.remove(owningAddress); + } + + private boolean totalPendingAsyncOperationsAtCapacity() { + return maximumPendingAsynchronousOperations > 0 + && pendingAsynchronousOperationsCount >= maximumPendingAsynchronousOperations; + } + + private boolean hasBlockedAddress() { + return !blockedAddressSet.isEmpty(); + } +} diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValveTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValveTest.java new file mode 100644 index 0000000..4ececcf --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValveTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.core.backpressure; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.flink.statefun.flink.core.TestUtils; +import org.junit.Test; + +public class ThresholdBackPressureValveTest { + + @Test + public void simpleUsage() { + ThresholdBackPressureValve valve = new ThresholdBackPressureValve(2); + + valve.notifyAsyncOperationRegistered(); + valve.notifyAsyncOperationRegistered(); + + assertTrue(valve.shouldBackPressure()); + } + + @Test + public void completedOperationReleaseBackpressure() { + ThresholdBackPressureValve valve = new ThresholdBackPressureValve(1); + + valve.notifyAsyncOperationRegistered(); + valve.notifyAsyncOperationCompleted(TestUtils.FUNCTION_1_ADDR); + + assertFalse(valve.shouldBackPressure()); + } + + @Test + public void blockAddressTriggerBackpressure() { + ThresholdBackPressureValve valve = new ThresholdBackPressureValve(500); + + valve.blockAddress(TestUtils.FUNCTION_1_ADDR); + + assertTrue(valve.shouldBackPressure()); + } + + @Test + public void blockingAndUnblockingAddress() { + ThresholdBackPressureValve valve = new ThresholdBackPressureValve(500); + + valve.blockAddress(TestUtils.FUNCTION_1_ADDR); + valve.notifyAsyncOperationCompleted(TestUtils.FUNCTION_1_ADDR); + + assertFalse(valve.shouldBackPressure()); + } + + @Test + public void unblockingDifferentAddressStillBackpressures() { + ThresholdBackPressureValve valve = new ThresholdBackPressureValve(500); + + valve.blockAddress(TestUtils.FUNCTION_1_ADDR); + valve.notifyAsyncOperationCompleted(TestUtils.FUNCTION_2_ADDR); + + assertTrue(valve.shouldBackPressure()); + } + + @Test + public void blockTwoAddress() { + ThresholdBackPressureValve valve = new ThresholdBackPressureValve(500); + + valve.blockAddress(TestUtils.FUNCTION_1_ADDR); + valve.blockAddress(TestUtils.FUNCTION_2_ADDR); + assertTrue(valve.shouldBackPressure()); + + valve.notifyAsyncOperationCompleted(TestUtils.FUNCTION_1_ADDR); + assertTrue(valve.shouldBackPressure()); + + valve.notifyAsyncOperationCompleted(TestUtils.FUNCTION_2_ADDR); + assertFalse(valve.shouldBackPressure()); + } +}
