Repository: flink Updated Branches: refs/heads/master 9c0d6347e -> d8d642fd6
[FLINK-1648] Add auto-parallelism to select all available task slots Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8d642fd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8d642fd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8d642fd Branch: refs/heads/master Commit: d8d642fd6d7d9b8526325d4efff1015f636c5ddb Parents: 55f1508 Author: Stephan Ewen <[email protected]> Authored: Mon Feb 16 21:40:06 2015 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Mar 4 20:44:13 2015 +0100 ---------------------------------------------------------------------- .../flink/api/common/ExecutionConfig.java | 6 + .../base/CollectorMapOperatorBase.java | 1 + .../flink/runtime/jobmanager/JobManager.scala | 9 ++ .../flink/test/misc/AutoParallelismITCase.java | 143 +++++++++++++++++++ .../test/recovery/SimpleRecoveryITCase.java | 2 +- 5 files changed, 160 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 81ce471..d315440 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -35,6 +35,12 @@ public class ExecutionConfig implements Serializable { // Key for storing it in the Job Configuration public static final String CONFIG_KEY = "runtime.config"; + /** + * The constant to use for the degree of parallelism, if the system should use the number + * of currently available slots. 
+ */ + public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE; + private boolean useClosureCleaner = true; private int degreeOfParallelism = -1; private int numberOfExecutionRetries = -1; http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java index 62bdfbe..b7ff2ce 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java @@ -36,6 +36,7 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper; * @see GenericCollectorMap */ @Deprecated +@SuppressWarnings("deprecation") public class CollectorMapOperatorBase<IN, OUT, FT extends GenericCollectorMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> { public CollectorMapOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) { http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 54d3cf2..e3e96e5 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -22,6 +22,7 @@ import java.io.{IOException, File} import java.net.InetSocketAddress import 
akka.actor.Status.{Success, Failure} +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration} import org.apache.flink.core.io.InputSplitAssigner import org.apache.flink.runtime.blob.BlobServer @@ -477,12 +478,20 @@ class JobManager(val configuration: Configuration, log.debug(s"Running initialization on master for job ${jobId} (${jobName}).") } + val numSlots = scheduler.getTotalNumberOfSlots() + for (vertex <- jobGraph.getVertices.asScala) { + val executableClass = vertex.getInvokableClassName if (executableClass == null || executableClass.length == 0) { throw new JobSubmissionException(jobId, s"The vertex ${vertex.getID} (${vertex.getName}) has no invokable class.") } + + if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) { + vertex.setParallelism(numSlots) + } + try { vertex.initializeOnMaster(userCodeLoader) } http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java new file mode 100644 index 0000000..ea79a3a --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.misc; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.RichMapPartitionFunction; +import org.apache.flink.api.common.io.GenericInputFormat; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.util.Collector; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * This test verifies that the auto parallelism is properly forwarded to the runtime. 
+ */
+@SuppressWarnings("serial")
+public class AutoParallelismITCase {
+
+	private static final int NUM_TM = 2;
+	private static final int SLOTS_PER_TM = 7;
+
+	/** Total number of task slots in the mini cluster, i.e. the parallelism the test expects. */
+	private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
+
+	private static ForkableFlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void setupCluster() {
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TM);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TM);
+		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+
+		cluster = new ForkableFlinkMiniCluster(config, false);
+	}
+
+	/**
+	 * Stops the shared mini cluster. This is the only place the cluster is shut down, so
+	 * every test method in this class runs against a live cluster.
+	 */
+	@AfterClass
+	public static void teardownCluster() {
+		if (cluster == null) {
+			// the cluster was never created (setupCluster() threw); nothing to stop
+			return;
+		}
+		try {
+			cluster.stop();
+		}
+		catch (Throwable t) {
+			System.err.println("Error stopping cluster on shutdown");
+			t.printStackTrace();
+			fail("Cluster shutdown caused an exception: " + t.getMessage());
+		}
+	}
+
+	/**
+	 * Runs a program with {@link ExecutionConfig#PARALLELISM_AUTO_MAX} and checks two things.
+	 * The source must be asked for {@code PARALLELISM} input splits. The mapPartition operator
+	 * must produce {@code PARALLELISM} results.
+	 */
+	@Test
+	public void testProgramWithAutoParallelism() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getJobManagerRPCPort());
+
+			env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+
+			DataSet<Integer> result = env
+					.createInput(new ParallelismDependentInputFormat())
+					.rebalance()
+					.mapPartition(new ParallelismDependentMapPartition());
+
+			List<Integer> resultCollection = new ArrayList<Integer>();
+			result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
+
+			env.execute();
+
+			// each mapPartition invocation emits exactly one element (its subtask index)
+			assertEquals(PARALLELISM, resultCollection.size());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		// The cluster is shared by all test methods and is stopped only in teardownCluster().
+	}
+
+	/**
+	 * Emits a single record per opened input split. It also asserts that the number of
+	 * requested splits equals the total number of task slots.
+	 */
+	private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
+
+		private transient boolean emitted;
+
+		@Override
+		public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+			assertEquals(PARALLELISM, numSplits);
+			return super.createInputSplits(numSplits);
+		}
+
+		@Override
+		public void open(GenericInputSplit split) throws IOException {
+			super.open(split);
+			// reset per split: if this instance is opened for several splits, each emits its record
+			emitted = false;
+		}
+
+		@Override
+		public boolean reachedEnd() {
+			return emitted;
+		}
+
+		@Override
+		public Integer nextRecord(Integer reuse) {
+			if (emitted) {
+				return null;
+			}
+			emitted = true;
+			return 1;
+		}
+	}
+
+	/** Emits the index of the subtask it runs in, once per mapPartition invocation. */
+	private static class ParallelismDependentMapPartition extends RichMapPartitionFunction<Integer, Integer> {
+
+		@Override
+		public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
+			out.collect(getRuntimeContext().getIndexOfThisSubtask());
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java index 1591d67..8330109 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java @@ -37,9 +37,9 @@ import java.util.List; import static org.junit.Assert.*; +@SuppressWarnings("serial") public class SimpleRecoveryITCase { - private static ForkableFlinkMiniCluster cluster; @BeforeClass
