This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b62db93 [FLINK-12203] Refactor ResultPartitionManager to break tie with Task b62db93 is described below commit b62db93bf63cb3bb34dd03d611a779d9e3fc61ac Author: Andrey Zagrebin <azagre...@gmail.com> AuthorDate: Thu Apr 18 15:26:24 2019 +0200 [FLINK-12203] Refactor ResultPartitionManager to break tie with Task At the moment, we have ResultPartitionManager.releasePartitionsProducedBy which uses indexing by task in the network environment. These methods are eventually used only by Task which already knows its partitions so Task can use ResultPartition.fail(cause) and TaskExecutor.failPartition could directly use NetworkEnvironment.releasePartitions(Collection<ResultPartitionID>). This also requires that JM Execution sends produced partition ids instead of just ExecutionAttemptID. This closes #8210. --- .../flink/runtime/executiongraph/Execution.java | 20 ++++++--- .../runtime/io/network/NetworkEnvironment.java | 13 ++++++ .../io/network/partition/ResultPartition.java | 2 +- .../network/partition/ResultPartitionManager.java | 50 ++++++---------------- .../jobmanager/slots/TaskManagerGateway.java | 8 ++-- .../runtime/jobmaster/RpcTaskManagerGateway.java | 6 ++- .../flink/runtime/taskexecutor/TaskExecutor.java | 7 ++- .../runtime/taskexecutor/TaskExecutorGateway.java | 8 ++-- .../utils/SimpleAckingTaskManagerGateway.java | 5 ++- .../taskexecutor/TestingTaskExecutorGateway.java | 4 +- 10 files changed, 65 insertions(+), 58 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 63f3125..e413619 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; @@ -688,7 +689,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution else if (current == FINISHED || current == FAILED) { // nothing to do any more. finished failed before it could be cancelled. // in any case, the task is removed from the TaskManager already - sendFailIntermediateResultPartitionsRpcCall(); + sendReleaseIntermediateResultPartitionsRpcCall(); return; } @@ -721,7 +722,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution break; case FINISHED: case FAILED: - sendFailIntermediateResultPartitionsRpcCall(); + sendReleaseIntermediateResultPartitionsRpcCall(); break; case CANCELED: break; @@ -1202,14 +1203,23 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } } - private void sendFailIntermediateResultPartitionsRpcCall() { + private void sendReleaseIntermediateResultPartitionsRpcCall() { + LOG.info("Discarding the results produced by task execution {}.", attemptId); final LogicalSlot slot = assignedResource; if (slot != null) { final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); - // TODO For some tests this could be a problem when querying too early if all resources were released - taskManagerGateway.failPartition(attemptId); + Collection<IntermediateResultPartition> partitions = vertex.getProducedPartitions().values(); + Collection<ResultPartitionID> partitionIds = new ArrayList<>(partitions.size()); + for (IntermediateResultPartition partition : 
partitions) { + partitionIds.add(new ResultPartitionID(partition.getPartitionId(), attemptId)); + } + + if (!partitionIds.isEmpty()) { + // TODO For some tests this could be a problem when querying too early if all resources were released + taskManagerGateway.releasePartitions(partitionIds); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 98c61a4..0ee8595 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.taskexecutor.TaskExecutor; @@ -38,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collection; import java.util.Optional; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -207,6 +209,17 @@ public class NetworkEnvironment { } } + /** + * Batch release intermediate result partitions. 
+ * + * @param partitionIds partition ids to release + */ + public void releasePartitions(Collection<ResultPartitionID> partitionIds) { + for (ResultPartitionID partitionId : partitionIds) { + resultPartitionManager.releasePartition(partitionId, null); + } + } + public void start() throws IOException { synchronized (lock) { Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index e0e7829..fb73a70 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -334,7 +334,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner { } public void fail(@Nullable Throwable throwable) { - partitionManager.releasePartitionsProducedBy(partitionId.getProducerId(), throwable); + partitionManager.releasePartition(partitionId, throwable); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java index 09a62ed..8d96e2b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java @@ -18,17 +18,11 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; - -import org.apache.flink.shaded.guava18.com.google.common.collect.HashBasedTable; -import 
org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList; -import org.apache.flink.shaded.guava18.com.google.common.collect.Table; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import static org.apache.flink.util.Preconditions.checkState; @@ -41,19 +35,15 @@ public class ResultPartitionManager implements ResultPartitionProvider { private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionManager.class); - public final Table<ExecutionAttemptID, IntermediateResultPartitionID, ResultPartition> - registeredPartitions = HashBasedTable.create(); + private final Map<ResultPartitionID, ResultPartition> registeredPartitions = new HashMap<>(16); private boolean isShutdown; - public void registerResultPartition(ResultPartition partition) throws IOException { + public void registerResultPartition(ResultPartition partition) { synchronized (registeredPartitions) { checkState(!isShutdown, "Result partition manager already shut down."); - ResultPartitionID partitionId = partition.getPartitionId(); - - ResultPartition previous = registeredPartitions.put( - partitionId.getProducerId(), partitionId.getPartitionId(), partition); + ResultPartition previous = registeredPartitions.put(partition.getPartitionId(), partition); if (previous != null) { throw new IllegalStateException("Result partition already registered."); @@ -70,8 +60,7 @@ public class ResultPartitionManager implements ResultPartitionProvider { BufferAvailabilityListener availabilityListener) throws IOException { synchronized (registeredPartitions) { - final ResultPartition partition = registeredPartitions.get(partitionId.getProducerId(), - partitionId.getPartitionId()); + final ResultPartition partition = registeredPartitions.get(partitionId); if (partition == null) { throw new PartitionNotFoundException(partitionId); @@ -83,26 +72,14 @@ public class ResultPartitionManager implements 
ResultPartitionProvider { } } - public void releasePartitionsProducedBy(ExecutionAttemptID executionId) { - releasePartitionsProducedBy(executionId, null); - } - - public void releasePartitionsProducedBy(ExecutionAttemptID executionId, Throwable cause) { + public void releasePartition(ResultPartitionID partitionId, Throwable cause) { synchronized (registeredPartitions) { - final Map<IntermediateResultPartitionID, ResultPartition> partitions = - registeredPartitions.row(executionId); - - for (ResultPartition partition : partitions.values()) { - partition.release(cause); + ResultPartition resultPartition = registeredPartitions.remove(partitionId); + if (resultPartition != null) { + resultPartition.release(cause); + LOG.debug("Released partition {} produced by {}.", + partitionId.getPartitionId(), partitionId.getProducerId()); } - - for (IntermediateResultPartitionID partitionId : ImmutableList - .copyOf(partitions.keySet())) { - - registeredPartitions.remove(executionId, partitionId); - } - - LOG.debug("Released all partitions produced by {}.", executionId); } } @@ -134,10 +111,7 @@ public class ResultPartitionManager implements ResultPartitionProvider { LOG.debug("Received consume notification from {}.", partition); synchronized (registeredPartitions) { - ResultPartitionID partitionId = partition.getPartitionId(); - - previous = registeredPartitions.remove(partitionId.getProducerId(), - partitionId.getPartitionId()); + previous = registeredPartitions.remove(partition.getPartitionId()); } // Release the partition if it was successfully removed diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java index 6922b05..593a853 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java @@ -25,10 +25,12 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.StackTraceSampleResponse; import org.apache.flink.runtime.rpc.RpcTimeout; +import java.util.Collection; import java.util.concurrent.CompletableFuture; /** @@ -98,11 +100,11 @@ public interface TaskManagerGateway { Time timeout); /** - * Fail all intermediate result partitions of the given task. + * Batch release intermediate result partitions. * - * @param executionAttemptID identifying the task + * @param partitionIds partition ids to release */ - void failPartition(ExecutionAttemptID executionAttemptID); + void releasePartitions(Collection<ResultPartitionID> partitionIds); /** * Notify the given task about a completed checkpoint. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java index 064eef5..1fb2d49 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java @@ -25,12 +25,14 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.StackTraceSampleResponse; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.util.Preconditions; +import java.util.Collection; import java.util.concurrent.CompletableFuture; /** @@ -86,8 +88,8 @@ public class RpcTaskManagerGateway implements TaskManagerGateway { } @Override - public void failPartition(ExecutionAttemptID executionAttemptID) { - taskExecutorGateway.failPartition(executionAttemptID); + public void releasePartitions(Collection<ResultPartitionID> partitionIds) { + taskExecutorGateway.releasePartitions(partitionIds); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 9b9ad5b..b35d65e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.instance.InstanceID; import 
org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; @@ -649,11 +650,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } @Override - public void failPartition(ExecutionAttemptID executionAttemptID) { - log.info("Discarding the results produced by task execution {}.", executionAttemptID); - + public void releasePartitions(Collection<ResultPartitionID> partitionIds) { try { - networkEnvironment.getResultPartitionManager().releasePartitionsProducedBy(executionAttemptID); + networkEnvironment.releasePartitions(partitionIds); } catch (Throwable t) { // TODO: Do we still need this catch branch? 
onFatalError(t); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index 728087a..8a653df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.StackTraceSampleResponse; @@ -38,6 +39,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.types.SerializableOptional; +import java.util.Collection; import java.util.concurrent.CompletableFuture; /** @@ -99,11 +101,11 @@ public interface TaskExecutorGateway extends RpcGateway { @RpcTimeout Time timeout); /** - * Fail all intermediate result partitions of the given task. + * Batch release intermediate result partitions. * - * @param executionAttemptID identifying the task + * @param partitionIds partition ids to release */ - void failPartition(ExecutionAttemptID executionAttemptID); + void releasePartitions(Collection<ResultPartitionID> partitionIds); /** * Trigger the checkpoint for the given task. 
The checkpoint is identified by the checkpoint ID diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java index 22d5df0..dba0e7d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java @@ -26,10 +26,12 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.StackTraceSampleResponse; +import java.util.Collection; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; @@ -95,7 +97,8 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway { } @Override - public void failPartition(ExecutionAttemptID executionAttemptID) {} + public void releasePartitions(Collection<ResultPartitionID> partitionIds) { + } @Override public void notifyCheckpointComplete( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java index 789956f..aca0bb2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java @@ -30,6 +30,7 @@ 
import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.StackTraceSampleResponse; @@ -37,6 +38,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.Preconditions; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -123,7 +125,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { } @Override - public void failPartition(ExecutionAttemptID executionAttemptID) { + public void releasePartitions(Collection<ResultPartitionID> partitionIds) { // noop }