This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e546009b7360c341d74b53c5d805e84f6276a897
Author: Andrey Zagrebin <azagre...@gmail.com>
AuthorDate: Fri May 10 17:33:00 2019 +0200

    [hotfix][network] Introduce ResultPartitionFactory

    Move the creation of a ResultPartition's subpartitions and its
    initial pin() out of the ResultPartition constructor into the new
    ResultPartitionFactory. NetworkEnvironment now keeps the factory
    (built from the ResultPartitionManager and the IOManager) instead of
    the IOManager itself. The ResultPartition constructor becomes
    package-private and takes the subpartition array to store.
---
 .../runtime/io/network/NetworkEnvironment.java     |  25 ++--
 .../io/network/partition/ResultPartition.java      |  64 +-------
 .../network/partition/ResultPartitionFactory.java  | 162 +++++++++++++++++++++
 .../network/partition/ResultPartitionBuilder.java  |   9 +-
 4 files changed, 176 insertions(+), 84 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index ea482f1..7974e83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionFactory;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -88,7 +89,7 @@ public class NetworkEnvironment {
 
 	private final TaskEventPublisher taskEventPublisher;
 
-	private final IOManager ioManager;
+	private final ResultPartitionFactory resultPartitionFactory;
 
 	private boolean isShutdown;
 
@@ -98,14 +99,14 @@ public class NetworkEnvironment {
 			ConnectionManager connectionManager,
 			ResultPartitionManager resultPartitionManager,
 			TaskEventPublisher taskEventPublisher,
-			IOManager ioManager) {
+			ResultPartitionFactory resultPartitionFactory) {
 		this.config = config;
 		this.networkBufferPool = networkBufferPool;
 		this.connectionManager = connectionManager;
 		this.resultPartitionManager = resultPartitionManager;
 		this.taskEventPublisher = taskEventPublisher;
-		this.ioManager = ioManager;
 		this.isShutdown = false;
+		this.resultPartitionFactory = resultPartitionFactory;
 	}
 
 	public static NetworkEnvironment create(
@@ -128,6 +129,8 @@ public class NetworkEnvironment {
 		registerNetworkMetrics(metricGroup, networkBufferPool);
 
 		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+		ResultPartitionFactory resultPartitionFactory =
+			new ResultPartitionFactory(resultPartitionManager, ioManager);
 
 		return new NetworkEnvironment(
 			config,
@@ -135,7 +138,7 @@ public class NetworkEnvironment {
 			connectionManager,
 			resultPartitionManager,
 			taskEventPublisher,
-			ioManager);
+			resultPartitionFactory);
 	}
 
 	private static void registerNetworkMetrics(MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
@@ -283,18 +286,8 @@ public class NetworkEnvironment {
 		ResultPartition[] resultPartitions = new ResultPartition[resultPartitionDeploymentDescriptors.size()];
 		int counter = 0;
 		for (ResultPartitionDeploymentDescriptor rpdd : resultPartitionDeploymentDescriptors) {
-			resultPartitions[counter++] = new ResultPartition(
-				taskName,
-				taskActions,
-				jobId,
-				new ResultPartitionID(rpdd.getPartitionId(), executionId),
-				rpdd.getPartitionType(),
-				rpdd.getNumberOfSubpartitions(),
-				rpdd.getMaxParallelism(),
-				resultPartitionManager,
-				partitionConsumableNotifier,
-				ioManager,
-				rpdd.sendScheduleOrUpdateConsumersMessage());
+			resultPartitions[counter++] = resultPartitionFactory.create(
+				taskName, taskActions, jobId, executionId, rpdd, partitionConsumableNotifier);
 		}
 
 		registerOutputMetrics(outputGroup, buffersGroup, resultPartitions);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 1ff1ec5..30d0dd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -32,8 +31,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -125,17 +122,16 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 	private volatile Throwable cause;
 
-	public ResultPartition(
+	ResultPartition( // package-private; the initial pin() is done by ResultPartitionFactory
 		String owningTaskName,
 		TaskActions taskActions, // actions on the owning task
 		JobID jobId,
 		ResultPartitionID partitionId,
 		ResultPartitionType partitionType,
-		int numberOfSubpartitions,
+		ResultSubpartition[] subpartitions, // populated by ResultPartitionFactory after construction
 		int numTargetKeyGroups,
 		ResultPartitionManager partitionManager,
 		ResultPartitionConsumableNotifier partitionConsumableNotifier,
-		IOManager ioManager,
 		boolean sendScheduleOrUpdateConsumersMessage) {
 
 		this.owningTaskName = checkNotNull(owningTaskName);
@@ -143,34 +139,11 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 		this.jobId = checkNotNull(jobId);
 		this.partitionId = checkNotNull(partitionId);
 		this.partitionType = checkNotNull(partitionType);
-		this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
+		this.subpartitions = checkNotNull(subpartitions);
 		this.numTargetKeyGroups = numTargetKeyGroups;
 		this.partitionManager = checkNotNull(partitionManager);
 		this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
 		this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
-
-		// Create the subpartitions.
-		switch (partitionType) {
-			case BLOCKING:
-				initializeBoundedBlockingPartitions(subpartitions, this, ioManager);
-				break;
-
-			case PIPELINED:
-			case PIPELINED_BOUNDED:
-				for (int i = 0; i < subpartitions.length; i++) {
-					subpartitions[i] = new PipelinedSubpartition(i, this);
-				}
-
-				break;
-
-			default:
-				throw new IllegalArgumentException("Unsupported result partition type.");
-		}
-
-		// Initially, partitions should be consumed once before release.
-		pin();
-
-		LOG.debug("{}: Initialized {}", owningTaskName, this);
 	}
 
 	/**
@@ -465,35 +438,4 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 			hasNotifiedPipelinedConsumers = true;
 		}
 	}
-
-	private static void initializeBoundedBlockingPartitions(
-			ResultSubpartition[] subpartitions,
-			ResultPartition parent,
-			IOManager ioManager) {
-
-		int i = 0;
-		try {
-			for (; i < subpartitions.length; i++) {
-				subpartitions[i] = new BoundedBlockingSubpartition(
-						i, parent, ioManager.createChannel().getPathFile().toPath());
-			}
-		}
-		catch (IOException e) {
-			// undo all the work so that a failed constructor does not leave any resources
-			// in need of disposal
-			releasePartitionsQuietly(subpartitions, i);
-
-			// this is not good, we should not be forced to wrap this in a runtime exception.
-			// the fact that the ResultPartition and Task constructor (which calls this) do not tolerate any exceptions
-			// is incompatible with eager initialization of resources (RAII).
-			throw new FlinkRuntimeException(e);
-		}
-	}
-
-	private static void releasePartitionsQuietly(ResultSubpartition[] partitions, int until) {
-		for (int i = 0; i < until; i++) {
-			final ResultSubpartition subpartition = partitions[i];
-			ExceptionUtils.suppressExceptions(subpartition::release);
-		}
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
new file mode 100644
index 0000000..3b9a61a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * Factory for {@link ResultPartition} to use in {@link org.apache.flink.runtime.io.network.NetworkEnvironment}.
+ *
+ * <p>Besides constructing the {@link ResultPartition}, the factory creates its subpartitions
+ * (pipelined or bounded blocking, depending on the {@link ResultPartitionType}) and pins the
+ * partition once, because partitions should initially be consumed once before release.
+ */
+public class ResultPartitionFactory {
+	// NOTE(review): logs under ResultPartition's category, presumably to keep the category that the
+	// "Initialized" debug message had before it moved here from the ResultPartition constructor.
+	private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class);
+
+	@Nonnull
+	private final ResultPartitionManager partitionManager;
+
+	@Nonnull
+	private final IOManager ioManager;
+
+	/**
+	 * Creates a factory backed by the given partition manager and I/O manager.
+	 *
+	 * @param partitionManager manager handed to every created {@link ResultPartition}
+	 * @param ioManager creates the channels whose files back {@link ResultPartitionType#BLOCKING} subpartitions
+	 */
+	public ResultPartitionFactory(@Nonnull ResultPartitionManager partitionManager, @Nonnull IOManager ioManager) {
+		this.partitionManager = partitionManager;
+		this.ioManager = ioManager;
+	}
+
+	/**
+	 * Creates a {@link ResultPartition} as described by a {@link ResultPartitionDeploymentDescriptor}.
+	 *
+	 * @param taskNameWithSubtaskAndId name of the owning task, passed to the partition and used in logging
+	 * @param taskActions actions on the owning task
+	 * @param jobId ID of the job the partition belongs to
+	 * @param executionAttemptID producing execution attempt; combined with the descriptor's partition ID
+	 *     into the {@link ResultPartitionID}
+	 * @param desc provides the partition ID, type, number of subpartitions, max parallelism and
+	 *     whether to send schedule-or-update-consumers messages
+	 * @param partitionConsumableNotifier notifier handed to the created partition
+	 * @return the created partition, with initialized subpartitions and pinned once
+	 */
+	public ResultPartition create(
+			@Nonnull String taskNameWithSubtaskAndId,
+			@Nonnull TaskActions taskActions,
+			@Nonnull JobID jobId,
+			@Nonnull ExecutionAttemptID executionAttemptID,
+			@Nonnull ResultPartitionDeploymentDescriptor desc,
+			@Nonnull ResultPartitionConsumableNotifier partitionConsumableNotifier) {
+
+		return create(
+			taskNameWithSubtaskAndId,
+			taskActions,
+			jobId,
+			new ResultPartitionID(desc.getPartitionId(), executionAttemptID),
+			desc.getPartitionType(),
+			desc.getNumberOfSubpartitions(),
+			desc.getMaxParallelism(),
+			partitionConsumableNotifier,
+			desc.sendScheduleOrUpdateConsumersMessage());
+	}
+
+	/**
+	 * Creates a {@link ResultPartition} together with its subpartitions and pins it once.
+	 *
+	 * <p>Visible for tests that build partitions without a deployment descriptor.
+	 *
+	 * @param maxParallelism passed to the partition as its number of target key groups
+	 * @return the created partition, with initialized subpartitions and pinned once
+	 * @throws IllegalArgumentException if {@code type} is not a supported partition type
+	 * @throws FlinkRuntimeException if a channel for a blocking subpartition cannot be created
+	 */
+	@VisibleForTesting
+	public ResultPartition create(
+			@Nonnull String taskNameWithSubtaskAndId,
+			@Nonnull TaskActions taskActions,
+			@Nonnull JobID jobId,
+			@Nonnull ResultPartitionID id,
+			@Nonnull ResultPartitionType type,
+			int numberOfSubpartitions,
+			int maxParallelism,
+			@Nonnull ResultPartitionConsumableNotifier partitionConsumableNotifier,
+			boolean sendScheduleOrUpdateConsumersMessage) {
+
+		// The array is filled in by createSubpartitions() once the owning partition exists,
+		// since every subpartition needs a reference to its parent.
+		ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
+
+		ResultPartition partition = new ResultPartition(
+			taskNameWithSubtaskAndId,
+			taskActions,
+			jobId,
+			id,
+			type,
+			subpartitions,
+			maxParallelism,
+			partitionManager,
+			partitionConsumableNotifier,
+			sendScheduleOrUpdateConsumersMessage);
+
+		createSubpartitions(partition, type, subpartitions);
+
+		// Initially, partitions should be consumed once before release.
+		partition.pin();
+
+		// Log the created partition, not the factory itself.
+		LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, partition);
+
+		return partition;
+	}
+
+	/**
+	 * Fills {@code subpartitions} with subpartitions of the given {@code type}, all owned by {@code partition}.
+	 */
+	private void createSubpartitions(
+			ResultPartition partition, ResultPartitionType type, ResultSubpartition[] subpartitions) {
+
+		// Create the subpartitions.
+		switch (type) {
+			case BLOCKING:
+				initializeBoundedBlockingPartitions(subpartitions, partition, ioManager);
+				break;
+
+			case PIPELINED:
+			case PIPELINED_BOUNDED:
+				for (int i = 0; i < subpartitions.length; i++) {
+					subpartitions[i] = new PipelinedSubpartition(i, partition);
+				}
+
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unsupported result partition type.");
+		}
+	}
+
+	/**
+	 * Fills {@code subpartitions} with {@link BoundedBlockingSubpartition}s, each backed by a new
+	 * {@link IOManager} channel. If creating a channel fails, the subpartitions created so far are
+	 * released and the {@link IOException} is rethrown wrapped in a {@link FlinkRuntimeException}.
+	 */
+	private static void initializeBoundedBlockingPartitions(
+			ResultSubpartition[] subpartitions,
+			ResultPartition parent,
+			IOManager ioManager) {
+
+		int i = 0;
+		try {
+			for (; i < subpartitions.length; i++) {
+				subpartitions[i] = new BoundedBlockingSubpartition(
+						i, parent, ioManager.createChannel().getPathFile().toPath());
+			}
+		}
+		catch (IOException e) {
+			// undo all the work so that a failed create() does not leave any resources
+			// in need of disposal
+			releasePartitionsQuietly(subpartitions, i);
+
+			// this is not good, we should not be forced to wrap this in a runtime exception.
+			// the fact that ResultPartitionFactory#create and the Task constructor (which ends up calling this)
+			// do not tolerate any exceptions is incompatible with eager initialization of resources (RAII).
+			throw new FlinkRuntimeException(e);
+		}
+	}
+
+	/**
+	 * Releases the first {@code until} subpartitions, suppressing exceptions thrown by their release.
+	 */
+	private static void releasePartitionsQuietly(ResultSubpartition[] partitions, int until) {
+		for (int i = 0; i < until; i++) {
+			final ResultSubpartition subpartition = partitions[i];
+			ExceptionUtils.suppressExceptions(subpartition::release);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 3d7dab0..7370d6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -28,9 +28,6 @@ import org.apache.flink.runtime.taskmanager.TaskActions;
  * Utility class to encapsulate the logic of building a {@link ResultPartition} instance.
  */
 public class ResultPartitionBuilder {
-
-	private static final String taskName = "Result Partition";
-
 	private JobID jobId = new JobID();
 
 	private final TaskActions taskActions = new NoOpTaskActions();
@@ -97,17 +94,15 @@ public class ResultPartitionBuilder {
 	}
 
 	public ResultPartition build() {
-		return new ResultPartition(
-			taskName,
+		return new ResultPartitionFactory(partitionManager, ioManager).create(
+			"Result Partition task",
 			taskActions,
 			jobId,
 			partitionId,
 			partitionType,
 			numberOfSubpartitions,
 			numTargetKeyGroups,
-			partitionManager,
 			partitionConsumableNotifier,
-			ioManager,
 			sendScheduleOrUpdateConsumersMessage);
 	}
 }