[FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster Rename _configuration to originalConfiguration
Remove testing classes from main scope in flink-runtime Previously, the ForkableFlinkMiniCluster which resided in flink-test-utils required these files to be in the main scope of flink-runtime. With the removal of the ForkableFlinkMiniCluster, these classes are now no longer needed and can be moved back to the test scope. This closes #2450. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02b852e3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02b852e3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02b852e3 Branch: refs/heads/flip-6 Commit: 02b852e3571e46f25fdfc79f43ceb726ddff9ba7 Parents: 920cda4 Author: Till Rohrmann <trohrm...@apache.org> Authored: Wed Aug 31 17:58:09 2016 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu Sep 8 17:17:28 2016 +0200 ---------------------------------------------------------------------- .../api/avro/AvroExternalJarProgramITCase.java | 7 +- .../flink/contrib/streaming/CollectITCase.java | 4 +- .../operations/DegreesWithExceptionITCase.java | 6 +- .../ReduceOnEdgesWithExceptionITCase.java | 6 +- .../ReduceOnNeighborsWithExceptionITCase.java | 6 +- .../apache/flink/ml/util/FlinkTestBase.scala | 11 +- .../clusterframework/FlinkResourceManager.java | 13 +- .../testutils/TestingResourceManager.java | 137 ------ .../flink/runtime/jobmanager/JobManager.scala | 45 +- .../runtime/messages/TaskManagerMessages.scala | 26 ++ .../runtime/minicluster/FlinkMiniCluster.scala | 73 +++- .../minicluster/LocalFlinkMiniCluster.scala | 235 ++++++++--- .../flink/runtime/taskmanager/TaskManager.scala | 130 ++++-- .../testingUtils/TestingJobManager.scala | 72 ---- .../testingUtils/TestingJobManagerLike.scala | 417 ------------------- .../TestingJobManagerMessages.scala | 133 ------ .../testingUtils/TestingMemoryArchivist.scala | 43 -- .../runtime/testingUtils/TestingMessages.scala | 40 -- .../testingUtils/TestingTaskManager.scala | 70 ---- 
.../testingUtils/TestingTaskManagerLike.scala | 248 ----------- .../TestingTaskManagerMessages.scala | 94 ----- .../LeaderElectionRetrievalTestingCluster.java | 3 +- .../testutils/TestingResourceManager.java | 137 ++++++ .../runtime/testingUtils/TestingCluster.scala | 322 ++++++++------ .../testingUtils/TestingJobManager.scala | 71 ++++ .../testingUtils/TestingJobManagerLike.scala | 417 +++++++++++++++++++ .../TestingJobManagerMessages.scala | 132 ++++++ .../testingUtils/TestingMemoryArchivist.scala | 43 ++ .../runtime/testingUtils/TestingMessages.scala | 40 ++ .../testingUtils/TestingTaskManager.scala | 70 ++++ .../testingUtils/TestingTaskManagerLike.scala | 234 +++++++++++ .../TestingTaskManagerMessages.scala | 82 ++++ .../flink/api/scala/ScalaShellITCase.scala | 7 +- .../cassandra/CassandraConnectorITCase.java | 6 +- .../kafka/KafkaShortRetentionTestBase.java | 6 +- .../connectors/kafka/KafkaTestBase.java | 6 +- .../manualtests/ManualExactlyOnceTest.java | 4 +- ...nualExactlyOnceWithStreamReshardingTest.java | 4 +- ...ScalaStreamingMultipleProgramsTestBase.scala | 5 +- .../flink-test-utils/pom.xml | 149 ------- .../util/StreamingMultipleProgramsTestBase.java | 4 +- .../streaming/util/TestStreamEnvironment.java | 8 +- .../flink/test/util/AbstractTestBase.java | 3 +- .../test/util/MultipleProgramsTestBase.java | 3 +- .../apache/flink/test/util/TestBaseUtils.java | 31 +- .../apache/flink/test/util/TestEnvironment.java | 7 +- .../test/util/ForkableFlinkMiniCluster.scala | 335 --------------- .../accumulators/AccumulatorErrorITCase.java | 6 +- .../accumulators/AccumulatorLiveITCase.java | 1 - .../test/cancelling/CancelingTestBase.java | 7 +- .../EventTimeAllWindowCheckpointingITCase.java | 6 +- .../EventTimeWindowCheckpointingITCase.java | 6 +- .../test/checkpointing/RescalingITCase.java | 6 +- .../test/checkpointing/SavepointITCase.java | 19 +- .../StreamCheckpointNotifierITCase.java | 6 +- .../StreamFaultToleranceTestBase.java | 6 +- 
.../WindowCheckpointingITCase.java | 6 +- .../test/classloading/ClassLoaderITCase.java | 7 +- .../clients/examples/JobRetrievalITCase.java | 5 +- .../JobSubmissionFailsITCase.java | 6 +- .../CustomDistributionITCase.java | 4 +- .../RemoteEnvironmentITCase.java | 7 +- .../flink/test/misc/AutoParallelismITCase.java | 6 +- .../test/misc/CustomSerializationITCase.java | 6 +- .../test/misc/MiscellaneousIssuesITCase.java | 6 +- ...SuccessAfterNetworkBuffersFailureITCase.java | 6 +- .../flink/test/query/QueryableStateITCase.java | 6 +- .../flink/test/recovery/FastFailuresITCase.java | 4 +- ...SimpleRecoveryFailureRateStrategyITBase.java | 6 +- ...RecoveryFixedDelayRestartStrategyITBase.java | 6 +- .../test/recovery/SimpleRecoveryITCaseBase.java | 4 +- .../TaskManagerFailureRecoveryITCase.java | 6 +- .../flink/test/runtime/IPv6HostnamesITCase.java | 6 +- .../ZooKeeperLeaderElectionITCase.java | 56 +-- .../test/streaming/runtime/TimestampITCase.java | 6 +- .../flink/test/web/WebFrontendITCase.java | 6 +- .../jobmanager/JobManagerFailsITCase.scala | 8 +- .../taskmanager/TaskManagerFailsITCase.scala | 12 +- flink-yarn-tests/pom.xml | 8 + .../org/apache/flink/yarn/YarnTestBase.java | 1 - tools/maven/scalastyle-config.xml | 2 +- 81 files changed, 2037 insertions(+), 2167 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java index 29a7e58..1030ff8 100644 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java +++ 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java @@ -25,12 +25,11 @@ import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.junit.Assert; import org.junit.Test; - public class AvroExternalJarProgramITCase { private static final String JAR_FILE = "maven-test-jar.jar"; @@ -40,12 +39,12 @@ public class AvroExternalJarProgramITCase { @Test public void testExternalProgram() { - ForkableFlinkMiniCluster testMiniCluster = null; + LocalFlinkMiniCluster testMiniCluster = null; try { Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); - testMiniCluster = new ForkableFlinkMiniCluster(config, false); + testMiniCluster = new LocalFlinkMiniCluster(config, false); testMiniCluster.start(); String jarFile = JAR_FILE; http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java index 10ea85c..d691621 100644 --- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java +++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java @@ -19,9 +19,9 @@ package org.apache.flink.contrib.streaming; import org.apache.flink.configuration.Configuration; +import 
org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.junit.Test; @@ -36,7 +36,7 @@ public class CollectITCase { @Test public void testCollect() throws Exception { - final ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(new Configuration(), false); + final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new Configuration(), false); try { cluster.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java index 551a97b..02eea07 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java @@ -26,7 +26,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Graph; import org.apache.flink.graph.test.TestGraphUtils; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.types.LongValue; import org.junit.AfterClass; @@ -39,7 +39,7 @@ public class DegreesWithExceptionITCase { private static final int PARALLELISM = 4; - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; @BeforeClass @@ -47,7 +47,7 @@ 
public class DegreesWithExceptionITCase { try { Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java index 56a0a59..666f7ef 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java @@ -30,7 +30,7 @@ import org.apache.flink.graph.EdgesFunctionWithVertexValue; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.util.Collector; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -42,7 +42,7 @@ public class ReduceOnEdgesWithExceptionITCase { private static final int PARALLELISM = 4; - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; @BeforeClass @@ -50,7 +50,7 @@ public class ReduceOnEdgesWithExceptionITCase { try { Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM); - cluster = new ForkableFlinkMiniCluster(config, 
false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java index 7458e08..0bbdc84 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java @@ -31,7 +31,7 @@ import org.apache.flink.graph.NeighborsFunctionWithVertexValue; import org.apache.flink.graph.ReduceNeighborsFunction; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.util.Collector; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -43,7 +43,7 @@ public class ReduceOnNeighborsWithExceptionITCase { private static final int PARALLELISM = 4; - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; @BeforeClass @@ -51,7 +51,7 @@ public class ReduceOnNeighborsWithExceptionITCase { try { Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } catch (Exception e) { 
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala index fb98f24..6353d6a 100644 --- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala @@ -18,14 +18,15 @@ package org.apache.flink.ml.util -import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils, TestEnvironment} +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster +import org.apache.flink.test.util.{TestBaseUtils, TestEnvironment} import org.scalatest.{BeforeAndAfter, Suite} -/** Mixin to start and stop a ForkableFlinkMiniCluster automatically for Scala based tests. +/** Mixin to start and stop a LocalFlinkMiniCluster automatically for Scala based tests. * Additionally a TestEnvironment with the started cluster is created and set as the default * [[org.apache.flink.api.java.ExecutionEnvironment]]. * - * This mixin starts a ForkableFlinkMiniCluster with one TaskManager and a number of slots given + * This mixin starts a LocalFlinkMiniCluster with one TaskManager and a number of slots given * by parallelism. This value can be overridden in a sub class in order to start the cluster * with a different number of slots. * @@ -37,7 +38,7 @@ import org.scalatest.{BeforeAndAfter, Suite} * @example * {{{ * def testSomething: Unit = { - * // Obtain TestEnvironment with started ForkableFlinkMiniCluster + * // Obtain TestEnvironment with started LocalFlinkMiniCluster * val env = ExecutionEnvironment.getExecutionEnvironment * * env.fromCollection(...) 
@@ -50,7 +51,7 @@ import org.scalatest.{BeforeAndAfter, Suite} trait FlinkTestBase extends BeforeAndAfter { that: Suite => - var cluster: Option[ForkableFlinkMiniCluster] = None + var cluster: Option[LocalFlinkMiniCluster] = None val parallelism = 4 before { http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java index 95be084..7ea286d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java @@ -767,8 +767,19 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva Class<? extends FlinkResourceManager<?>> resourceManagerClass, String resourceManagerActorName) { - Props resourceMasterProps = Props.create(resourceManagerClass, configuration, leaderRetriever); + Props resourceMasterProps = getResourceManagerProps( + resourceManagerClass, + configuration, + leaderRetriever); return actorSystem.actorOf(resourceMasterProps, resourceManagerActorName); } + + public static Props getResourceManagerProps( + Class<? 
extends FlinkResourceManager> resourceManagerClass, + Configuration configuration, + LeaderRetrievalService leaderRetrievalService) { + + return Props.create(resourceManagerClass, configuration, leaderRetrievalService); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java deleted file mode 100644 index 495cacd..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.testutils; - -import akka.actor.ActorRef; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; -import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; -import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.messages.Messages; -import org.apache.flink.runtime.testingUtils.TestingMessages; - -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; - - -/** - * A testing resource manager which may alter the default standalone resource master's behavior. - */ -public class TestingResourceManager extends StandaloneResourceManager { - - /** Set of Actors which want to be informed of a connection to the job manager */ - private Set<ActorRef> waitForResourceManagerConnected = new HashSet<>(); - - /** Set of Actors which want to be informed of a shutdown */ - private Set<ActorRef> waitForShutdown = new HashSet<>(); - - /** Flag to signal a connection to the JobManager */ - private boolean isConnected = false; - - public TestingResourceManager(Configuration flinkConfig, LeaderRetrievalService leaderRetriever) { - super(flinkConfig, leaderRetriever); - } - - /** - * Overwrite messages here if desired - */ - @Override - protected void handleMessage(Object message) { - - if (message instanceof GetRegisteredResources) { - sender().tell(new GetRegisteredResourcesReply(getStartedTaskManagers()), self()); - } else if (message instanceof FailResource) { - ResourceID resourceID = ((FailResource) message).resourceID; - notifyWorkerFailed(resourceID, "Failed for test case."); - - } else if (message instanceof NotifyWhenResourceManagerConnected) { - if (isConnected) { - sender().tell( - Messages.getAcknowledge(), - 
self()); - } else { - waitForResourceManagerConnected.add(sender()); - } - } else if (message instanceof RegisterResourceManagerSuccessful) { - super.handleMessage(message); - - isConnected = true; - - for (ActorRef ref : waitForResourceManagerConnected) { - ref.tell( - Messages.getAcknowledge(), - self()); - } - waitForResourceManagerConnected.clear(); - - } else if (message instanceof TestingMessages.NotifyOfComponentShutdown$) { - waitForShutdown.add(sender()); - } else if (message instanceof TestingMessages.Alive$) { - sender().tell(Messages.getAcknowledge(), self()); - } else { - super.handleMessage(message); - } - } - - /** - * Testing messages - */ - public static class GetRegisteredResources {} - - public static class GetRegisteredResourcesReply { - - public Collection<ResourceID> resources; - - public GetRegisteredResourcesReply(Collection<ResourceID> resources) { - this.resources = resources; - } - - } - - /** - * Fails all resources that the resource manager has registered - */ - public static class FailResource { - - public ResourceID resourceID; - - public FailResource(ResourceID resourceID) { - this.resourceID = resourceID; - } - } - - /** - * The sender of this message will be informed of a connection to the Job Manager - */ - public static class NotifyWhenResourceManagerConnected {} - - /** - * Inform registered listeners about a shutdown of the application. 
- */ - @Override - protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) { - for (ActorRef listener : waitForShutdown) { - listener.tell(new TestingMessages.ComponentShutdown(self()), self()); - } - waitForShutdown.clear(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 88af604..f67be0e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -49,7 +49,7 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory -import org.apache.flink.runtime.executiongraph.{StatusListenerMessenger, ExecutionGraph, ExecutionJobVertex} +import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex, StatusListenerMessenger} import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager} import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID} @@ -2721,7 +2721,7 @@ object JobManager { configuration, None) - val archiveProps = Props(archiveClass, archiveCount) + val archiveProps = getArchiveProps(archiveClass, archiveCount) // start the archiver with the given name, or without (avoid name conflicts) val archive: ActorRef = archiveActorName match { @@ -2729,7 +2729,7 @@ object JobManager { case None => 
actorSystem.actorOf(archiveProps) } - val jobManagerProps = Props( + val jobManagerProps = getJobManagerProps( jobManagerClass, configuration, executorService, @@ -2754,6 +2754,45 @@ object JobManager { (jobManager, archive) } + def getArchiveProps(archiveClass: Class[_ <: MemoryArchivist], archiveCount: Int): Props = { + Props(archiveClass, archiveCount) + } + + def getJobManagerProps( + jobManagerClass: Class[_ <: JobManager], + configuration: Configuration, + executorService: ExecutorService, + instanceManager: InstanceManager, + scheduler: FlinkScheduler, + libraryCacheManager: BlobLibraryCacheManager, + archive: ActorRef, + restartStrategyFactory: RestartStrategyFactory, + timeout: FiniteDuration, + leaderElectionService: LeaderElectionService, + submittedJobGraphStore: SubmittedJobGraphStore, + checkpointRecoveryFactory: CheckpointRecoveryFactory, + savepointStore: SavepointStore, + jobRecoveryTimeout: FiniteDuration, + metricsRegistry: Option[FlinkMetricRegistry]): Props = { + + Props( + jobManagerClass, + configuration, + executorService, + instanceManager, + scheduler, + libraryCacheManager, + archive, + restartStrategyFactory, + timeout, + leaderElectionService, + submittedJobGraphStore, + checkpointRecoveryFactory, + savepointStore, + jobRecoveryTimeout, + metricsRegistry) + } + // -------------------------------------------------------------------------- // Resolving the JobManager endpoint // -------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala index 2d99245..b433015 100644 --- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala @@ -130,6 +130,16 @@ object TaskManagerMessages { */ case class RequestTaskManagerLog(requestType : LogTypeRequest) + /** Requests the number of active connections at the ConnectionManager */ + case object RequestNumActiveConnections + + case class ResponseNumActiveConnections(number: Int) + + /** Requests the number of broadcast variables with references */ + case object RequestBroadcastVariablesWithReferences + + case class ResponseBroadcastVariablesWithReferences(number: Int) + // -------------------------------------------------------------------------- // Utility getters for case objects to simplify access from Java @@ -166,4 +176,20 @@ object TaskManagerMessages { def getRequestTaskManagerStdout(): AnyRef = { RequestTaskManagerLog(StdOutFileRequest) } + + /** + * Accessor for the case object instance, to simplify Java interoperability. + * @return The RequestBroadcastVariablesWithReferences case object instance. + */ + def getRequestBroadcastVariablesWithReferences(): RequestBroadcastVariablesWithReferences.type = { + RequestBroadcastVariablesWithReferences + } + + /** + * Accessor for the case object instance, to simplify Java interoperability. + * @return The RequestNumActiveConnections case object instance. 
+ */ + def getRequestNumActiveConnections(): RequestNumActiveConnections.type = { + RequestNumActiveConnections + } } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index a547d25..0178bd3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -69,7 +69,7 @@ abstract class FlinkMiniCluster( ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, InetAddress.getByName("localhost").getHostAddress()) - val configuration = generateConfiguration(userConfiguration) + protected val originalConfiguration = generateConfiguration(userConfiguration) /** Future to the [[ActorGateway]] of the current leader */ var leaderGateway: Promise[ActorGateway] = Promise() @@ -79,16 +79,16 @@ abstract class FlinkMiniCluster( /** Future lock */ val futureLock = new Object() - + implicit val executionContext = ExecutionContext.global - implicit val timeout = AkkaUtils.getTimeout(configuration) + implicit val timeout = AkkaUtils.getTimeout(originalConfiguration) - val haMode = HighAvailabilityMode.fromConfig(configuration) + val haMode = HighAvailabilityMode.fromConfig(originalConfiguration) val numJobManagers = getNumberOfJobManagers - var numTaskManagers = configuration.getInteger( + var numTaskManagers = originalConfiguration.getInteger( ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER) @@ -105,6 +105,22 @@ abstract class FlinkMiniCluster( private var isRunning = false + def configuration: Configuration = { + if (originalConfiguration.getInteger( + 
ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) == 0) { + val leaderConfiguration = new Configuration(originalConfiguration) + + val leaderPort = getLeaderRPCPort + + leaderConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, leaderPort) + + leaderConfiguration + } else { + originalConfiguration + } + } + // -------------------------------------------------------------------------- // Abstract Methods // -------------------------------------------------------------------------- @@ -125,7 +141,7 @@ abstract class FlinkMiniCluster( if(haMode == HighAvailabilityMode.NONE) { 1 } else { - configuration.getInteger( + originalConfiguration.getInteger( ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER ) @@ -136,7 +152,7 @@ abstract class FlinkMiniCluster( if(haMode == HighAvailabilityMode.NONE) { 1 } else { - configuration.getInteger( + originalConfiguration.getInteger( ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, ConfigConstants.DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER ) @@ -177,40 +193,55 @@ abstract class FlinkMiniCluster( Await.result(indexFuture, timeout) } + def getLeaderRPCPort: Int = { + val index = getLeaderIndex(timeout) + + jobManagerActorSystems match { + case Some(jmActorSystems) => + AkkaUtils.getAddress(jmActorSystems(index)).port match { + case Some(p) => p + case None => -1 + } + + case None => throw new Exception("The JobManager of the LocalFlinkMiniCluster has not been " + + "started properly.") + } + } + def getResourceManagerAkkaConfig(index: Int): Config = { if (useSingleActorSystem) { - AkkaUtils.getAkkaConfig(configuration, None) + AkkaUtils.getAkkaConfig(originalConfiguration, None) } else { - val port = configuration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT) + val port = originalConfiguration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, + 
ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT) val resolvedPort = if(port != 0) port + index else port - AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort))) + AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort))) } } def getJobManagerAkkaConfig(index: Int): Config = { if (useSingleActorSystem) { - AkkaUtils.getAkkaConfig(configuration, None) + AkkaUtils.getAkkaConfig(originalConfiguration, None) } else { - val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) + val port = originalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) val resolvedPort = if(port != 0) port + index else port - AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort))) + AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort))) } } def getTaskManagerAkkaConfig(index: Int): Config = { - val port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT) + val port = originalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT) val resolvedPort = if(port != 0) port + index else port - AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort))) + AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort))) } /** @@ -257,7 +288,7 @@ abstract class FlinkMiniCluster( "The FlinkMiniCluster has not been started yet.") } } else { - JobClient.startJobClientActorSystem(configuration) + JobClient.startJobClientActorSystem(originalConfiguration) } } @@ -320,7 +351,7 @@ abstract class FlinkMiniCluster( val jobManagerAkkaURL = AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0)) - webMonitor = startWebServer(configuration, jmActorSystems(0), jobManagerAkkaURL) + webMonitor = startWebServer(originalConfiguration, jmActorSystems(0), 
jobManagerAkkaURL) if(waitForTaskManagerRegistration) { waitForTaskManagersToBeRegistered() @@ -528,7 +559,7 @@ abstract class FlinkMiniCluster( new StandaloneLeaderRetrievalService( AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0))) } else { - ZooKeeperUtils.createLeaderRetrievalService(configuration) + ZooKeeperUtils.createLeaderRetrievalService(originalConfiguration) } case _ => throw new Exception("The FlinkMiniCluster has not been started properly.") http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index d30c047..cac5d91 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -18,23 +18,36 @@ package org.apache.flink.runtime.minicluster -import akka.actor.{ActorRef, ActorSystem} -import org.apache.flink.api.common.JobID +import java.util.concurrent.ExecutorService +import akka.actor.{ActorRef, ActorSystem, Props} +import org.apache.flink.api.common.JobID import org.apache.flink.api.common.io.FileOutputFormat import org.apache.flink.configuration.{ConfigConstants, Configuration} -import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager -import org.apache.flink.runtime.clusterframework.types.ResourceID +import 
org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable} +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory +import org.apache.flink.runtime.instance.InstanceManager +import org.apache.flink.runtime.io.disk.iomanager.IOManager +import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.io.network.netty.NettyConfig -import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager} +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler +import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore} +import org.apache.flink.runtime.leaderelection.LeaderElectionService +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService +import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.messages.JobManagerMessages -import org.apache.flink.runtime.messages.JobManagerMessages.{CancellationFailure, CancellationResponse, StoppingFailure, StoppingResponse, RunningJobsStatus, RunningJobs} -import org.apache.flink.runtime.taskmanager.TaskManager +import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse} +import org.apache.flink.runtime.metrics.MetricRegistry +import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation} import org.apache.flink.runtime.util.EnvironmentInformation import scala.concurrent.Await +import scala.concurrent.duration.FiniteDuration /** * Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same @@ -65,8 +78,25 @@ class LocalFlinkMiniCluster( config } + //------------------------------------------------------------------------------------------------ + // Actor classes + 
//------------------------------------------------------------------------------------------------ + + val jobManagerClass: Class[_ <: JobManager] = classOf[JobManager] + + val taskManagerClass: Class[_ <: TaskManager] = classOf[TaskManager] + + val memoryArchivistClass: Class[_ <: MemoryArchivist] = classOf[MemoryArchivist] + + val resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]] = + classOf[StandaloneResourceManager] + + //------------------------------------------------------------------------------------------------ + // Start methods for the distributed components + //------------------------------------------------------------------------------------------------ + override def startJobManager(index: Int, system: ActorSystem): ActorRef = { - val config = configuration.clone() + val config = originalConfiguration.clone() val jobManagerName = getJobManagerName(index) val archiveName = getArchiveName(index) @@ -79,19 +109,48 @@ class LocalFlinkMiniCluster( config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index) } - val (jobManager, _) = JobManager.startJobManagerActors( - config, - system, - Some(jobManagerName), - Some(archiveName), - classOf[JobManager], - classOf[MemoryArchivist]) - - jobManager + val (executorService, + instanceManager, + scheduler, + libraryCacheManager, + restartStrategyFactory, + timeout, + archiveCount, + leaderElectionService, + submittedJobGraphStore, + checkpointRecoveryFactory, + savepointStore, + jobRecoveryTimeout, + metricsRegistry) = JobManager.createJobManagerComponents(config, createLeaderElectionService()) + + val archive = system.actorOf( + getArchiveProps( + memoryArchivistClass, + archiveCount), + archiveName) + + system.actorOf( + getJobManagerProps( + jobManagerClass, + config, + executorService, + instanceManager, + scheduler, + libraryCacheManager, + archive, + restartStrategyFactory, + timeout, + leaderElectionService, + submittedJobGraphStore, + 
checkpointRecoveryFactory, + savepointStore, + jobRecoveryTimeout, + metricsRegistry), + jobManagerName) } override def startResourceManager(index: Int, system: ActorSystem): ActorRef = { - val config = configuration.clone() + val config = originalConfiguration.clone() val resourceManagerName = getResourceManagerName(index) @@ -103,18 +162,16 @@ class LocalFlinkMiniCluster( config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index) } - val resourceManager = FlinkResourceManager.startResourceManagerActors( + val resourceManagerProps = getResourceManagerProps( + resourceManagerClass, config, - system, - createLeaderRetrievalService(), - classOf[StandaloneResourceManager], - resourceManagerName) + createLeaderRetrievalService()) - resourceManager + system.actorOf(resourceManagerProps, resourceManagerName) } override def startTaskManager(index: Int, system: ActorSystem): ActorRef = { - val config = configuration.clone() + val config = originalConfiguration.clone() val rpcPort = config.getInteger( ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, @@ -138,32 +195,115 @@ class LocalFlinkMiniCluster( } else { TaskManager.TASK_MANAGER_NAME } - - TaskManager.startTaskManagerComponentsAndActor( + + val resourceID = ResourceID.generate() // generate random resource id + + val (taskManagerConfig, + taskManagerLocation, + memoryManager, + ioManager, + network, + leaderRetrievalService) = TaskManager.createTaskManagerComponents( config, - ResourceID.generate(), // generate random resource id - system, + resourceID, hostname, // network interface to bind to - Some(taskManagerActorName), // actor name - Some(createLeaderRetrievalService()), // job manager leader retrieval service localExecution, // start network stack? 
- classOf[TaskManager]) + Some(createLeaderRetrievalService())) + + val props = getTaskManagerProps( + taskManagerClass, + taskManagerConfig, + resourceID, + taskManagerLocation, + memoryManager, + ioManager, + network, + leaderRetrievalService) + + system.actorOf(props, taskManagerActorName) } - def getLeaderRPCPort: Int = { - val index = getLeaderIndex(timeout) + //------------------------------------------------------------------------------------------------ + // Props for the distributed components + //------------------------------------------------------------------------------------------------ - jobManagerActorSystems match { - case Some(jmActorSystems) => - AkkaUtils.getAddress(jmActorSystems(index)).port match { - case Some(p) => p - case None => -1 - } + def getArchiveProps(archiveClass: Class[_ <: MemoryArchivist], archiveCount: Int): Props = { + JobManager.getArchiveProps(archiveClass, archiveCount) + } - case None => throw new Exception("The JobManager of the LocalFlinkMiniCluster has not been " + - "started properly.") - } + def getJobManagerProps( + jobManagerClass: Class[_ <: JobManager], + configuration: Configuration, + executorService: ExecutorService, + instanceManager: InstanceManager, + scheduler: Scheduler, + libraryCacheManager: BlobLibraryCacheManager, + archive: ActorRef, + restartStrategyFactory: RestartStrategyFactory, + timeout: FiniteDuration, + leaderElectionService: LeaderElectionService, + submittedJobGraphStore: SubmittedJobGraphStore, + checkpointRecoveryFactory: CheckpointRecoveryFactory, + savepointStore: SavepointStore, + jobRecoveryTimeout: FiniteDuration, + metricsRegistry: Option[MetricRegistry]): Props = { + + JobManager.getJobManagerProps( + jobManagerClass, + configuration, + executorService, + instanceManager, + scheduler, + libraryCacheManager, + archive, + restartStrategyFactory, + timeout, + leaderElectionService, + submittedJobGraphStore, + checkpointRecoveryFactory, + savepointStore, + jobRecoveryTimeout, + 
metricsRegistry) + } + + def getTaskManagerProps( + taskManagerClass: Class[_ <: TaskManager], + taskManagerConfig: TaskManagerConfiguration, + resourceID: ResourceID, + taskManagerLocation: TaskManagerLocation, + memoryManager: MemoryManager, + ioManager: IOManager, + networkEnvironment: NetworkEnvironment, + leaderRetrievalService: LeaderRetrievalService): Props = { + + TaskManager.getTaskManagerProps( + taskManagerClass, + taskManagerConfig, + resourceID, + taskManagerLocation, + memoryManager, + ioManager, + networkEnvironment, + leaderRetrievalService) + } + + def getResourceManagerProps( + resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]], + configuration: Configuration, + leaderRetrievalService: LeaderRetrievalService): Props = { + + FlinkResourceManager.getResourceManagerProps( + resourceManagerClass, + configuration, + leaderRetrievalService) + } + + //------------------------------------------------------------------------------------------------ + // Helper methods + //------------------------------------------------------------------------------------------------ + def createLeaderElectionService(): Option[LeaderElectionService] = { + None } def initializeIOFormatClasses(configuration: Configuration): Unit = { @@ -186,7 +326,7 @@ class LocalFlinkMiniCluster( val bufferSize: Int = config.getInteger( ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE) - + val bufferMem: Long = config.getLong( ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) * bufferSize.toLong @@ -218,6 +358,7 @@ class LocalFlinkMiniCluster( val config: Configuration = new Configuration() config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname) + config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER) @@ 
-252,11 +393,11 @@ class LocalFlinkMiniCluster( JobManager.ARCHIVE_NAME } } - + // -------------------------------------------------------------------------- // Actions on running jobs // -------------------------------------------------------------------------- - + def currentlyRunningJobs: Iterable[JobID] = { val leader = getLeaderGateway(timeout) val future = leader.ask(JobManagerMessages.RequestRunningJobsStatus, timeout) @@ -269,7 +410,7 @@ class LocalFlinkMiniCluster( currentlyRunningJobs.foreach(list.add) list } - + def stopJob(id: JobID) : Unit = { val leader = getLeaderGateway(timeout) val response = leader.ask(new JobManagerMessages.StopJob(id), timeout) http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 84750a3..de85f30 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -354,6 +354,21 @@ class TaskManager( case None => sender() ! new IOException("BlobService not available. Cannot upload TaskManager logs.") } + + case RequestBroadcastVariablesWithReferences => + sender ! decorateMessage( + ResponseBroadcastVariablesWithReferences( + bcVarManager.getNumberOfVariablesWithReferences) + ) + + case RequestNumActiveConnections => + val numActive = if (!network.isShutdown) { + network.getConnectionManager.getNumberOfActiveConnections + } else { + 0 + } + + sender ! decorateMessage(ResponseNumActiveConnections(numActive)) } /** @@ -1781,6 +1796,7 @@ object TaskManager { } /** + * Starts the task manager actor. * * @param configuration The configuration for the TaskManager. 
* @param resourceID The id of the resource which the task manager will run on. @@ -1817,11 +1833,75 @@ object TaskManager { taskManagerClass: Class[_ <: TaskManager]) : ActorRef = { - val (taskManagerConfig : TaskManagerConfiguration, - netConfig: NetworkEnvironmentConfiguration, - taskManagerAddress: InetSocketAddress, - memType: MemoryType - ) = parseTaskManagerConfiguration( + val (taskManagerConfig, + connectionInfo, + memoryManager, + ioManager, + network, + leaderRetrievalService) = createTaskManagerComponents( + configuration, + resourceID, + taskManagerHostname, + localTaskManagerCommunication, + leaderRetrievalServiceOption) + + // create the actor properties (which define the actor constructor parameters) + val tmProps = getTaskManagerProps( + taskManagerClass, + taskManagerConfig, + resourceID, + connectionInfo, + memoryManager, + ioManager, + network, + leaderRetrievalService) + + taskManagerActorName match { + case Some(actorName) => actorSystem.actorOf(tmProps, actorName) + case None => actorSystem.actorOf(tmProps) + } + } + + def getTaskManagerProps( + taskManagerClass: Class[_ <: TaskManager], + taskManagerConfig: TaskManagerConfiguration, + resourceID: ResourceID, + taskManagerLocation: TaskManagerLocation, + memoryManager: MemoryManager, + ioManager: IOManager, + networkEnvironment: NetworkEnvironment, + leaderRetrievalService: LeaderRetrievalService + ): Props = { + Props( + taskManagerClass, + taskManagerConfig, + resourceID, + taskManagerLocation, + memoryManager, + ioManager, + networkEnvironment, + taskManagerConfig.numberOfSlots, + leaderRetrievalService) + } + + def createTaskManagerComponents( + configuration: Configuration, + resourceID: ResourceID, + taskManagerHostname: String, + localTaskManagerCommunication: Boolean, + leaderRetrievalServiceOption: Option[LeaderRetrievalService]): + (TaskManagerConfiguration, + TaskManagerLocation, + MemoryManager, + IOManager, + NetworkEnvironment, + LeaderRetrievalService) = { + + val 
(taskManagerConfig : TaskManagerConfiguration, + netConfig: NetworkEnvironmentConfiguration, + taskManagerAddress: InetSocketAddress, + memType: MemoryType + ) = parseTaskManagerConfiguration( configuration, taskManagerHostname, localTaskManagerCommunication) @@ -1895,10 +1975,10 @@ object TaskManager { // check if a value has been configured val configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L) checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory, - ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, - "MemoryManager needs at least one MB of memory. " + - "If you leave this config parameter empty, the system automatically " + - "pick a fraction of the available memory.") + ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, + "MemoryManager needs at least one MB of memory. " + + "If you leave this config parameter empty, the system automatically " + + "pick a fraction of the available memory.") val preAllocateMemory = configuration.getBoolean( @@ -1910,7 +1990,7 @@ object TaskManager { LOG.info(s"Using $configuredMemory MB for managed memory.") } else { LOG.info(s"Limiting managed memory to $configuredMemory MB, " + - s"memory will be allocated lazily.") + s"memory will be allocated lazily.") } configuredMemory << 20 // megabytes to bytes } @@ -1928,10 +2008,10 @@ object TaskManager { if (preAllocateMemory) { LOG.info(s"Using $fraction of the currently free heap space for managed " + - s"heap memory (${relativeMemSize >> 20} MB).") + s"heap memory (${relativeMemSize >> 20} MB).") } else { LOG.info(s"Limiting managed memory to $fraction of the currently free heap space " + - s"(${relativeMemSize >> 20} MB), memory will be allocated lazily.") + s"(${relativeMemSize >> 20} MB), memory will be allocated lazily.") } relativeMemSize @@ -1944,10 +2024,10 @@ object TaskManager { if (preAllocateMemory) { LOG.info(s"Using $fraction of the maximum memory size for " + - s"managed off-heap memory 
(${directMemorySize >> 20} MB).") + s"managed off-heap memory (${directMemorySize >> 20} MB).") } else { LOG.info(s"Limiting managed memory to $fraction of the maximum memory size " + - s"(${directMemorySize >> 20} MB), memory will be allocated lazily.") + s"(${directMemorySize >> 20} MB), memory will be allocated lazily.") } directMemorySize @@ -1971,12 +2051,12 @@ object TaskManager { memType match { case MemoryType.HEAP => throw new Exception(s"OutOfMemory error (${e.getMessage()})" + - s" while allocating the TaskManager heap memory ($memorySize bytes).", e) + s" while allocating the TaskManager heap memory ($memorySize bytes).", e) case MemoryType.OFF_HEAP => throw new Exception(s"OutOfMemory error (${e.getMessage()})" + - s" while allocating the TaskManager off-heap memory ($memorySize bytes). " + - s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e) + s" while allocating the TaskManager off-heap memory ($memorySize bytes). " + + s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e) case _ => throw e } @@ -1990,22 +2070,12 @@ object TaskManager { case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration) } - // create the actor properties (which define the actor constructor parameters) - val tmProps = Props( - taskManagerClass, - taskManagerConfig, - resourceID, + (taskManagerConfig, taskManagerLocation, memoryManager, ioManager, network, - taskManagerConfig.numberOfSlots, leaderRetrievalService) - - taskManagerActorName match { - case Some(actorName) => actorSystem.actorOf(tmProps, actorName) - case None => actorSystem.actorOf(tmProps) - } } @@ -2055,8 +2125,8 @@ object TaskManager { * @param taskManagerHostname The host name under which the TaskManager communicates. * @param localTaskManagerCommunication True, to skip initializing the network stack. * Use only in cases where only one task manager runs. 
- * @return A tuple (TaskManagerConfiguration, network configuration, - * InstanceConnectionInfo, JobManager actor Akka URL). + * @return A tuple (TaskManagerConfiguration, network configuration, inet socket address, + * memory type). */ @throws(classOf[IllegalArgumentException]) def parseTaskManagerConfiguration( http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala deleted file mode 100644 index 16331ac..0000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -package org.apache.flink.runtime.testingUtils - -import akka.actor.ActorRef - -import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore -import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager -import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory -import org.apache.flink.runtime.instance.InstanceManager -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler -import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} -import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.metrics.MetricRegistry - -import scala.concurrent.duration._ -import scala.language.postfixOps - -import java.util.concurrent.ExecutorService - -/** JobManager implementation extended by testing messages - * - */ -class TestingJobManager( - flinkConfiguration: Configuration, - executorService: ExecutorService, - instanceManager: InstanceManager, - scheduler: Scheduler, - libraryCacheManager: BlobLibraryCacheManager, - archive: ActorRef, - restartStrategyFactory: RestartStrategyFactory, - timeout: FiniteDuration, - leaderElectionService: LeaderElectionService, - submittedJobGraphs : SubmittedJobGraphStore, - checkpointRecoveryFactory : CheckpointRecoveryFactory, - savepointStore : SavepointStore, - jobRecoveryTimeout : FiniteDuration, - metricRegistry : Option[MetricRegistry]) - extends JobManager( - flinkConfiguration, - executorService, - instanceManager, - scheduler, - libraryCacheManager, - archive, - restartStrategyFactory, - timeout, - leaderElectionService, - submittedJobGraphs, - checkpointRecoveryFactory, - savepointStore, - jobRecoveryTimeout, - metricRegistry) - with TestingJobManagerLike {} 
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala deleted file mode 100644 index 3947b17..0000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala +++ /dev/null @@ -1,417 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.testingUtils - -import akka.actor.{ActorRef, Cancellable, Terminated} -import akka.pattern.{ask, pipe} -import org.apache.flink.api.common.JobID -import org.apache.flink.runtime.FlinkActor -import org.apache.flink.runtime.execution.ExecutionState -import org.apache.flink.runtime.jobgraph.JobStatus -import org.apache.flink.runtime.jobmanager.JobManager -import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged -import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps} -import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} -import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager -import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ -import org.apache.flink.runtime.testingUtils.TestingMessages._ -import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged - -import scala.collection.mutable -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.language.postfixOps - -/** This mixin can be used to decorate a JobManager with messages for testing purpose. 
*/ -trait TestingJobManagerLike extends FlinkActor { - that: JobManager => - - import context._ - - import scala.collection.JavaConverters._ - - val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]() - val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]() - - val waitForAllVerticesToBeRunningOrFinished = - scala.collection.mutable.HashMap[JobID, Set[ActorRef]]() - - var periodicCheck: Option[Cancellable] = None - - val waitForJobStatus = scala.collection.mutable.HashMap[JobID, - collection.mutable.HashMap[JobStatus, Set[ActorRef]]]() - - val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]() - - val waitForLeader = scala.collection.mutable.HashSet[ActorRef]() - - val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder( - new Ordering[(Int, ActorRef)] { - override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1 - }) - - val waitForClient = scala.collection.mutable.HashSet[ActorRef]() - - val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]() - - var disconnectDisabled = false - - var postStopEnabled = true - - abstract override def postStop(): Unit = { - if (postStopEnabled) { - super.postStop() - } else { - // only stop leader election service to revoke the leadership of this JM so that a new JM - // can be elected leader - leaderElectionService.stop() - } - } - - abstract override def handleMessage: Receive = { - handleTestingMessage orElse super.handleMessage - } - - def handleTestingMessage: Receive = { - case Alive => sender() ! Acknowledge - - case RequestExecutionGraph(jobID) => - currentJobs.get(jobID) match { - case Some((executionGraph, jobInfo)) => sender() ! 
decorateMessage( - ExecutionGraphFound( - jobID, - executionGraph) - ) - - case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender()) - } - - case WaitForAllVerticesToBeRunning(jobID) => - if(checkIfAllVerticesRunning(jobID)){ - sender() ! decorateMessage(AllVerticesRunning(jobID)) - }else{ - val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]()) - waitForAllVerticesToBeRunning += jobID -> (waiting + sender()) - - if(periodicCheck.isEmpty){ - periodicCheck = - Some( - context.system.scheduler.schedule( - 0 seconds, - 200 millis, - self, - decorateMessage(NotifyListeners) - ) - ) - } - } - case WaitForAllVerticesToBeRunningOrFinished(jobID) => - if(checkIfAllVerticesRunningOrFinished(jobID)){ - sender() ! decorateMessage(AllVerticesRunning(jobID)) - }else{ - val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]()) - waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender()) - - if(periodicCheck.isEmpty){ - periodicCheck = - Some( - context.system.scheduler.schedule( - 0 seconds, - 200 millis, - self, - decorateMessage(NotifyListeners) - ) - ) - } - } - - case NotifyListeners => - for(jobID <- currentJobs.keySet){ - notifyListeners(jobID) - } - - if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) { - periodicCheck foreach { _.cancel() } - periodicCheck = None - } - - - case NotifyWhenJobRemoved(jobID) => - val gateways = instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway) - - val responses = gateways.map{ - gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean] - } - - val jobRemovedOnJobManager = (self ? 
CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean] - - val allFutures = responses ++ Seq(jobRemovedOnJobManager) - - import context.dispatcher - Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender() - - case CheckIfJobRemoved(jobID) => - if(currentJobs.contains(jobID)) { - context.system.scheduler.scheduleOnce( - 200 milliseconds, - self, - decorateMessage(CheckIfJobRemoved(jobID)) - )(context.dispatcher, sender()) - } else { - sender() ! decorateMessage(true) - } - - case NotifyWhenTaskManagerTerminated(taskManager) => - val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set()) - waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender) - - case msg@Terminated(taskManager) => - super.handleMessage(msg) - - waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach { - _ foreach { - listener => - listener ! decorateMessage(TaskManagerTerminated(taskManager)) - } - } - - // see shutdown method for reply - case NotifyOfComponentShutdown => - waitForShutdown += sender() - - case NotifyWhenAccumulatorChange(jobID) => - - val (updated, registered) = waitForAccumulatorUpdate. - getOrElse(jobID, (false, Set[ActorRef]())) - waitForAccumulatorUpdate += jobID -> (updated, registered + sender) - sender ! true - - /** - * Notification from the task manager that changed accumulator are transferred on next - * Hearbeat. We need to keep this state to notify the listeners on next Heartbeat report. - */ - case AccumulatorsChanged(jobID: JobID) => - waitForAccumulatorUpdate.get(jobID) match { - case Some((updated, registered)) => - waitForAccumulatorUpdate.put(jobID, (true, registered)) - case None => - } - - /** - * Disabled async processing of accumulator values and send accumulators to the listeners if - * we previously received an [[AccumulatorsChanged]] message. 
- */ - case msg : Heartbeat => - super.handleMessage(msg) - - waitForAccumulatorUpdate foreach { - case (jobID, (updated, actors)) if updated => - currentJobs.get(jobID) match { - case Some((graph, jobInfo)) => - val flinkAccumulators = graph.getFlinkAccumulators - val userAccumulators = graph.aggregateUserAccumulators - actors foreach { - actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators) - } - case None => - } - waitForAccumulatorUpdate.put(jobID, (false, actors)) - case _ => - } - - case RequestWorkingTaskManager(jobID) => - currentJobs.get(jobID) match { - case Some((eg, _)) => - if(eg.getAllExecutionVertices.asScala.isEmpty){ - sender ! decorateMessage(WorkingTaskManager(None)) - } else { - val resource = eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource - - if(resource == null){ - sender ! decorateMessage(WorkingTaskManager(None)) - } else { - sender ! decorateMessage( - WorkingTaskManager( - Some(resource.getTaskManagerActorGateway()) - ) - ) - } - } - case None => sender ! decorateMessage(WorkingTaskManager(None)) - } - - case NotifyWhenJobStatus(jobID, state) => - val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID, - scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]()) - - val listener = jobStatusListener.getOrElse(state, Set[ActorRef]()) - - jobStatusListener += state -> (listener + sender) - - case msg@JobStatusChanged(jobID, newJobStatus, _, _) => - super.handleMessage(msg) - - val cleanup = waitForJobStatus.get(jobID) match { - case Some(stateListener) => - stateListener.remove(newJobStatus) match { - case Some(listeners) => - listeners foreach { - _ ! 
decorateMessage(JobStatusIs(jobID, newJobStatus)) - } - case _ => - } - stateListener.isEmpty - - case _ => false - } - - if (cleanup) { - waitForJobStatus.remove(jobID) - } - - case DisableDisconnect => - disconnectDisabled = true - - case DisablePostStop => - postStopEnabled = false - - case RequestSavepoint(savepointPath) => - try { - val savepoint = savepointStore.loadSavepoint(savepointPath) - sender ! ResponseSavepoint(savepoint) - } - catch { - case e: Exception => - sender ! ResponseSavepoint(null) - } - - case msg: Disconnect => - if (!disconnectDisabled) { - super.handleMessage(msg) - - val taskManager = sender() - - waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach { - _ foreach { - listener => - listener ! decorateMessage(TaskManagerTerminated(taskManager)) - } - } - } - - case NotifyWhenLeader => - if (leaderElectionService.hasLeadership) { - sender() ! true - } else { - waitForLeader += sender() - } - - case msg: GrantLeadership => - super.handleMessage(msg) - - waitForLeader.foreach(_ ! true) - - waitForLeader.clear() - - case NotifyWhenClientConnects => - waitForClient += sender() - sender() ! true - - case msg: RegisterJobClient => - super.handleMessage(msg) - waitForClient.foreach(_ ! ClientConnected) - case msg: RequestClassloadingProps => - super.handleMessage(msg) - waitForClient.foreach(_ ! ClassLoadingPropsDelivered) - - case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) => - if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) { - // there are already at least numRegisteredTaskManager registered --> send Acknowledge - sender() ! 
Acknowledge - } else { - // wait until we see at least numRegisteredTaskManager being registered at the JobManager - waitForNumRegisteredTaskManagers += ((numRegisteredTaskManager, sender())) - } - - // TaskManager may be registered on these two messages - case msg @ (_: RegisterTaskManager) => - super.handleMessage(msg) - - // dequeue all senders which wait for instanceManager.getNumberOfStartedTaskManagers or - // fewer registered TaskManagers - while (waitForNumRegisteredTaskManagers.nonEmpty && - waitForNumRegisteredTaskManagers.head._1 <= - instanceManager.getNumberOfRegisteredTaskManagers) { - val receiver = waitForNumRegisteredTaskManagers.dequeue()._2 - receiver ! Acknowledge - } - } - - def checkIfAllVerticesRunning(jobID: JobID): Boolean = { - currentJobs.get(jobID) match { - case Some((eg, _)) => - eg.getAllExecutionVertices.asScala.forall( _.getExecutionState == ExecutionState.RUNNING) - case None => false - } - } - - def checkIfAllVerticesRunningOrFinished(jobID: JobID): Boolean = { - currentJobs.get(jobID) match { - case Some((eg, _)) => - eg.getAllExecutionVertices.asScala.forall { - case vertex => - (vertex.getExecutionState == ExecutionState.RUNNING - || vertex.getExecutionState == ExecutionState.FINISHED) - } - case None => false - } - } - - def notifyListeners(jobID: JobID): Unit = { - if(checkIfAllVerticesRunning(jobID)) { - waitForAllVerticesToBeRunning.remove(jobID) match { - case Some(listeners) => - for (listener <- listeners) { - listener ! decorateMessage(AllVerticesRunning(jobID)) - } - case _ => - } - } - - if(checkIfAllVerticesRunningOrFinished(jobID)) { - waitForAllVerticesToBeRunningOrFinished.remove(jobID) match { - case Some(listeners) => - for (listener <- listeners) { - listener ! decorateMessage(AllVerticesRunning(jobID)) - } - case _ => - } - } - } - - /** - * No killing of the VM for testing. - */ - override protected def shutdown(): Unit = { - log.info("Shutting down TestingJobManager.") - waitForShutdown.foreach(_ ! 
ComponentShutdown(self)) - waitForShutdown.clear() - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala deleted file mode 100644 index f121305..0000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.testingUtils - -import java.util.Map - -import akka.actor.ActorRef -import org.apache.flink.api.common.JobID -import org.apache.flink.api.common.accumulators.Accumulator -import org.apache.flink.runtime.accumulators.AccumulatorRegistry -import org.apache.flink.runtime.checkpoint.CompletedCheckpoint -import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph} -import org.apache.flink.runtime.instance.ActorGateway -import org.apache.flink.runtime.jobgraph.JobStatus -import org.apache.flink.runtime.checkpoint.savepoint.Savepoint - -object TestingJobManagerMessages { - - case class RequestExecutionGraph(jobID: JobID) - - sealed trait ResponseExecutionGraph { - def jobID: JobID - } - - case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) extends - ResponseExecutionGraph - - case class ExecutionGraphNotFound(jobID: JobID) extends ResponseExecutionGraph - - case class WaitForAllVerticesToBeRunning(jobID: JobID) - case class WaitForAllVerticesToBeRunningOrFinished(jobID: JobID) - case class AllVerticesRunning(jobID: JobID) - - case class NotifyWhenJobRemoved(jobID: JobID) - - case class RequestWorkingTaskManager(jobID: JobID) - case class WorkingTaskManager(gatewayOption: Option[ActorGateway]) - - case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus) - case class JobStatusIs(jobID: JobID, state: JobStatus) - - case object NotifyListeners - - case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef) - case class TaskManagerTerminated(taskManager: ActorRef) - - /** - * Registers a listener to receive a message when accumulators changed. - * The change must be explicitly triggered by the TestingTaskManager which can receive an - * [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]] - * message by a task that changed the accumulators. 
This message is then - * forwarded to the JobManager which will send the accumulators in the [[UpdatedAccumulators]] - * message when the next Heartbeat occurs. - */ - case class NotifyWhenAccumulatorChange(jobID: JobID) - - /** - * Reports updated accumulators back to the listener. - */ - case class UpdatedAccumulators(jobID: JobID, - flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]], - userAccumulators: Map[String, Accumulator[_,_]]) - - /** Notifies the sender when the [[TestingJobManager]] has been elected as the leader - * - */ - case object NotifyWhenLeader - - /** - * Notifies the sender when the [[TestingJobManager]] receives new clients for jobs - */ - case object NotifyWhenClientConnects - /** - * Notifies of client connect - */ - case object ClientConnected - /** - * Notifies when the client has requested class loading information - */ - case object ClassLoadingPropsDelivered - - /** - * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]] - * message when at least numRegisteredTaskManager have registered at the JobManager. - * - * @param numRegisteredTaskManager minimum number of registered TMs before the sender is notified - */ - case class NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager: Int) - - /** Disables the post stop method of the [[TestingJobManager]]. - * - * Only the leaderElectionService is stopped in the postStop method call to revoke the leadership - */ - case object DisablePostStop - - /** - * Requests a savepoint from the job manager. - * - * @param savepointPath The path of the savepoint to request. - */ - case class RequestSavepoint(savepointPath: String) - - /** - * Response to a savepoint request. - * - * @param savepoint The requested savepoint or null if none available. 
- */ - case class ResponseSavepoint(savepoint: Savepoint) - - def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader - def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects - def getDisablePostStop(): AnyRef = DisablePostStop - - def getClientConnected(): AnyRef = ClientConnected - def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered - -} http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala deleted file mode 100644 index 48a1ddd..0000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.testingUtils - -import org.apache.flink.runtime.jobmanager.MemoryArchivist -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphFound, ExecutionGraphNotFound, RequestExecutionGraph} - -/** Memory archivist extended by testing messages - * - * @param maxEntries maximum number of archived jobs - */ -class TestingMemoryArchivist(maxEntries: Int) extends MemoryArchivist(maxEntries) { - - override def handleMessage: Receive = { - handleTestingMessage orElse super.handleMessage - } - - def handleTestingMessage: Receive = { - case RequestExecutionGraph(jobID) => - val executionGraph = graphs.get(jobID) - - executionGraph match { - case Some(graph) => sender ! decorateMessage(ExecutionGraphFound(jobID, graph)) - case None => sender ! decorateMessage(ExecutionGraphNotFound(jobID)) - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala deleted file mode 100644 index 91d169a..0000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.testingUtils - -import akka.actor.ActorRef -import org.apache.flink.api.common.JobID - -object TestingMessages { - - case class CheckIfJobRemoved(jobID: JobID) - - case object DisableDisconnect - - case object Alive - - def getAlive: AnyRef = Alive - - def getDisableDisconnect: AnyRef = DisableDisconnect - - case object NotifyOfComponentShutdown - case class ComponentShutdown(ref: ActorRef) - - def getNotifyOfComponentShutdown(): AnyRef = NotifyOfComponentShutdown -} http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala deleted file mode 100644 index 9b5a147..0000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.testingUtils - -import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.io.disk.iomanager.IOManager -import org.apache.flink.runtime.io.network.NetworkEnvironment -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService -import org.apache.flink.runtime.memory.MemoryManager -import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManager, TaskManagerConfiguration} - -import scala.language.postfixOps - -/** Subclass of the [[TaskManager]] to support testing messages - */ -class TestingTaskManager( - config: TaskManagerConfiguration, - resourceID: ResourceID, - connectionInfo: TaskManagerLocation, - memoryManager: MemoryManager, - ioManager: IOManager, - network: NetworkEnvironment, - numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService) - extends TaskManager( - config, - resourceID, - connectionInfo, - memoryManager, - ioManager, - network, - numberOfSlots, - leaderRetrievalService) - with TestingTaskManagerLike { - - def this( - config: TaskManagerConfiguration, - connectionInfo: TaskManagerLocation, - memoryManager: MemoryManager, - ioManager: IOManager, - network: NetworkEnvironment, - numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService) { - this( - config, - ResourceID.generate(), - connectionInfo, - memoryManager, - ioManager, - network, - numberOfSlots, - leaderRetrievalService) - } -}