This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 3eff638 [FLINK-13249][runtime] Fix handling of partition producer responses by running them with the task's executor
3eff638 is described below

commit 3eff6387b5f6716dee5c17b71b10c08760b946cc
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Thu Jul 18 10:37:22 2019 +0200

    [FLINK-13249][runtime] Fix handling of partition producer responses by running them with the task's executor

    Fixes the problem in FLINK-13249 by ensuring that processing the partition producer response
    is not blocking any netty thread, but is always executed by the task's executor.

    (cherry picked from commit 23bd23b325f15c726fcc48c76eb252fa2ed2fe59)
---
 .../partition/PartitionProducerStateProvider.java | 9 +++++----
 .../io/network/partition/consumer/SingleInputGate.java | 6 +++---
 .../java/org/apache/flink/runtime/taskmanager/Task.java | 16 ++++++++++------
 .../partition/consumer/SingleInputGateBuilder.java | 6 +-----
 .../org/apache/flink/runtime/taskmanager/TaskTest.java | 8 ++++----
 5 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
index 8bbdaa5..5785095 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.types.Either; -import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; /** * Request execution state of partition producer, the response accepts state check callbacks. 
@@ -34,11 +34,12 @@ public interface PartitionProducerStateProvider { * @param intermediateDataSetId ID of the parent intermediate data set. * @param resultPartitionId ID of the result partition to check. This * identifies the producing execution and partition. - * @return a future with response handle. + * @param responseConsumer consumer for the response handle. */ - CompletableFuture<? extends ResponseHandle> requestPartitionProducerState( + void requestPartitionProducerState( IntermediateDataSetID intermediateDataSetId, - ResultPartitionID resultPartitionId); + ResultPartitionID resultPartitionId, + Consumer<? super ResponseHandle> responseConsumer); /** * Result of state query, accepts state check callbacks. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index bd75262..534078d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -601,8 +601,8 @@ public class SingleInputGate extends InputGate { void triggerPartitionStateCheck(ResultPartitionID partitionId) { partitionProducerStateProvider.requestPartitionProducerState( consumedResultId, - partitionId) - .thenAccept(responseHandle -> { + partitionId, + ((PartitionProducerStateProvider.ResponseHandle responseHandle) -> { boolean isProducingState = new RemoteChannelStateChecker(partitionId, owningTaskName) .isProducerReadyOrAbortConsumption(responseHandle); if (isProducingState) { @@ -612,7 +612,7 @@ public class SingleInputGate extends InputGate { responseHandle.failConsumption(t); } } - }); + })); } private void queueChannel(InputChannel channel) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index d4e1d8a..12049f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -99,6 +99,7 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -1080,18 +1081,21 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid // ------------------------------------------------------------------------ @Override - public CompletableFuture<PartitionProducerStateResponseHandle> requestPartitionProducerState( + public void requestPartitionProducerState( final IntermediateDataSetID intermediateDataSetId, - final ResultPartitionID resultPartitionId) { + final ResultPartitionID resultPartitionId, + Consumer<? 
super ResponseHandle> responseConsumer) { + final CompletableFuture<ExecutionState> futurePartitionState = partitionProducerStateChecker.requestPartitionProducerState( jobId, intermediateDataSetId, resultPartitionId); - final CompletableFuture<PartitionProducerStateResponseHandle> result = - futurePartitionState.handleAsync(PartitionProducerStateResponseHandle::new, executor); - FutureUtils.assertNoException(result); - return result; + + FutureUtils.assertNoException( + futurePartitionState + .handle(PartitionProducerStateResponseHandle::new) + .thenAcceptAsync(responseConsumer, executor)); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java index 956bad9..944cc07 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java @@ -21,23 +21,19 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; -import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider.ResponseHandle; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration; import org.apache.flink.util.function.SupplierWithException; import java.io.IOException; -import java.util.concurrent.CompletableFuture; /** * Utility class to encapsulate the logic of building a {@link 
SingleInputGate} instance. */ public class SingleInputGateBuilder { - private static final CompletableFuture<ResponseHandle> NO_OP_PRODUCER_CHECKER_RESULT = new CompletableFuture<>(); - - public static final PartitionProducerStateProvider NO_OP_PRODUCER_CHECKER = (dsid, id) -> NO_OP_PRODUCER_CHECKER_RESULT; + public static final PartitionProducerStateProvider NO_OP_PRODUCER_CHECKER = (dsid, id, consumer) -> {}; private final IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index ee78963..5879f52 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -656,7 +656,7 @@ public class TaskTest extends TestLogger { final CompletableFuture<ExecutionState> promise = new CompletableFuture<>(); when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise); - task.requestPartitionProducerState(resultId, partitionId).thenAccept(checkResult -> + task.requestPartitionProducerState(resultId, partitionId, checkResult -> assertThat(remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult), is(false)) ); @@ -680,7 +680,7 @@ public class TaskTest extends TestLogger { final CompletableFuture<ExecutionState> promise = new CompletableFuture<>(); when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise); - task.requestPartitionProducerState(resultId, partitionId).thenAccept(checkResult -> + task.requestPartitionProducerState(resultId, partitionId, checkResult -> assertThat(remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult), is(false)) ); @@ -711,7 +711,7 @@ public class TaskTest extends TestLogger { 
CompletableFuture<ExecutionState> promise = new CompletableFuture<>(); when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise); - task.requestPartitionProducerState(resultId, partitionId).thenAccept(checkResult -> { + task.requestPartitionProducerState(resultId, partitionId, checkResult -> { if (remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult)) { callCount.incrementAndGet(); } @@ -749,7 +749,7 @@ public class TaskTest extends TestLogger { CompletableFuture<ExecutionState> promise = new CompletableFuture<>(); when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise); - task.requestPartitionProducerState(resultId, partitionId).thenAccept(checkResult -> { + task.requestPartitionProducerState(resultId, partitionId, checkResult -> { if (remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult)) { callCount.incrementAndGet(); }