This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 62448ec70575176905b177cac0976959a4bd2d05 Author: Gary Yao <g...@apache.org> AuthorDate: Mon Jul 29 16:29:02 2019 +0200 [FLINK-13384][runtime] Fix back pressure sampling for SourceStreamTask --- .../runtime/jobgraph/tasks/AbstractInvokable.java | 13 + .../TaskStackTraceSampleableTaskAdapter.java | 2 +- .../org/apache/flink/runtime/taskmanager/Task.java | 15 ++ .../BackPressureStatsTrackerImplITCase.java | 290 --------------------- .../apache/flink/runtime/taskmanager/TaskTest.java | 7 + .../streaming/runtime/tasks/SourceStreamTask.java | 11 +- .../test/streaming/runtime/BackPressureITCase.java | 165 ++++++++++++ .../org/apache/flink/test/util/BlockingSink.java | 35 +++ .../flink/test/util/IdentityMapFunction.java | 37 +++ 9 files changed, 283 insertions(+), 292 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java index cd53f58..07c7f22 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java @@ -24,6 +24,9 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.taskmanager.Task; + +import java.util.Optional; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -123,6 +126,16 @@ public abstract class AbstractInvokable { return shouldInterruptOnCancel; } + /** + * If the invokable implementation executes user code in a thread other than + * {@link Task#getExecutingThread()}, this method returns that executing thread. 
+ * + * @see Task#getStackTraceOfExecutingThread() + */ + public Optional<Thread> getExecutingThread() { + return Optional.empty(); + } + // ------------------------------------------------------------------------ // Access to Environment and Configuration // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskStackTraceSampleableTaskAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskStackTraceSampleableTaskAdapter.java index 585af2a..2297c3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskStackTraceSampleableTaskAdapter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskStackTraceSampleableTaskAdapter.java @@ -47,7 +47,7 @@ class TaskStackTraceSampleableTaskAdapter implements StackTraceSampleableTask { @Override public StackTraceElement[] getStackTrace() { - return task.getExecutingThread().getStackTrace(); + return task.getStackTraceOfExecutingThread(); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 7dafc4f..575b82e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -470,6 +470,18 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid return invokable; } + public StackTraceElement[] getStackTraceOfExecutingThread() { + final AbstractInvokable invokable = this.invokable; + + if (invokable == null) { + return new StackTraceElement[0]; + } + + return invokable.getExecutingThread() + .orElse(executingThread) + .getStackTrace(); + } + // ------------------------------------------------------------------------ // Task Execution // 
------------------------------------------------------------------------ @@ -686,6 +698,9 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid // notify everyone that we switched to running taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)); + // make sure the user code classloader is accessible thread-locally + executingThread.setContextClassLoader(userCodeClassLoader); + // run the invokable invokable.invoke(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java deleted file mode 100644 index 8a5a1a9..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.rest.handler.legacy.backpressure; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Deadline; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.WebOptions; -import org.apache.flink.runtime.dispatcher.DispatcherGateway; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; -import org.apache.flink.runtime.io.network.buffer.BufferPool; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.minicluster.TestingMiniCluster; -import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; -import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; -import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.util.TestLogger; - -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeDiagnosingMatcher; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static org.apache.flink.util.Preconditions.checkState; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -/** - * 
Simple back pressured task test. - * - * @see BackPressureStatsTrackerImpl - */ -public class BackPressureStatsTrackerImplITCase extends TestLogger { - - private static final long TIMEOUT_SECONDS = 10; - - private static final Duration TIMEOUT = Duration.ofSeconds(TIMEOUT_SECONDS); - - private static final int BACKPRESSURE_NUM_SAMPLES = 2; - - private static final int JOB_PARALLELISM = 4; - - private static final JobID TEST_JOB_ID = new JobID(); - - private static final JobVertex TEST_JOB_VERTEX = new JobVertex("Task"); - - private NetworkBufferPool networkBufferPool; - - /** Shared as static variable with the test task. */ - private static BufferPool testBufferPool; - - private TestingMiniCluster testingMiniCluster; - - private DispatcherGateway dispatcherGateway; - - @Before - public void setUp() throws Exception { - networkBufferPool = new NetworkBufferPool(100, 8192, 1); - testBufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE); - - final Configuration configuration = new Configuration(); - configuration.setInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES, BACKPRESSURE_NUM_SAMPLES); - - testingMiniCluster = new TestingMiniCluster(new TestingMiniClusterConfiguration.Builder() - .setNumTaskManagers(JOB_PARALLELISM) - .setConfiguration(configuration) - .build()); - testingMiniCluster.start(); - - dispatcherGateway = testingMiniCluster.getDispatcherGatewayFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); - } - - @After - public void tearDown() throws Exception { - if (testingMiniCluster != null) { - testingMiniCluster.close(); - } - - if (testBufferPool != null) { - testBufferPool.lazyDestroy(); - } - - if (networkBufferPool != null) { - networkBufferPool.destroyAllBufferPools(); - networkBufferPool.destroy(); - } - - } - - /** - * Tests a simple fake-back pressured task. Back pressure is assumed when - * sampled stack traces are in blocking buffer requests. 
- */ - @Test - public void testBackPressureShouldBeReflectedInStats() throws Exception { - final List<Buffer> buffers = requestAllBuffers(); - try { - final JobGraph jobGraph = createJobWithBackPressure(); - testingMiniCluster.submitJob(jobGraph).get(TIMEOUT_SECONDS, TimeUnit.SECONDS); - - final OperatorBackPressureStats stats = getBackPressureStatsForTestVertex(); - - assertThat(stats.getNumberOfSubTasks(), is(equalTo(JOB_PARALLELISM))); - assertThat(stats, isFullyBackpressured()); - } finally { - releaseBuffers(buffers); - } - } - - @Test - public void testAbsenceOfBackPressureShouldBeReflectedInStats() throws Exception { - final JobGraph jobGraph = createJobWithoutBackPressure(); - testingMiniCluster.submitJob(jobGraph).get(TIMEOUT_SECONDS, TimeUnit.SECONDS); - - final OperatorBackPressureStats stats = getBackPressureStatsForTestVertex(); - - assertThat(stats.getNumberOfSubTasks(), is(equalTo(JOB_PARALLELISM))); - assertThat(stats, isNotBackpressured()); - } - - private static JobGraph createJobWithBackPressure() { - final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job"); - - TEST_JOB_VERTEX.setInvokableClass(BackPressuredTask.class); - TEST_JOB_VERTEX.setParallelism(JOB_PARALLELISM); - - jobGraph.addVertex(TEST_JOB_VERTEX); - return jobGraph; - } - - private static JobGraph createJobWithoutBackPressure() { - final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job"); - - TEST_JOB_VERTEX.setInvokableClass(BlockingNoOpInvokable.class); - TEST_JOB_VERTEX.setParallelism(JOB_PARALLELISM); - - jobGraph.addVertex(TEST_JOB_VERTEX); - return jobGraph; - } - - private static List<Buffer> requestAllBuffers() throws IOException { - final List<Buffer> buffers = new ArrayList<>(); - while (true) { - final Buffer buffer = testBufferPool.requestBuffer(); - if (buffer != null) { - buffers.add(buffer); - } else { - break; - } - } - return buffers; - } - - private static void releaseBuffers(final List<Buffer> buffers) { - for (Buffer buffer : buffers) { - 
buffer.recycleBuffer(); - assertTrue(buffer.isRecycled()); - } - } - - private OperatorBackPressureStats getBackPressureStatsForTestVertex() { - waitUntilBackPressureStatsAvailable(); - - final Optional<OperatorBackPressureStats> stats = getBackPressureStats(); - checkState(stats.isPresent()); - return stats.get(); - } - - private void waitUntilBackPressureStatsAvailable() { - try { - CommonTestUtils.waitUntilCondition( - () -> { - final Optional<OperatorBackPressureStats> stats = getBackPressureStats(); - return stats.isPresent(); - }, - Deadline.fromNow(TIMEOUT)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private Optional<OperatorBackPressureStats> getBackPressureStats() { - try { - return dispatcherGateway.requestOperatorBackPressureStats(TEST_JOB_ID, TEST_JOB_VERTEX.getID()) - .get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) - .getOperatorBackPressureStats(); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - throw new RuntimeException(e); - } - } - - /** - * A back pressured producer sharing a {@link BufferPool} with the - * test driver. - */ - public static class BackPressuredTask extends AbstractInvokable { - - public BackPressuredTask(Environment environment) { - super(environment); - } - - @Override - public void invoke() throws Exception { - final BufferBuilder bufferBuilder = testBufferPool.requestBufferBuilderBlocking(); - // Got a buffer, yay! 
- BufferBuilderTestUtils.buildSingleBuffer(bufferBuilder).recycleBuffer(); - - Thread.currentThread().join(); - } - } - - private static Matcher<OperatorBackPressureStats> isNotBackpressured() { - return new OperatorBackPressureRatioMatcher(0); - } - - private static Matcher<OperatorBackPressureStats> isFullyBackpressured() { - return new OperatorBackPressureRatioMatcher(1); - } - - private static class OperatorBackPressureRatioMatcher extends TypeSafeDiagnosingMatcher<OperatorBackPressureStats> { - - private final double expectedBackPressureRatio; - - private OperatorBackPressureRatioMatcher(final double expectedBackPressureRatio) { - this.expectedBackPressureRatio = expectedBackPressureRatio; - } - - @Override - protected boolean matchesSafely(final OperatorBackPressureStats stats, final Description mismatchDescription) { - if (!isBackPressureRatioCorrect(stats)) { - mismatchDescription.appendText("Not all subtask back pressure ratios in " + getBackPressureRatios(stats) + " are " + expectedBackPressureRatio); - return false; - } - return true; - } - - private static List<Double> getBackPressureRatios(final OperatorBackPressureStats stats) { - return IntStream.range(0, stats.getNumberOfSubTasks()) - .mapToObj(stats::getBackPressureRatio).collect(Collectors.toList()); - } - - private boolean isBackPressureRatioCorrect(final OperatorBackPressureStats stats) { - return IntStream.range(0, stats.getNumberOfSubTasks()) - .mapToObj(stats::getBackPressureRatio) - .allMatch(backpressureRatio -> backpressureRatio == expectedBackPressureRatio); - } - - @Override - public void describeTo(final Description description) { - description.appendText("All subtask back pressure ratios are " + expectedBackPressureRatio); - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index ad5a23a..30c96cd 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -922,6 +922,13 @@ public class TaskTest extends TestLogger { assertEquals(ExecutionState.FAILED, task.getTerminationFuture().getNow(null)); } + @Test + public void testReturnsEmptyStackTraceIfTaskNotStarted() throws Exception { + final Task task = createTaskBuilder().build(); + final StackTraceElement[] actualStackTrace = task.getStackTraceOfExecutingThread(); + assertEquals(0, actualStackTrace.length); + } + // ------------------------------------------------------------------------ // customized TaskManagerActions // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index d7b467d..e06e2b4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -27,6 +27,8 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.util.FlinkException; +import java.util.Optional; + /** * {@link StreamTask} for executing a {@link StreamSource}. 
* @@ -47,6 +49,8 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S private static final Runnable SOURCE_POISON_LETTER = () -> {}; + private final LegacySourceFunctionThread sourceThread; + private volatile boolean externallyInducedCheckpoints; /** @@ -57,6 +61,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S public SourceStreamTask(Environment env) { super(env); + this.sourceThread = new LegacySourceFunctionThread(); } @Override @@ -109,7 +114,6 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S protected void performDefaultAction(ActionContext context) throws Exception { // Against the usual contract of this method, this implementation is not step-wise but blocking instead for // compatibility reasons with the current source interface (source functions run as a loop, not in steps). - final LegacySourceFunctionThread sourceThread = new LegacySourceFunctionThread(); sourceThread.start(); // We run an alternative mailbox loop that does not involve default actions and synchronizes around actions. @@ -159,6 +163,11 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S cancelTask(); } + @Override + public Optional<Thread> getExecutingThread() { + return Optional.of(sourceThread); + } + // ------------------------------------------------------------------------ // Checkpointing // ------------------------------------------------------------------------ diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java new file mode 100644 index 0000000..07fb227 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.test.streaming.runtime; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.util.BlockingSink; +import org.apache.flink.test.util.IdentityMapFunction; +import org.apache.flink.test.util.InfiniteIntegerSource; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.After; +import org.junit.Before; 
+import org.junit.Test; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition; + +/** + * Integration test for operator back pressure tracking. + */ +public class BackPressureITCase extends TestLogger { + + private static final JobID TEST_JOB_ID = new JobID(); + private static final int NUM_TASKS = 3; + private static final int BACK_PRESSURE_REQUEST_INTERVAL_MS = 5; + private static final int TASKS_BECOMING_BACK_PRESSURED_TIMEOUT_MS = 15 * 1000; + private static final int MAX_BACK_PRESSURE_RATIO = 1; + + private TestingMiniCluster testingMiniCluster; + private DispatcherGateway dispatcherGateway; + + @Before + public void setUp() throws Exception { + final Configuration configuration = new Configuration(); + configuration.addAll(createBackPressureSamplingConfiguration()); + configuration.addAll(createNetworkBufferConfiguration()); + + final TestingMiniClusterConfiguration testingMiniClusterConfiguration = new TestingMiniClusterConfiguration.Builder() + .setNumSlotsPerTaskManager(NUM_TASKS) + .setConfiguration(configuration) + .build(); + + testingMiniCluster = new TestingMiniCluster(testingMiniClusterConfiguration); + testingMiniCluster.start(); + dispatcherGateway = testingMiniCluster.getDispatcherGatewayFuture().get(); + } + + private static Configuration createBackPressureSamplingConfiguration() { + final Configuration configuration = new Configuration(); + configuration.setInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL, 1000); + configuration.setInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES, 1); + configuration.setInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL, Integer.MAX_VALUE); + return configuration; + } + + private static Configuration createNetworkBufferConfiguration() { + final Configuration configuration = new Configuration(); + + final int memorySegmentSizeKb = 32; + final String networkBuffersMemory = 
(memorySegmentSizeKb * NUM_TASKS) + "kb"; + + configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, memorySegmentSizeKb + "kb"); + configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, networkBuffersMemory); + configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, networkBuffersMemory); + return configuration; + } + + @Test + public void operatorsBecomeBackPressured() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() + .setParallelism(1); + + env.addSource(new InfiniteIntegerSource()) + .slotSharingGroup("sourceGroup") + .map(new IdentityMapFunction<>()) + .slotSharingGroup("mapGroup") + .addSink(new BlockingSink<>()) + .slotSharingGroup("sinkGroup"); + + final JobGraph jobGraph = env.getStreamGraph().getJobGraph(TEST_JOB_ID); + + final List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources(); + final JobVertex sourceJobVertex = vertices.get(0); + final JobVertex mapJobVertex = vertices.get(1); + + testingMiniCluster.submitJob(jobGraph).get(); + + assertJobVertexSubtasksAreBackPressured(mapJobVertex); + assertJobVertexSubtasksAreBackPressured(sourceJobVertex); + } + + private void assertJobVertexSubtasksAreBackPressured(final JobVertex jobVertex) throws Exception { + try { + final Deadline timeout = Deadline.fromNow(Duration.ofMillis(TASKS_BECOMING_BACK_PRESSURED_TIMEOUT_MS)); + waitUntilCondition( + isJobVertexBackPressured(jobVertex), + timeout, + BACK_PRESSURE_REQUEST_INTERVAL_MS); + } catch (final TimeoutException e) { + final String errorMessage = String.format("Subtasks of job vertex %s were not back pressured within timeout", jobVertex); + throw new AssertionError(errorMessage, e); + } + } + + private SupplierWithException<Boolean, Exception> isJobVertexBackPressured(final JobVertex sourceJobVertex) { + return () -> { + final OperatorBackPressureStatsResponse backPressureStatsResponse = dispatcherGateway + 
.requestOperatorBackPressureStats(TEST_JOB_ID, sourceJobVertex.getID()) + .get(); + + return backPressureStatsResponse.getOperatorBackPressureStats() + .map(backPressureStats -> isBackPressured(backPressureStats)) + .orElse(false); + }; + } + + private static boolean isBackPressured(final OperatorBackPressureStats backPressureStats) { + for (int i = 0; i < backPressureStats.getNumberOfSubTasks(); i++) { + final double subtaskBackPressureRatio = backPressureStats.getBackPressureRatio(i); + if (subtaskBackPressureRatio != MAX_BACK_PRESSURE_RATIO) { + return false; + } + } + return true; + } + + @After + public void tearDown() throws Exception { + testingMiniCluster.close(); + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/BlockingSink.java b/flink-tests/src/test/java/org/apache/flink/test/util/BlockingSink.java new file mode 100644 index 0000000..c09ffd6 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/util/BlockingSink.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.streaming.api.functions.sink.SinkFunction; + +/** + * Sink that blocks until interrupted. 
+ * + * @param <IN> Type of the input. + */ +public class BlockingSink<IN> implements SinkFunction<IN> { + + @Override + public void invoke(final IN value, final Context context) throws Exception { + Thread.currentThread().join(); + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/IdentityMapFunction.java b/flink-tests/src/test/java/org/apache/flink/test/util/IdentityMapFunction.java new file mode 100644 index 0000000..d4700fa --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/util/IdentityMapFunction.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.api.common.functions.MapFunction; + +/** + * A map function that always returns its input argument. + * + * @param <T> The type of the input and output + */ +public class IdentityMapFunction<T> implements MapFunction<T, T> { + + private static final long serialVersionUID = 1L; + + @Override + public T map(final T value) throws Exception { + return value; + } +}