[FLINK-3364] [runtime, yarn] Move SavepointStore initialization out of JobManager constructor

This closes #1622.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcea46e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcea46e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcea46e8

Branch: refs/heads/tableOnCalcite
Commit: dcea46e891a1479205fdfe939858d340cde87d57
Parents: ed7d3da
Author: Ufuk Celebi <u...@apache.org>
Authored: Thu Feb 11 14:58:35 2016 +0100
Committer: Ufuk Celebi <u...@apache.org>
Committed: Thu Feb 11 20:49:28 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   | 22 +++++++++++---------
 .../JobManagerLeaderElectionTest.java           |  6 +++++-
 .../runtime/testingUtils/TestingCluster.scala   |  6 ++++--
 .../testingUtils/TestingJobManager.scala        |  8 ++++---
 .../runtime/testingUtils/TestingUtils.scala     |  3 ++-
 .../flink/yarn/TestingYarnJobManager.scala      |  8 ++++---
 .../org/apache/flink/yarn/YarnJobManager.scala  |  8 ++++---
 7 files changed, 38 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d96575f..78612c0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -113,7 +113,8 @@ class JobManager(
     protected val timeout: FiniteDuration,
     protected val leaderElectionService: LeaderElectionService,
     protected val submittedJobGraphs : SubmittedJobGraphStore,
-    protected val checkpointRecoveryFactory : CheckpointRecoveryFactory)
+    protected val checkpointRecoveryFactory : CheckpointRecoveryFactory,
+    protected val savepointStore: SavepointStore)
   extends FlinkActor
   with LeaderSessionMessageFilter // mixin oder is important, we want filtering after logging
   with LogMessages // mixin order is important, we want first logging
@@ -151,9 +152,6 @@ class JobManager(
   val webMonitorPort : Int = flinkConfiguration.getInteger(
     ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
 
-  protected val savepointStore : SavepointStore =
-    SavepointStoreFactory.createFromConfig(flinkConfiguration)
-
   /**
    * Run when the job manager is started. Simply logs an informational message.
    * The method also starts the leader election service.
@@ -2040,7 +2038,8 @@ object JobManager {
     Int, // number of archived jobs
     LeaderElectionService,
     SubmittedJobGraphStore,
-    CheckpointRecoveryFactory) = {
+    CheckpointRecoveryFactory,
+    SavepointStore) = {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
 
@@ -2078,8 +2077,6 @@ object JobManager {
           }
       }
 
-    
-
     var blobServer: BlobServer = null
     var instanceManager: InstanceManager = null
     var scheduler: FlinkScheduler = null
@@ -2140,6 +2137,8 @@ object JobManager {
             new ZooKeeperCheckpointRecoveryFactory(client, configuration))
       }
 
+    val savepointStore = SavepointStoreFactory.createFromConfig(configuration)
+
     (executorService,
       instanceManager,
       scheduler,
@@ -2150,7 +2149,8 @@ object JobManager {
       archiveCount,
       leaderElectionService,
       submittedJobGraphs,
-      checkpointRecoveryFactory)
+      checkpointRecoveryFactory,
+      savepointStore)
   }
 
   /**
@@ -2212,7 +2212,8 @@ object JobManager {
     archiveCount,
     leaderElectionService,
     submittedJobGraphs,
-    checkpointRecoveryFactory) = createJobManagerComponents(
+    checkpointRecoveryFactory,
+    savepointStore) = createJobManagerComponents(
       configuration,
       None)
 
@@ -2237,7 +2238,8 @@ object JobManager {
       timeout,
       leaderElectionService,
       submittedJobGraphs,
-      checkpointRecoveryFactory)
+      checkpointRecoveryFactory,
+      savepointStore)
 
     val jobManager: ActorRef = jobMangerActorName match {
       case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)

http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index f50a0a0..73c7646 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -31,6 +31,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.SavepointStoreFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.instance.InstanceManager;
@@ -179,6 +181,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
                // We don't need recovery in this test
                SubmittedJobGraphStore submittedJobGraphStore = new StandaloneSubmittedJobGraphStore();
                CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
+               SavepointStore savepointStore = SavepointStoreFactory.createFromConfig(configuration);
 
                return Props.create(
                                TestingJobManager.class,
@@ -193,7 +196,8 @@ public class JobManagerLeaderElectionTest extends TestLogger {
                                AkkaUtils.getDefaultTimeout(),
                                leaderElectionService,
                                submittedJobGraphStore,
-                               checkpointRecoveryFactory
+                               checkpointRecoveryFactory,
+                               savepointStore
                );
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 22b0d29..cfb1192 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -102,7 +102,8 @@ class TestingCluster(
     archiveCount,
     leaderElectionService,
     submittedJobsGraphs,
-    checkpointRecoveryFactory) = JobManager.createJobManagerComponents(
+    checkpointRecoveryFactory,
+    savepointStore) = JobManager.createJobManagerComponents(
       config,
       createLeaderElectionService())
 
@@ -122,7 +123,8 @@ class TestingCluster(
         timeout,
         leaderElectionService,
         submittedJobsGraphs,
-        checkpointRecoveryFactory))
+        checkpointRecoveryFactory,
+        savepointStore))
 
     val dispatcherJobManagerProps = if (synchronousDispatcher) {
       // disable asynchronous futures (e.g. accumulator update in Heartbeat)

http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 0c0ca40..98d8863 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.testingUtils
 import akka.actor.ActorRef
 
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
@@ -48,7 +48,8 @@ class TestingJobManager(
     timeout: FiniteDuration,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
-    checkpointRecoveryFactory : CheckpointRecoveryFactory)
+    checkpointRecoveryFactory : CheckpointRecoveryFactory,
+    savepointStore : SavepointStore)
   extends JobManager(
     flinkConfiguration,
       executorService,
@@ -61,5 +62,6 @@ class TestingJobManager(
     timeout,
     leaderElectionService,
     submittedJobGraphs,
-    checkpointRecoveryFactory)
+    checkpointRecoveryFactory,
+    savepointStore)
   with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 679dc71..6057f65 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -329,7 +329,8 @@ object TestingUtils {
     archiveCount,
     leaderElectionService,
     submittedJobGraphs,
-    checkpointRecoveryFactory) = JobManager.createJobManagerComponents(
+    checkpointRecoveryFactory,
+    savepointStore) = JobManager.createJobManagerComponents(
       configuration,
       None
     )

http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 2d50407..c7cd205 100644
--- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ExecutorService
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
@@ -62,7 +62,8 @@ class TestingYarnJobManager(
     timeout: FiniteDuration,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
-    checkpointRecoveryFactory : CheckpointRecoveryFactory)
+    checkpointRecoveryFactory : CheckpointRecoveryFactory,
+    savepointStore: SavepointStore)
   extends YarnJobManager(
     flinkConfiguration,
     executorService,
@@ -75,7 +76,8 @@ class TestingYarnJobManager(
     timeout,
     leaderElectionService,
     submittedJobGraphs,
-    checkpointRecoveryFactory)
+    checkpointRecoveryFactory,
+    savepointStore)
   with TestingJobManagerLike {
 
   override val taskManagerRunnerClass = classOf[TestingYarnTaskManagerRunner]

http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 8dfa22d..ec1fb81 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -32,7 +32,7 @@ import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
@@ -91,7 +91,8 @@ class YarnJobManager(
     timeout: FiniteDuration,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
-    checkpointRecoveryFactory : CheckpointRecoveryFactory)
+    checkpointRecoveryFactory : CheckpointRecoveryFactory,
+    savepointStore: SavepointStore)
   extends JobManager(
     flinkConfiguration,
     executorService,
@@ -104,7 +105,8 @@ class YarnJobManager(
     timeout,
     leaderElectionService,
     submittedJobGraphs,
-    checkpointRecoveryFactory) {
+    checkpointRecoveryFactory,
+    savepointStore) {
 
   import context._
   import scala.collection.JavaConverters._

Reply via email to