This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 62448ec70575176905b177cac0976959a4bd2d05
Author: Gary Yao <g...@apache.org>
AuthorDate: Mon Jul 29 16:29:02 2019 +0200

    [FLINK-13384][runtime] Fix back pressure sampling for SourceStreamTask
---
 .../runtime/jobgraph/tasks/AbstractInvokable.java  |  13 +
 .../TaskStackTraceSampleableTaskAdapter.java       |   2 +-
 .../org/apache/flink/runtime/taskmanager/Task.java |  15 ++
 .../BackPressureStatsTrackerImplITCase.java        | 290 ---------------------
 .../apache/flink/runtime/taskmanager/TaskTest.java |   7 +
 .../streaming/runtime/tasks/SourceStreamTask.java  |  11 +-
 .../test/streaming/runtime/BackPressureITCase.java | 165 ++++++++++++
 .../org/apache/flink/test/util/BlockingSink.java   |  35 +++
 .../flink/test/util/IdentityMapFunction.java       |  37 +++
 9 files changed, 283 insertions(+), 292 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index cd53f58..07c7f22 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.taskmanager.Task;
+
+import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -123,6 +126,16 @@ public abstract class AbstractInvokable {
                return shouldInterruptOnCancel;
        }
 
+       /**
+        * If the invokable implementation executes user code in a thread other 
than
+        * {@link Task#getExecutingThread()}, this method returns that 
executing thread.
+        *
+        * @see Task#getStackTraceOfExecutingThread()
+        */
+       public Optional<Thread> getExecutingThread() {
+               return Optional.empty();
+       }
+
        // 
------------------------------------------------------------------------
        //  Access to Environment and Configuration
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskStackTraceSampleableTaskAdapter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskStackTraceSampleableTaskAdapter.java
index 585af2a..2297c3a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskStackTraceSampleableTaskAdapter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskStackTraceSampleableTaskAdapter.java
@@ -47,7 +47,7 @@ class TaskStackTraceSampleableTaskAdapter implements 
StackTraceSampleableTask {
 
        @Override
        public StackTraceElement[] getStackTrace() {
-               return task.getExecutingThread().getStackTrace();
+               return task.getStackTraceOfExecutingThread();
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 7dafc4f..575b82e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -470,6 +470,18 @@ public class Task implements Runnable, TaskActions, 
PartitionProducerStateProvid
                return invokable;
        }
 
+       public StackTraceElement[] getStackTraceOfExecutingThread() {
+               final AbstractInvokable invokable = this.invokable;
+
+               if (invokable == null) {
+                       return new StackTraceElement[0];
+               }
+
+               return invokable.getExecutingThread()
+                       .orElse(executingThread)
+                       .getStackTrace();
+       }
+
        // 
------------------------------------------------------------------------
        //  Task Execution
        // 
------------------------------------------------------------------------
@@ -686,6 +698,9 @@ public class Task implements Runnable, TaskActions, 
PartitionProducerStateProvid
                        // notify everyone that we switched to running
                        taskManagerActions.updateTaskExecutionState(new 
TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
 
+                       // make sure the user code classloader is accessible 
thread-locally
+                       
executingThread.setContextClassLoader(userCodeClassLoader);
+
                        // run the invokable
                        invokable.invoke();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
deleted file mode 100644
index 8a5a1a9..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.backpressure;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.minicluster.TestingMiniCluster;
-import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
-import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeDiagnosingMatcher;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.apache.flink.util.Preconditions.checkState;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Simple back pressured task test.
- *
- * @see BackPressureStatsTrackerImpl
- */
-public class BackPressureStatsTrackerImplITCase extends TestLogger {
-
-       private static final long TIMEOUT_SECONDS = 10;
-
-       private static final Duration TIMEOUT = 
Duration.ofSeconds(TIMEOUT_SECONDS);
-
-       private static final int BACKPRESSURE_NUM_SAMPLES = 2;
-
-       private static final int JOB_PARALLELISM = 4;
-
-       private static final JobID TEST_JOB_ID = new JobID();
-
-       private static final JobVertex TEST_JOB_VERTEX = new JobVertex("Task");
-
-       private NetworkBufferPool networkBufferPool;
-
-       /** Shared as static variable with the test task. */
-       private static BufferPool testBufferPool;
-
-       private TestingMiniCluster testingMiniCluster;
-
-       private DispatcherGateway dispatcherGateway;
-
-       @Before
-       public void setUp() throws Exception {
-               networkBufferPool = new NetworkBufferPool(100, 8192, 1);
-               testBufferPool = networkBufferPool.createBufferPool(1, 
Integer.MAX_VALUE);
-
-               final Configuration configuration = new Configuration();
-               configuration.setInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES, 
BACKPRESSURE_NUM_SAMPLES);
-
-               testingMiniCluster = new TestingMiniCluster(new 
TestingMiniClusterConfiguration.Builder()
-                       .setNumTaskManagers(JOB_PARALLELISM)
-                       .setConfiguration(configuration)
-                       .build());
-               testingMiniCluster.start();
-
-               dispatcherGateway = 
testingMiniCluster.getDispatcherGatewayFuture().get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
-       }
-
-       @After
-       public void tearDown() throws Exception {
-               if (testingMiniCluster != null) {
-                       testingMiniCluster.close();
-               }
-
-               if (testBufferPool != null) {
-                       testBufferPool.lazyDestroy();
-               }
-
-               if (networkBufferPool != null) {
-                       networkBufferPool.destroyAllBufferPools();
-                       networkBufferPool.destroy();
-               }
-
-       }
-
-       /**
-        * Tests a simple fake-back pressured task. Back pressure is assumed 
when
-        * sampled stack traces are in blocking buffer requests.
-        */
-       @Test
-       public void testBackPressureShouldBeReflectedInStats() throws Exception 
{
-               final List<Buffer> buffers = requestAllBuffers();
-               try {
-                       final JobGraph jobGraph = createJobWithBackPressure();
-                       
testingMiniCluster.submitJob(jobGraph).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-                       final OperatorBackPressureStats stats = 
getBackPressureStatsForTestVertex();
-
-                       assertThat(stats.getNumberOfSubTasks(), 
is(equalTo(JOB_PARALLELISM)));
-                       assertThat(stats, isFullyBackpressured());
-               } finally {
-                       releaseBuffers(buffers);
-               }
-       }
-
-       @Test
-       public void testAbsenceOfBackPressureShouldBeReflectedInStats() throws 
Exception {
-               final JobGraph jobGraph = createJobWithoutBackPressure();
-               testingMiniCluster.submitJob(jobGraph).get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
-
-               final OperatorBackPressureStats stats = 
getBackPressureStatsForTestVertex();
-
-               assertThat(stats.getNumberOfSubTasks(), 
is(equalTo(JOB_PARALLELISM)));
-               assertThat(stats, isNotBackpressured());
-       }
-
-       private static JobGraph createJobWithBackPressure() {
-               final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job");
-
-               TEST_JOB_VERTEX.setInvokableClass(BackPressuredTask.class);
-               TEST_JOB_VERTEX.setParallelism(JOB_PARALLELISM);
-
-               jobGraph.addVertex(TEST_JOB_VERTEX);
-               return jobGraph;
-       }
-
-       private static JobGraph createJobWithoutBackPressure() {
-               final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job");
-
-               TEST_JOB_VERTEX.setInvokableClass(BlockingNoOpInvokable.class);
-               TEST_JOB_VERTEX.setParallelism(JOB_PARALLELISM);
-
-               jobGraph.addVertex(TEST_JOB_VERTEX);
-               return jobGraph;
-       }
-
-       private static List<Buffer> requestAllBuffers() throws IOException {
-               final List<Buffer> buffers = new ArrayList<>();
-               while (true) {
-                       final Buffer buffer = testBufferPool.requestBuffer();
-                       if (buffer != null) {
-                               buffers.add(buffer);
-                       } else {
-                               break;
-                       }
-               }
-               return buffers;
-       }
-
-       private static void releaseBuffers(final List<Buffer> buffers) {
-               for (Buffer buffer : buffers) {
-                       buffer.recycleBuffer();
-                       assertTrue(buffer.isRecycled());
-               }
-       }
-
-       private OperatorBackPressureStats getBackPressureStatsForTestVertex() {
-               waitUntilBackPressureStatsAvailable();
-
-               final Optional<OperatorBackPressureStats> stats = 
getBackPressureStats();
-               checkState(stats.isPresent());
-               return stats.get();
-       }
-
-       private void waitUntilBackPressureStatsAvailable() {
-               try {
-                       CommonTestUtils.waitUntilCondition(
-                               () -> {
-                                       final 
Optional<OperatorBackPressureStats> stats = getBackPressureStats();
-                                       return stats.isPresent();
-                                       },
-                               Deadline.fromNow(TIMEOUT));
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
-       }
-
-       private Optional<OperatorBackPressureStats> getBackPressureStats() {
-               try {
-                       return 
dispatcherGateway.requestOperatorBackPressureStats(TEST_JOB_ID, 
TEST_JOB_VERTEX.getID())
-                               .get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
-                               .getOperatorBackPressureStats();
-               } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
-                       throw new RuntimeException(e);
-               }
-       }
-
-       /**
-        * A back pressured producer sharing a {@link BufferPool} with the
-        * test driver.
-        */
-       public static class BackPressuredTask extends AbstractInvokable {
-
-               public BackPressuredTask(Environment environment) {
-                       super(environment);
-               }
-
-               @Override
-               public void invoke() throws Exception {
-                       final BufferBuilder bufferBuilder = 
testBufferPool.requestBufferBuilderBlocking();
-                       // Got a buffer, yay!
-                       
BufferBuilderTestUtils.buildSingleBuffer(bufferBuilder).recycleBuffer();
-
-                       Thread.currentThread().join();
-               }
-       }
-
-       private static Matcher<OperatorBackPressureStats> isNotBackpressured() {
-               return new OperatorBackPressureRatioMatcher(0);
-       }
-
-       private static Matcher<OperatorBackPressureStats> 
isFullyBackpressured() {
-               return new OperatorBackPressureRatioMatcher(1);
-       }
-
-       private static class OperatorBackPressureRatioMatcher extends 
TypeSafeDiagnosingMatcher<OperatorBackPressureStats> {
-
-               private final double expectedBackPressureRatio;
-
-               private OperatorBackPressureRatioMatcher(final double 
expectedBackPressureRatio) {
-                       this.expectedBackPressureRatio = 
expectedBackPressureRatio;
-               }
-
-               @Override
-               protected boolean matchesSafely(final OperatorBackPressureStats 
stats, final Description mismatchDescription) {
-                       if (!isBackPressureRatioCorrect(stats)) {
-                               mismatchDescription.appendText("Not all subtask 
back pressure ratios in " + getBackPressureRatios(stats) + " are " + 
expectedBackPressureRatio);
-                               return false;
-                       }
-                       return true;
-               }
-
-               private static List<Double> getBackPressureRatios(final 
OperatorBackPressureStats stats) {
-                       return IntStream.range(0, stats.getNumberOfSubTasks())
-                               
.mapToObj(stats::getBackPressureRatio).collect(Collectors.toList());
-               }
-
-               private boolean isBackPressureRatioCorrect(final 
OperatorBackPressureStats stats) {
-                       return IntStream.range(0, stats.getNumberOfSubTasks())
-                               .mapToObj(stats::getBackPressureRatio)
-                               .allMatch(backpressureRatio -> 
backpressureRatio == expectedBackPressureRatio);
-               }
-
-               @Override
-               public void describeTo(final Description description) {
-                       description.appendText("All subtask back pressure 
ratios are " + expectedBackPressureRatio);
-               }
-       }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index ad5a23a..30c96cd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -922,6 +922,13 @@ public class TaskTest extends TestLogger {
                assertEquals(ExecutionState.FAILED, 
task.getTerminationFuture().getNow(null));
        }
 
+       @Test
+       public void testReturnsEmptyStackTraceIfTaskNotStarted() throws 
Exception {
+               final Task task = createTaskBuilder().build();
+               final StackTraceElement[] actualStackTrace = 
task.getStackTraceOfExecutingThread();
+               assertEquals(0, actualStackTrace.length);
+       }
+
        // 
------------------------------------------------------------------------
        //  customized TaskManagerActions
        // 
------------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index d7b467d..e06e2b4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -27,6 +27,8 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.util.FlinkException;
 
+import java.util.Optional;
+
 /**
  * {@link StreamTask} for executing a {@link StreamSource}.
  *
@@ -47,6 +49,8 @@ public class SourceStreamTask<OUT, SRC extends 
SourceFunction<OUT>, OP extends S
 
        private static final Runnable SOURCE_POISON_LETTER = () -> {};
 
+       private final LegacySourceFunctionThread sourceThread;
+
        private volatile boolean externallyInducedCheckpoints;
 
        /**
@@ -57,6 +61,7 @@ public class SourceStreamTask<OUT, SRC extends 
SourceFunction<OUT>, OP extends S
 
        public SourceStreamTask(Environment env) {
                super(env);
+               this.sourceThread = new LegacySourceFunctionThread();
        }
 
        @Override
@@ -109,7 +114,6 @@ public class SourceStreamTask<OUT, SRC extends 
SourceFunction<OUT>, OP extends S
        protected void performDefaultAction(ActionContext context) throws 
Exception {
                // Against the usual contract of this method, this 
implementation is not step-wise but blocking instead for
                // compatibility reasons with the current source interface 
(source functions run as a loop, not in steps).
-               final LegacySourceFunctionThread sourceThread = new 
LegacySourceFunctionThread();
                sourceThread.start();
 
                // We run an alternative mailbox loop that does not involve 
default actions and synchronizes around actions.
@@ -159,6 +163,11 @@ public class SourceStreamTask<OUT, SRC extends 
SourceFunction<OUT>, OP extends S
                cancelTask();
        }
 
+       @Override
+       public Optional<Thread> getExecutingThread() {
+               return Optional.of(sourceThread);
+       }
+
        // 
------------------------------------------------------------------------
        //  Checkpointing
        // 
------------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
new file mode 100644
index 0000000..07fb227
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.BlockingSink;
+import org.apache.flink.test.util.IdentityMapFunction;
+import org.apache.flink.test.util.InfiniteIntegerSource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+
+/**
+ * Integration test for operator back pressure tracking.
+ */
+public class BackPressureITCase extends TestLogger {
+
+       private static final JobID TEST_JOB_ID = new JobID();
+       private static final int NUM_TASKS = 3;
+       private static final int BACK_PRESSURE_REQUEST_INTERVAL_MS = 5;
+       private static final int TASKS_BECOMING_BACK_PRESSURED_TIMEOUT_MS = 15 
* 1000;
+       private static final int MAX_BACK_PRESSURE_RATIO = 1;
+
+       private TestingMiniCluster testingMiniCluster;
+       private DispatcherGateway dispatcherGateway;
+
+       @Before
+       public void setUp() throws Exception {
+               final Configuration configuration = new Configuration();
+               configuration.addAll(createBackPressureSamplingConfiguration());
+               configuration.addAll(createNetworkBufferConfiguration());
+
+               final TestingMiniClusterConfiguration 
testingMiniClusterConfiguration = new TestingMiniClusterConfiguration.Builder()
+                       .setNumSlotsPerTaskManager(NUM_TASKS)
+                       .setConfiguration(configuration)
+                       .build();
+
+               testingMiniCluster = new 
TestingMiniCluster(testingMiniClusterConfiguration);
+               testingMiniCluster.start();
+               dispatcherGateway = 
testingMiniCluster.getDispatcherGatewayFuture().get();
+       }
+
+       private static Configuration createBackPressureSamplingConfiguration() {
+               final Configuration configuration = new Configuration();
+               
configuration.setInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL, 1000);
+               configuration.setInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES, 
1);
+               
configuration.setInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL, 
Integer.MAX_VALUE);
+               return configuration;
+       }
+
+       private static Configuration createNetworkBufferConfiguration() {
+               final Configuration configuration = new Configuration();
+
+               final int memorySegmentSizeKb = 32;
+               final String networkBuffersMemory = (memorySegmentSizeKb * 
NUM_TASKS) + "kb";
+
+               configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 
memorySegmentSizeKb + "kb");
+               
configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN,
 networkBuffersMemory);
+               
configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX,
 networkBuffersMemory);
+               return configuration;
+       }
+
+       @Test
+       public void operatorsBecomeBackPressured() throws Exception {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment()
+                       .setParallelism(1);
+
+               env.addSource(new InfiniteIntegerSource())
+                       .slotSharingGroup("sourceGroup")
+                       .map(new IdentityMapFunction<>())
+                       .slotSharingGroup("mapGroup")
+                       .addSink(new BlockingSink<>())
+                       .slotSharingGroup("sinkGroup");
+
+               final JobGraph jobGraph = 
env.getStreamGraph().getJobGraph(TEST_JOB_ID);
+
+               final List<JobVertex> vertices = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceJobVertex = vertices.get(0);
+               final JobVertex mapJobVertex = vertices.get(1);
+
+               testingMiniCluster.submitJob(jobGraph).get();
+
+               assertJobVertexSubtasksAreBackPressured(mapJobVertex);
+               assertJobVertexSubtasksAreBackPressured(sourceJobVertex);
+       }
+
+       private void assertJobVertexSubtasksAreBackPressured(final JobVertex 
jobVertex) throws Exception {
+               try {
+                       final Deadline timeout = 
Deadline.fromNow(Duration.ofMillis(TASKS_BECOMING_BACK_PRESSURED_TIMEOUT_MS));
+                       waitUntilCondition(
+                               isJobVertexBackPressured(jobVertex),
+                               timeout,
+                               BACK_PRESSURE_REQUEST_INTERVAL_MS);
+               } catch (final TimeoutException e) {
+                       final String errorMessage = String.format("Subtasks of 
job vertex %s were not back pressured within timeout", jobVertex);
+                       throw new AssertionError(errorMessage, e);
+               }
+       }
+
+       private SupplierWithException<Boolean, Exception> 
isJobVertexBackPressured(final JobVertex sourceJobVertex) {
+               return () -> {
+                       final OperatorBackPressureStatsResponse 
backPressureStatsResponse = dispatcherGateway
+                               .requestOperatorBackPressureStats(TEST_JOB_ID, 
sourceJobVertex.getID())
+                               .get();
+
+                       return 
backPressureStatsResponse.getOperatorBackPressureStats()
+                               .map(backPressureStats -> 
isBackPressured(backPressureStats))
+                               .orElse(false);
+               };
+       }
+
+       private static boolean isBackPressured(final OperatorBackPressureStats 
backPressureStats) {
+               for (int i = 0; i < backPressureStats.getNumberOfSubTasks(); 
i++) {
+                       final double subtaskBackPressureRatio = 
backPressureStats.getBackPressureRatio(i);
+                       if (subtaskBackPressureRatio != 
MAX_BACK_PRESSURE_RATIO) {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       @After
+       public void tearDown() throws Exception {
+               testingMiniCluster.close();
+       }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/util/BlockingSink.java 
b/flink-tests/src/test/java/org/apache/flink/test/util/BlockingSink.java
new file mode 100644
index 0000000..c09ffd6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/BlockingSink.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+/**
+ * Sink that blocks until interrupted.
+ *
+ * @param <IN> Type of the input.
+ */
+public class BlockingSink<IN> implements SinkFunction<IN> {
+
+       @Override
+       public void invoke(final IN value, final Context context) throws 
Exception {
+               Thread.currentThread().join();
+       }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/util/IdentityMapFunction.java 
b/flink-tests/src/test/java/org/apache/flink/test/util/IdentityMapFunction.java
new file mode 100644
index 0000000..d4700fa
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/util/IdentityMapFunction.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+/**
+ * A map function that always returns its input argument.
+ *
+ * @param <T> The type of the input and output
+ */
+public class IdentityMapFunction<T> implements MapFunction<T, T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public T map(final T value) throws Exception {
+               return value;
+       }
+}

Reply via email to