Repository: flink
Updated Branches:
  refs/heads/master 9c0d6347e -> d8d642fd6


[FLINK-1648] Add auto-parallelism to select all available task slots


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8d642fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8d642fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8d642fd

Branch: refs/heads/master
Commit: d8d642fd6d7d9b8526325d4efff1015f636c5ddb
Parents: 55f1508
Author: Stephan Ewen <[email protected]>
Authored: Mon Feb 16 21:40:06 2015 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Mar 4 20:44:13 2015 +0100

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       |   6 +
 .../base/CollectorMapOperatorBase.java          |   1 +
 .../flink/runtime/jobmanager/JobManager.scala   |   9 ++
 .../flink/test/misc/AutoParallelismITCase.java  | 143 +++++++++++++++++++
 .../test/recovery/SimpleRecoveryITCase.java     |   2 +-
 5 files changed, 160 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 81ce471..d315440 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -35,6 +35,12 @@ public class ExecutionConfig implements Serializable {
        // Key for storing it in the Job Configuration
        public static final String CONFIG_KEY = "runtime.config";
 
+       /**
+        * The constant to use for the degree of parallelism if the system should pick the
+        * parallelism itself from the number of task slots. During job initialization on the
+        * master, the job manager replaces this value on each job vertex with the scheduler's
+        * total number of slots.
+        */
+       public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;
+
        private boolean useClosureCleaner = true;
        private int degreeOfParallelism = -1;
        private int numberOfExecutionRetries = -1;

http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
index 62bdfbe..b7ff2ce 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.api.common.operators.util.UserCodeWrapper;
  * @see GenericCollectorMap
  */
 @Deprecated
+@SuppressWarnings("deprecation")
 public class CollectorMapOperatorBase<IN, OUT, FT extends 
GenericCollectorMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
        
        public CollectorMapOperatorBase(UserCodeWrapper<FT> udf, 
UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 54d3cf2..e3e96e5 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -22,6 +22,7 @@ import java.io.{IOException, File}
 import java.net.InetSocketAddress
 
 import akka.actor.Status.{Success, Failure}
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, 
Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.blob.BlobServer
@@ -477,12 +478,20 @@ class JobManager(val configuration: Configuration,
           log.debug(s"Running initialization on master for job ${jobId} 
(${jobName}).")
         }
 
+        val numSlots = scheduler.getTotalNumberOfSlots()
+
         for (vertex <- jobGraph.getVertices.asScala) {
+
           val executableClass = vertex.getInvokableClassName
           if (executableClass == null || executableClass.length == 0) {
             throw new JobSubmissionException(jobId,
               s"The vertex ${vertex.getID} (${vertex.getName}) has no 
invokable class.")
           }
+
+          if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) 
{
+            vertex.setParallelism(numSlots)
+          }
+
           try {
             vertex.initializeOnMaster(userCodeLoader)
           }

http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
new file mode 100644
index 0000000..ea79a3a
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This test verifies that the auto parallelism is properly forwarded to the 
runtime.
+ */
+@SuppressWarnings("serial")
+public class AutoParallelismITCase {
+
+       private static final int NUM_TM = 2;
+       private static final int SLOTS_PER_TM = 7;
+       private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
+
+       private static ForkableFlinkMiniCluster cluster;
+
+       @BeforeClass
+       public static void setupCluster() {
+               Configuration config = new Configuration();
+               
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 
NUM_TM);
+               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
SLOTS_PER_TM);
+               config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 
s");
+
+               cluster = new ForkableFlinkMiniCluster(config, false);
+       }
+
+       @AfterClass
+       public static void teardownCluster() {
+               try {
+                       cluster.stop();
+               }
+               catch (Throwable t) {
+                       System.err.println("Error stopping cluster on 
shutdown");
+                       t.printStackTrace();
+                       fail("Cluster shutdown caused an exception: " + 
t.getMessage());
+               }
+       }
+
+
+       @Test
+       public void testProgramWithAutoParallelism() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
+                                       "localhost", 
cluster.getJobManagerRPCPort());
+
+                       
env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+
+                       DataSet<Integer> result = env
+                                       .createInput(new 
ParallelismDependentInputFormat())
+                                       .rebalance()
+                                       .mapPartition(new 
ParallelismDependentMapPartition());
+
+                       List<Integer> resultCollection = new 
ArrayList<Integer>();
+                       result.output(new 
LocalCollectionOutputFormat<Integer>(resultCollection));
+
+                       env.execute();
+
+                       assertEquals(PARALLELISM, resultCollection.size());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       try {
+                               cluster.stop();
+                       }
+                       catch (Throwable t) {
+                               // ignore exceptions on shutdown
+                       }
+               }
+       }
+
+       private static class ParallelismDependentInputFormat extends 
GenericInputFormat<Integer> {
+
+               private transient boolean emitted;
+
+               @Override
+               public GenericInputSplit[] createInputSplits(int numSplits) 
throws IOException {
+                       assertEquals(PARALLELISM, numSplits);
+                       return super.createInputSplits(numSplits);
+               }
+
+               @Override
+               public boolean reachedEnd() {
+                       return emitted;
+               }
+
+               @Override
+               public Integer nextRecord(Integer reuse) {
+                       if (emitted) {
+                               return null;
+                       }
+                       emitted = true;
+                       return 1;
+               }
+       }
+
+       private static class ParallelismDependentMapPartition extends 
RichMapPartitionFunction<Integer, Integer> {
+
+               @Override
+               public void mapPartition(Iterable<Integer> values, 
Collector<Integer> out) {
+                       
out.collect(getRuntimeContext().getIndexOfThisSubtask());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index 1591d67..8330109 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -37,9 +37,9 @@ import java.util.List;
 
 import static org.junit.Assert.*;
 
+@SuppressWarnings("serial")
 public class SimpleRecoveryITCase {
 
-
        private static ForkableFlinkMiniCluster cluster;
 
        @BeforeClass

Reply via email to