This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e546009b7360c341d74b53c5d805e84f6276a897
Author: Andrey Zagrebin <azagre...@gmail.com>
AuthorDate: Fri May 10 17:33:00 2019 +0200

    [hotfix][network] Introduce ResultPartitionFactory
---
 .../runtime/io/network/NetworkEnvironment.java     |  25 ++--
 .../io/network/partition/ResultPartition.java      |  64 +-------
 .../network/partition/ResultPartitionFactory.java  | 162 +++++++++++++++++++++
 .../network/partition/ResultPartitionBuilder.java  |   9 +-
 4 files changed, 176 insertions(+), 84 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index ea482f1..7974e83 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionFactory;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -88,7 +89,7 @@ public class NetworkEnvironment {
 
        private final TaskEventPublisher taskEventPublisher;
 
-       private final IOManager ioManager;
+       private final ResultPartitionFactory resultPartitionFactory;
 
        private boolean isShutdown;
 
@@ -98,14 +99,14 @@ public class NetworkEnvironment {
                        ConnectionManager connectionManager,
                        ResultPartitionManager resultPartitionManager,
                        TaskEventPublisher taskEventPublisher,
-                       IOManager ioManager) {
+                       ResultPartitionFactory resultPartitionFactory) {
                this.config = config;
                this.networkBufferPool = networkBufferPool;
                this.connectionManager = connectionManager;
                this.resultPartitionManager = resultPartitionManager;
                this.taskEventPublisher = taskEventPublisher;
-               this.ioManager = ioManager;
                this.isShutdown = false;
+               this.resultPartitionFactory = resultPartitionFactory;
        }
 
        public static NetworkEnvironment create(
@@ -128,6 +129,8 @@ public class NetworkEnvironment {
                registerNetworkMetrics(metricGroup, networkBufferPool);
 
                ResultPartitionManager resultPartitionManager = new 
ResultPartitionManager();
+               ResultPartitionFactory resultPartitionFactory =
+                       new ResultPartitionFactory(resultPartitionManager, 
ioManager);
 
                return new NetworkEnvironment(
                        config,
@@ -135,7 +138,7 @@ public class NetworkEnvironment {
                        connectionManager,
                        resultPartitionManager,
                        taskEventPublisher,
-                       ioManager);
+                       resultPartitionFactory);
        }
 
        private static void registerNetworkMetrics(MetricGroup metricGroup, 
NetworkBufferPool networkBufferPool) {
@@ -283,18 +286,8 @@ public class NetworkEnvironment {
                        ResultPartition[] resultPartitions = new 
ResultPartition[resultPartitionDeploymentDescriptors.size()];
                        int counter = 0;
                        for (ResultPartitionDeploymentDescriptor rpdd : 
resultPartitionDeploymentDescriptors) {
-                               resultPartitions[counter++] = new 
ResultPartition(
-                                       taskName,
-                                       taskActions,
-                                       jobId,
-                                       new 
ResultPartitionID(rpdd.getPartitionId(), executionId),
-                                       rpdd.getPartitionType(),
-                                       rpdd.getNumberOfSubpartitions(),
-                                       rpdd.getMaxParallelism(),
-                                       resultPartitionManager,
-                                       partitionConsumableNotifier,
-                                       ioManager,
-                                       
rpdd.sendScheduleOrUpdateConsumersMessage());
+                               resultPartitions[counter++] = 
resultPartitionFactory.create(
+                                       taskName, taskActions, jobId, 
executionId, rpdd, partitionConsumableNotifier);
                        }
 
                        registerOutputMetrics(outputGroup, buffersGroup, 
resultPartitions);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 1ff1ec5..30d0dd6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -32,8 +31,6 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -125,17 +122,16 @@ public class ResultPartition implements 
ResultPartitionWriter, BufferPoolOwner {
 
        private volatile Throwable cause;
 
-       public ResultPartition(
+       ResultPartition(
                String owningTaskName,
                TaskActions taskActions, // actions on the owning task
                JobID jobId,
                ResultPartitionID partitionId,
                ResultPartitionType partitionType,
-               int numberOfSubpartitions,
+               ResultSubpartition[] subpartitions,
                int numTargetKeyGroups,
                ResultPartitionManager partitionManager,
                ResultPartitionConsumableNotifier partitionConsumableNotifier,
-               IOManager ioManager,
                boolean sendScheduleOrUpdateConsumersMessage) {
 
                this.owningTaskName = checkNotNull(owningTaskName);
@@ -143,34 +139,11 @@ public class ResultPartition implements 
ResultPartitionWriter, BufferPoolOwner {
                this.jobId = checkNotNull(jobId);
                this.partitionId = checkNotNull(partitionId);
                this.partitionType = checkNotNull(partitionType);
-               this.subpartitions = new 
ResultSubpartition[numberOfSubpartitions];
+               this.subpartitions = checkNotNull(subpartitions);
                this.numTargetKeyGroups = numTargetKeyGroups;
                this.partitionManager = checkNotNull(partitionManager);
                this.partitionConsumableNotifier = 
checkNotNull(partitionConsumableNotifier);
                this.sendScheduleOrUpdateConsumersMessage = 
sendScheduleOrUpdateConsumersMessage;
-
-               // Create the subpartitions.
-               switch (partitionType) {
-                       case BLOCKING:
-                               
initializeBoundedBlockingPartitions(subpartitions, this, ioManager);
-                               break;
-
-                       case PIPELINED:
-                       case PIPELINED_BOUNDED:
-                               for (int i = 0; i < subpartitions.length; i++) {
-                                       subpartitions[i] = new 
PipelinedSubpartition(i, this);
-                               }
-
-                               break;
-
-                       default:
-                               throw new IllegalArgumentException("Unsupported 
result partition type.");
-               }
-
-               // Initially, partitions should be consumed once before release.
-               pin();
-
-               LOG.debug("{}: Initialized {}", owningTaskName, this);
        }
 
        /**
@@ -465,35 +438,4 @@ public class ResultPartition implements 
ResultPartitionWriter, BufferPoolOwner {
                        hasNotifiedPipelinedConsumers = true;
                }
        }
-
-       private static void initializeBoundedBlockingPartitions(
-                       ResultSubpartition[] subpartitions,
-                       ResultPartition parent,
-                       IOManager ioManager) {
-
-               int i = 0;
-               try {
-                       for (; i < subpartitions.length; i++) {
-                               subpartitions[i] = new 
BoundedBlockingSubpartition(
-                                               i, parent, 
ioManager.createChannel().getPathFile().toPath());
-                       }
-               }
-               catch (IOException e) {
-                       // undo all the work so that a failed constructor does 
not leave any resources
-                       // in need of disposal
-                       releasePartitionsQuietly(subpartitions, i);
-
-                       // this is not good, we should not be forced to wrap 
this in a runtime exception.
-                       // the fact that the ResultPartition and Task 
constructor (which calls this) do not tolerate any exceptions
-                       // is incompatible with eager initialization of 
resources (RAII).
-                       throw new FlinkRuntimeException(e);
-               }
-       }
-
-       private static void releasePartitionsQuietly(ResultSubpartition[] 
partitions, int until) {
-               for (int i = 0; i < until; i++) {
-                       final ResultSubpartition subpartition = partitions[i];
-                       
ExceptionUtils.suppressExceptions(subpartition::release);
-               }
-       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
new file mode 100644
index 0000000..3b9a61a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * Factory for {@link ResultPartition} to use in {@link 
org.apache.flink.runtime.io.network.NetworkEnvironment}.
+ */
+public class ResultPartitionFactory {
+       private static final Logger LOG = 
LoggerFactory.getLogger(ResultPartition.class);
+
+       @Nonnull
+       private final ResultPartitionManager partitionManager;
+
+       @Nonnull
+       private final IOManager ioManager;
+
+       public ResultPartitionFactory(@Nonnull ResultPartitionManager 
partitionManager,  @Nonnull IOManager ioManager) {
+               this.partitionManager = partitionManager;
+               this.ioManager = ioManager;
+       }
+
+       @VisibleForTesting
+       public ResultPartition create(
+               @Nonnull String taskNameWithSubtaskAndId,
+               @Nonnull TaskActions taskActions,
+               @Nonnull JobID jobId,
+               @Nonnull ExecutionAttemptID executionAttemptID,
+               @Nonnull ResultPartitionDeploymentDescriptor desc,
+               @Nonnull ResultPartitionConsumableNotifier 
partitionConsumableNotifier) {
+
+               return create(
+                       taskNameWithSubtaskAndId,
+                       taskActions,
+                       jobId,
+                       new ResultPartitionID(desc.getPartitionId(), 
executionAttemptID),
+                       desc.getPartitionType(),
+                       desc.getNumberOfSubpartitions(),
+                       desc.getMaxParallelism(),
+                       partitionConsumableNotifier,
+                       desc.sendScheduleOrUpdateConsumersMessage());
+       }
+
+       public ResultPartition create(
+               @Nonnull String taskNameWithSubtaskAndId,
+               @Nonnull TaskActions taskActions,
+               @Nonnull JobID jobId,
+               @Nonnull ResultPartitionID id,
+               @Nonnull ResultPartitionType type,
+               int numberOfSubpartitions,
+               int maxParallelism,
+               @Nonnull ResultPartitionConsumableNotifier 
partitionConsumableNotifier,
+               boolean sendScheduleOrUpdateConsumersMessage) {
+
+               ResultSubpartition[] subpartitions = new 
ResultSubpartition[numberOfSubpartitions];
+
+               ResultPartition partition = new ResultPartition(
+                       taskNameWithSubtaskAndId,
+                       taskActions,
+                       jobId,
+                       id,
+                       type,
+                       subpartitions,
+                       maxParallelism,
+                       partitionManager,
+                       partitionConsumableNotifier,
+                       sendScheduleOrUpdateConsumersMessage);
+
+               createSubpartitions(partition, type, subpartitions);
+
+               // Initially, partitions should be consumed once before release.
+               partition.pin();
+
+               LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this);
+
+               return partition;
+       }
+
+       private void createSubpartitions(
+               ResultPartition partition, ResultPartitionType type, 
ResultSubpartition[] subpartitions) {
+
+               // Create the subpartitions.
+               switch (type) {
+                       case BLOCKING:
+                               
initializeBoundedBlockingPartitions(subpartitions, partition, ioManager);
+                               break;
+
+                       case PIPELINED:
+                       case PIPELINED_BOUNDED:
+                               for (int i = 0; i < subpartitions.length; i++) {
+                                       subpartitions[i] = new 
PipelinedSubpartition(i, partition);
+                               }
+
+                               break;
+
+                       default:
+                               throw new IllegalArgumentException("Unsupported 
result partition type.");
+               }
+       }
+
+       private static void initializeBoundedBlockingPartitions(
+               ResultSubpartition[] subpartitions,
+               ResultPartition parent,
+               IOManager ioManager) {
+
+               int i = 0;
+               try {
+                       for (; i < subpartitions.length; i++) {
+                               subpartitions[i] = new 
BoundedBlockingSubpartition(
+                                       i, parent, 
ioManager.createChannel().getPathFile().toPath());
+                       }
+               }
+               catch (IOException e) {
+                       // undo all the work so that a failed constructor does 
not leave any resources
+                       // in need of disposal
+                       releasePartitionsQuietly(subpartitions, i);
+
+                       // this is not good, we should not be forced to wrap 
this in a runtime exception.
+                       // the fact that the ResultPartition and Task 
constructor (which calls this) do not tolerate any exceptions
+                       // is incompatible with eager initialization of 
resources (RAII).
+                       throw new FlinkRuntimeException(e);
+               }
+       }
+
+       private static void releasePartitionsQuietly(ResultSubpartition[] 
partitions, int until) {
+               for (int i = 0; i < until; i++) {
+                       final ResultSubpartition subpartition = partitions[i];
+                       
ExceptionUtils.suppressExceptions(subpartition::release);
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 3d7dab0..7370d6a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -28,9 +28,6 @@ import org.apache.flink.runtime.taskmanager.TaskActions;
  * Utility class to encapsulate the logic of building a {@link 
ResultPartition} instance.
  */
 public class ResultPartitionBuilder {
-
-       private static final String taskName = "Result Partition";
-
        private JobID jobId = new JobID();
 
        private final TaskActions taskActions = new NoOpTaskActions();
@@ -97,17 +94,15 @@ public class ResultPartitionBuilder {
        }
 
        public ResultPartition build() {
-               return new ResultPartition(
-                       taskName,
+               return new ResultPartitionFactory(partitionManager, 
ioManager).create(
+                       "Result Partition task",
                        taskActions,
                        jobId,
                        partitionId,
                        partitionType,
                        numberOfSubpartitions,
                        numTargetKeyGroups,
-                       partitionManager,
                        partitionConsumableNotifier,
-                       ioManager,
                        sendScheduleOrUpdateConsumersMessage);
        }
 }

Reply via email to the mailing list.