Repository: flink
Updated Branches:
  refs/heads/master 04aee61d8 -> fcd264a70


[FLINK-5971] [flip-6] Add timeout for registered jobs on the ResourceManager

This PR introduces a timeout for inactive jobs on the ResourceManager. A job is inactive
if no active leader is known for it. If a job times out, it is removed from the
ResourceManager. Additionally, this PR removes the dependency of the JobLeaderIdService
on the RunningJobsRegistry.

Fix YarnFlinkApplicationMasterRunner to pass the correct arguments to the JobLeaderIdService

Fix race condition in JobLeaderIdListener#cancelTimeout

This closes #3488.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcd264a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcd264a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcd264a7

Branch: refs/heads/master
Commit: fcd264a707d3dd8ef4247825752c8639732c943c
Parents: 04aee61
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Mar 6 16:57:43 2017 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Mar 13 15:03:18 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/configuration/AkkaOptions.java |   7 +
 .../configuration/ResourceManagerOptions.java   |  40 +++
 .../resourcemanager/JobLeaderIdActions.java     |   8 +-
 .../resourcemanager/JobLeaderIdService.java     | 119 +++++---
 .../resourcemanager/ResourceManager.java        |   6 +-
 .../ResourceManagerConfiguration.java           |  48 ++--
 .../resourcemanager/ResourceManagerRunner.java  |   5 +-
 .../resourcemanager/JobLeaderIdServiceTest.java | 269 +++++++++++++++++++
 .../resourcemanager/ResourceManagerHATest.java  |  10 +-
 .../ResourceManagerJobMasterTest.java           |  10 +-
 .../ResourceManagerTaskExecutorTest.java        |  10 +-
 .../slotmanager/SlotProtocolTest.java           |  21 +-
 .../taskexecutor/TaskExecutorITCase.java        |  10 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  |   5 +-
 14 files changed, 498 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index 7e4c2b7..97b209e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -29,6 +29,13 @@ import org.apache.flink.annotation.PublicEvolving;
 public class AkkaOptions {
 
        /**
+        * Timeout for akka ask calls
+        */
+       public static final ConfigOption<String> AKKA_ASK_TIMEOUT = 
ConfigOptions
+               .key("akka.ask.timeout")
+               .defaultValue("10 s");
+
+       /**
         * The Akka tcp connection timeout.
         */
        public static final ConfigOption<String> AKKA_TCP_TIMEOUT = 
ConfigOptions

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
new file mode 100644
index 0000000..6a09f19
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The set of configuration options relating to the ResourceManager
+ */
+@PublicEvolving
+public class ResourceManagerOptions {
+
+       /**
+        * Timeout for jobs which don't have a job manager as leader assigned.
+        */
+       public static final ConfigOption<String> JOB_TIMEOUT = ConfigOptions
+               .key("resourcemanager.job.timeout")
+               .defaultValue("5 minutes");
+
+       // 
---------------------------------------------------------------------------------------------
+
+       /** Not intended to be instantiated */
+       private ResourceManagerOptions() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
index 58777ef..4ca6209 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
@@ -36,11 +36,13 @@ public interface JobLeaderIdActions {
        void jobLeaderLostLeadership(JobID jobId, UUID oldJobLeaderId);
 
        /**
-        * Request to remove the job from the {@link JobLeaderIdService}.
+        * Notify a job timeout. The job is identified by the given JobID. In 
order to check
+        * for the validity of the timeout the timeout id of the triggered 
timeout is provided.
         *
-        * @param jobId identifying the job to remove
+        * @param jobId JobID which identifies the timed out job
+        * @param timeoutId Id of the calling timeout to differentiate valid 
from invalid timeouts
         */
-       void removeJob(JobID jobId);
+       void notifyJobTimeout(JobID jobId, UUID timeoutId);
 
        /**
         * Callback to report occurring errors.

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 7ef39de..8bffcd0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
-import 
org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.util.ExceptionUtils;
@@ -32,11 +32,14 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
+import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Service which retrieves for a registered job the current job leader id (the 
leader id of the
@@ -51,8 +54,9 @@ public class JobLeaderIdService {
        /** High availability services to use by this service */
        private final HighAvailabilityServices highAvailabilityServices;
 
-       /** Registry to retrieve running jobs */
-       private final RunningJobsRegistry runningJobsRegistry;
+       private final ScheduledExecutor scheduledExecutor;
+
+       private final Time jobTimeout;
 
        /** Map of currently monitored jobs */
        private final Map<JobID, JobLeaderIdListener> jobLeaderIdListeners;
@@ -60,10 +64,13 @@ public class JobLeaderIdService {
        /** Actions to call when the job leader changes */
        private JobLeaderIdActions jobLeaderIdActions;
 
-       public JobLeaderIdService(HighAvailabilityServices 
highAvailabilityServices) throws Exception {
-               this.highAvailabilityServices = 
Preconditions.checkNotNull(highAvailabilityServices);
-
-               this.runningJobsRegistry = 
highAvailabilityServices.getRunningJobsRegistry();
+       public JobLeaderIdService(
+                       HighAvailabilityServices highAvailabilityServices,
+                       ScheduledExecutor scheduledExecutor,
+                       Time jobTimeout) throws Exception {
+               this.highAvailabilityServices = 
Preconditions.checkNotNull(highAvailabilityServices, 
"highAvailabilityServices");
+               this.scheduledExecutor = 
Preconditions.checkNotNull(scheduledExecutor, "scheduledExecutor");
+               this.jobTimeout = Preconditions.checkNotNull(jobTimeout, 
"jobTimeout");
 
                jobLeaderIdListeners = new HashMap<>(4);
 
@@ -142,8 +149,8 @@ public class JobLeaderIdService {
                if (!jobLeaderIdListeners.containsKey(jobId)) {
                        LeaderRetrievalService leaderRetrievalService = 
highAvailabilityServices.getJobManagerLeaderRetriever(jobId);
 
-                       JobLeaderIdListener jobidListener = new 
JobLeaderIdListener(jobId, jobLeaderIdActions, leaderRetrievalService);
-                       jobLeaderIdListeners.put(jobId, jobidListener);
+                       JobLeaderIdListener jobIdListener = new 
JobLeaderIdListener(jobId, jobLeaderIdActions, leaderRetrievalService);
+                       jobLeaderIdListeners.put(jobId, jobIdListener);
                }
        }
 
@@ -183,6 +190,16 @@ public class JobLeaderIdService {
                return listener.getLeaderIdFuture();
        }
 
+       public boolean isValidTimeout(JobID jobId, UUID timeoutId) {
+               JobLeaderIdListener jobLeaderIdListener = 
jobLeaderIdListeners.get(jobId);
+
+               if (null != jobLeaderIdListener) {
+                       return Objects.equals(timeoutId, 
jobLeaderIdListener.getTimeoutId());
+               } else {
+                       return false;
+               }
+       }
+
        // 
--------------------------------------------------------------------------------
        // Static utility classes
        // 
--------------------------------------------------------------------------------
@@ -193,6 +210,7 @@ public class JobLeaderIdService {
         * listener.
         */
        private final class JobLeaderIdListener implements 
LeaderRetrievalListener {
+               private final Object timeoutLock = new Object();
                private final JobID jobId;
                private final JobLeaderIdActions listenerJobLeaderIdActions;
                private final LeaderRetrievalService leaderRetrievalService;
@@ -200,6 +218,15 @@ public class JobLeaderIdService {
                private volatile CompletableFuture<UUID> leaderIdFuture;
                private volatile boolean running = true;
 
+               /** Null if no timeout has been scheduled; otherwise non null */
+               @Nullable
+               private  volatile ScheduledFuture<?> timeoutFuture;
+
+               /** Null if no timeout has been scheduled; otherwise non null */
+               @Nullable
+               private volatile UUID timeoutId;
+
+
                private JobLeaderIdListener(
                                JobID jobId,
                                JobLeaderIdActions listenerJobLeaderIdActions,
@@ -210,6 +237,8 @@ public class JobLeaderIdService {
 
                        leaderIdFuture = new FlinkCompletableFuture<>();
 
+                       activateTimeout();
+
                        // start the leader service we're listening to
                        leaderRetrievalService.start(this);
                }
@@ -218,9 +247,15 @@ public class JobLeaderIdService {
                        return leaderIdFuture;
                }
 
+               @Nullable
+               public UUID getTimeoutId() {
+                       return timeoutId;
+               }
+
                public void stop() throws Exception {
                        running = false;
                        leaderRetrievalService.stop();
+                       cancelTimeout();
                        leaderIdFuture.completeExceptionally(new Exception("Job 
leader id service has been stopped."));
                }
 
@@ -244,29 +279,22 @@ public class JobLeaderIdService {
                                        
leaderIdFuture.complete(leaderSessionId);
                                }
 
-                               try {
-                                       final JobSchedulingStatus jobStatus = 
runningJobsRegistry.getJobSchedulingStatus(jobId);
-                                       if (jobStatus == 
JobSchedulingStatus.PENDING || jobStatus == JobSchedulingStatus.RUNNING) {
-                                               if (leaderSessionId == null) {
-                                                       // there is no new 
leader
-                                                       if (previousJobLeaderId 
!= null) {
-                                                               // we had a 
previous job leader, so notify about his lost leadership
-                                                               
listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId);
-                                                       }
-                                               } else {
-                                                       if (previousJobLeaderId 
!= null && !leaderSessionId.equals(previousJobLeaderId)) {
-                                                               // we had a 
previous leader and he's not the same as the new leader
-                                                               
listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId);
-                                                       }
+                               if (previousJobLeaderId != null && 
!previousJobLeaderId.equals(leaderSessionId)) {
+                                       // we had a previous job leader, so 
notify about his lost leadership
+                                       
listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId);
+
+                                       if (null == leaderSessionId) {
+                                               // No current leader active ==> 
Set a timeout for the job
+                                               activateTimeout();
+
+                                               // check if we got stopped 
asynchronously
+                                               if (!running) {
+                                                       cancelTimeout();
                                                }
-                                       } else {
-                                               // the job is no longer running 
so remove it
-                                               
listenerJobLeaderIdActions.removeJob(jobId);
                                        }
-                               } catch (IOException e) {
-                                       // cannot tell whether the job is still 
running or not so just remove the listener
-                                       LOG.debug("Encountered an error while 
checking the job registry for running jobs.", e);
-                                       
listenerJobLeaderIdActions.removeJob(jobId);
+                               } else if (null != leaderSessionId) {
+                                       // Cancel timeout because we've found 
an active leader for it
+                                       cancelTimeout();
                                }
                        } else {
                                LOG.debug("A leader id change {}@{} has been 
detected after the listener has been stopped.",
@@ -283,5 +311,32 @@ public class JobLeaderIdService {
                                        
JobLeaderIdListener.class.getSimpleName(), exception);
                        }
                }
+
+               private void activateTimeout() {
+                       synchronized (timeoutLock) {
+                               cancelTimeout();
+
+                               final UUID newTimeoutId = UUID.randomUUID();
+
+                               timeoutId = newTimeoutId;
+                               timeoutFuture = scheduledExecutor.schedule(new 
Runnable() {
+                                       @Override
+                                       public void run() {
+                                               
listenerJobLeaderIdActions.notifyJobTimeout(jobId, newTimeoutId);
+                                       }
+                               }, jobTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+                       }
+               }
+
+               private void cancelTimeout() {
+                       synchronized (timeoutLock) {
+                               if (timeoutFuture != null) {
+                                       timeoutFuture.cancel(true);
+                               }
+
+                               timeoutFuture = null;
+                               timeoutId = null;
+                       }
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3bcbfda..badfbe2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -817,11 +817,13 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                }
 
                @Override
-               public void removeJob(final JobID jobId) {
+               public void notifyJobTimeout(final JobID jobId, final UUID 
timeoutId) {
                        runAsync(new Runnable() {
                                @Override
                                public void run() {
-                                       ResourceManager.this.removeJob(jobId);
+                                       if 
(jobLeaderIdService.isValidTimeout(jobId, timeoutId)) {
+                                               removeJob(jobId);
+                                       }
                                }
                        });
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
index 920f1fc..d04d852 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
@@ -19,10 +19,9 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.Duration;
@@ -34,10 +33,15 @@ public class ResourceManagerConfiguration {
 
        private final Time timeout;
        private final Time heartbeatInterval;
+       private final Time jobTimeout;
 
-       public ResourceManagerConfiguration(Time timeout, Time 
heartbeatInterval) {
-               this.timeout = Preconditions.checkNotNull(timeout);
-               this.heartbeatInterval = 
Preconditions.checkNotNull(heartbeatInterval);
+       public ResourceManagerConfiguration(
+                       Time timeout,
+                       Time heartbeatInterval,
+                       Time jobTimeout) {
+               this.timeout = Preconditions.checkNotNull(timeout, "timeout");
+               this.heartbeatInterval = 
Preconditions.checkNotNull(heartbeatInterval, "heartbeatInterval");
+               this.jobTimeout = Preconditions.checkNotNull(jobTimeout, 
"jobTimeout");
        }
 
        public Time getTimeout() {
@@ -48,39 +52,45 @@ public class ResourceManagerConfiguration {
                return heartbeatInterval;
        }
 
+       public Time getJobTimeout() {
+               return jobTimeout;
+       }
+
        // 
--------------------------------------------------------------------------
        // Static factory methods
        // 
--------------------------------------------------------------------------
 
        public static ResourceManagerConfiguration 
fromConfiguration(Configuration configuration) throws ConfigurationException {
-               ConfigOption<String> timeoutOption = ConfigOptions
-                       .key(ConfigConstants.AKKA_ASK_TIMEOUT)
-                       .defaultValue(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
-
-               final String strTimeout = 
configuration.getString(timeoutOption);
+               final String strTimeout = 
configuration.getString(AkkaOptions.AKKA_ASK_TIMEOUT);
                final Time timeout;
 
                try {
                        timeout = 
Time.milliseconds(Duration.apply(strTimeout).toMillis());
                } catch (NumberFormatException e) {
                        throw new ConfigurationException("Could not parse the 
resource manager's timeout " +
-                               "value " + timeoutOption + '.', e);
+                               "value " + AkkaOptions.AKKA_ASK_TIMEOUT + '.', 
e);
                }
 
-               ConfigOption<String> heartbeatIntervalOption = ConfigOptions
-                       .key(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL)
-                       .defaultValue(timeout.toString());
-
-               final String strHeartbeatInterval = 
configuration.getString(heartbeatIntervalOption);
+               final String strHeartbeatInterval = 
configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL);
                final Time heartbeatInterval;
 
                try {
                        heartbeatInterval = 
Time.milliseconds(Duration.apply(strHeartbeatInterval).toMillis());
                } catch (NumberFormatException e) {
                        throw new ConfigurationException("Could not parse the 
resource manager's heartbeat interval " +
-                               "value " + timeoutOption + '.', e);
+                               "value " + 
AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL + '.', e);
+               }
+
+               final String strJobTimeout = 
configuration.getString(ResourceManagerOptions.JOB_TIMEOUT);
+               final Time jobTimeout;
+
+               try {
+                       jobTimeout = 
Time.milliseconds(Duration.apply(strJobTimeout).toMillis());
+               } catch (NumberFormatException e) {
+                       throw new ConfigurationException("Could not parse the 
resource manager's job timeout " +
+                               "value " + ResourceManagerOptions.JOB_TIMEOUT + 
'.', e);
                }
 
-               return new ResourceManagerConfiguration(timeout, 
heartbeatInterval);
+               return new ResourceManagerConfiguration(timeout, 
heartbeatInterval, jobTimeout);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index e0dee0b..749b407 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -54,7 +54,10 @@ public class ResourceManagerRunner implements 
FatalErrorHandler {
 
                final ResourceManagerConfiguration resourceManagerConfiguration 
= ResourceManagerConfiguration.fromConfiguration(configuration);
                final SlotManagerFactory slotManagerFactory = new 
DefaultSlotManager.Factory();
-               final JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(highAvailabilityServices);
+               final JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(
+                       highAvailabilityServices,
+                       rpcService.getScheduledExecutor(),
+                       resourceManagerConfiguration.getJobTimeout());
 
                this.resourceManager = new StandaloneResourceManager(
                        rpcService,

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
new file mode 100644
index 0000000..d5e99bd
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/** Tests for the {@link JobLeaderIdService}, in particular its job leader id and job timeout handling. */
+public class JobLeaderIdServiceTest extends TestLogger {
+
+       /**
+        * Tests adding a job and finding out its leader id.
+        */
+       @Test(timeout = 10000)
+       public void testAddingJob() throws Exception {
+               final JobID jobId = new JobID();
+               final String address = "foobar";
+               final UUID leaderId = UUID.randomUUID();
+               TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+               TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+
+               highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
+
+               ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+               Time timeout = Time.milliseconds(5000L);
+               JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class);
+
+               JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+                       highAvailabilityServices,
+                       scheduledExecutor,
+                       timeout);
+
+               jobLeaderIdService.start(jobLeaderIdActions);
+
+               jobLeaderIdService.addJob(jobId);
+
+               Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+
+               // notify the leader id service about the new leader
+               leaderRetrievalService.notifyListener(address, leaderId);
+
+               assertEquals(leaderId, leaderIdFuture.get());
+
+               assertTrue(jobLeaderIdService.containsJob(jobId));
+       }
+
+       /**
+        * Tests that removing a job completes the job leader id future exceptionally.
+        */
+       @Test(timeout = 10000)
+       public void testRemovingJob() throws Exception {
+               final JobID jobId = new JobID();
+               TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+               TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+
+               highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
+
+               ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+               Time timeout = Time.milliseconds(5000L);
+               JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class);
+
+               JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+                       highAvailabilityServices,
+                       scheduledExecutor,
+                       timeout);
+
+               jobLeaderIdService.start(jobLeaderIdActions);
+
+               jobLeaderIdService.addJob(jobId);
+
+               Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+
+               // remove the job before we could find a leader
+               jobLeaderIdService.removeJob(jobId);
+
+               assertFalse(jobLeaderIdService.containsJob(jobId));
+
+               try {
+                       leaderIdFuture.get();
+
+                       fail("The leader id future should be completed exceptionally.");
+               } catch (ExecutionException ignored) {
+                       // expected exception
+               }
+       }
+
+       /**
+        * Tests that the initial job registration registers a timeout which will call
+        * {@link JobLeaderIdActions#notifyJobTimeout(JobID, UUID)} with a valid timeout id when executed.
+        */
+       @Test(timeout = 10000)
+       public void testInitialJobTimeout() throws Exception {
+               final JobID jobId = new JobID();
+               TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+               TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+
+               highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
+
+               ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+               Time timeout = Time.milliseconds(5000L);
+               JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class);
+
+               JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+                       highAvailabilityServices,
+                       scheduledExecutor,
+                       timeout);
+
+               jobLeaderIdService.start(jobLeaderIdActions);
+
+               jobLeaderIdService.addJob(jobId);
+
+               assertTrue(jobLeaderIdService.containsJob(jobId));
+
+               ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
+               verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), anyLong(), any(TimeUnit.class));
+
+               Runnable timeoutRunnable = runnableArgumentCaptor.getValue();
+               timeoutRunnable.run();
+
+               ArgumentCaptor<UUID> timeoutIdArgumentCaptor = ArgumentCaptor.forClass(UUID.class);
+
+               verify(jobLeaderIdActions, times(1)).notifyJobTimeout(eq(jobId), timeoutIdArgumentCaptor.capture());
+
+               assertTrue(jobLeaderIdService.isValidTimeout(jobId, timeoutIdArgumentCaptor.getValue()));
+       }
+
+       /**
+        * Tests that a timeout gets cancelled once a job leader has been found. Furthermore, it tests
+        * that a new timeout is registered after the job manager has lost leadership.
+        */
+       @Test(timeout = 10000)
+       public void testJobTimeoutAfterLostLeadership() throws Exception {
+               final JobID jobId = new JobID();
+               final String address = "foobar";
+               final UUID leaderId = UUID.randomUUID();
+               TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+               TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+
+               highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
+
+               ScheduledFuture<?> timeout1 = mock(ScheduledFuture.class);
+               ScheduledFuture<?> timeout2 = mock(ScheduledFuture.class);
+               final Queue<ScheduledFuture<?>> timeoutQueue = new ArrayDeque<>(Arrays.asList(timeout1, timeout2));
+               ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+
+               final AtomicReference<Runnable> lastRunnable = new AtomicReference<>();
+               doAnswer(new Answer<ScheduledFuture<?>>() {
+                       @Override
+                       public ScheduledFuture<?> answer(InvocationOnMock invocation) throws Throwable {
+                               lastRunnable.set((Runnable) invocation.getArguments()[0]);
+
+                               return timeoutQueue.poll();
+                       }
+               }).when(scheduledExecutor).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+
+               Time timeout = Time.milliseconds(5000L);
+               JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class);
+
+               final AtomicReference<UUID> lastTimeoutId = new AtomicReference<>();
+
+               doAnswer(new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) throws Throwable {
+                               lastTimeoutId.set((UUID) invocation.getArguments()[1]);
+                               return null;
+                       }
+               }).when(jobLeaderIdActions).notifyJobTimeout(eq(jobId), any(UUID.class));
+
+               JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+                       highAvailabilityServices,
+                       scheduledExecutor,
+                       timeout);
+
+               jobLeaderIdService.start(jobLeaderIdActions);
+
+               jobLeaderIdService.addJob(jobId);
+
+               Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+
+               // notify the leader id service about the new leader
+               leaderRetrievalService.notifyListener(address, leaderId);
+
+               assertEquals(leaderId, leaderIdFuture.get());
+
+               assertTrue(jobLeaderIdService.containsJob(jobId));
+
+               // check that the first timeout got cancelled
+               verify(timeout1, times(1)).cancel(anyBoolean());
+
+               verify(scheduledExecutor, times(1)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+
+               // run the cancelled initial timeout: it still notifies, but with a no longer valid timeout id
+               Runnable runnable = lastRunnable.get();
+
+               assertNotNull(runnable);
+
+               runnable.run();
+
+               verify(jobLeaderIdActions, times(1)).notifyJobTimeout(eq(jobId), any(UUID.class));
+
+               // the timeout should no longer be valid
+               assertFalse(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get()));
+
+               // lose leadership
+               leaderRetrievalService.notifyListener("", null);
+
+               verify(scheduledExecutor, times(2)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+
+               // the second runnable should be the new timeout
+               runnable = lastRunnable.get();
+
+               assertNotNull(runnable);
+
+               runnable.run();
+
+               verify(jobLeaderIdActions, times(2)).notifyJobTimeout(eq(jobId), any(UUID.class));
+
+               // the new timeout should be valid
+               assertTrue(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get()));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 2e52eeb..58dedc3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -46,10 +46,16 @@ public class ResourceManagerHATest {
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
                
highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
 
-               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
+                       Time.seconds(5L),
+                       Time.seconds(5L),
+                       Time.minutes(5L));
                SlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
                MetricRegistry metricRegistry = mock(MetricRegistry.class);
-               JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(highAvailabilityServices);
+               JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+                       highAvailabilityServices,
+                       rpcService.getScheduledExecutor(),
+                       resourceManagerConfiguration.getJobTimeout());
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
 
                final ResourceManager resourceManager =

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 2622634..031f76e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -193,10 +193,16 @@ public class ResourceManagerJobMasterTest {
                
highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
                highAvailabilityServices.setJobMasterLeaderRetriever(jobID, 
jobMasterLeaderRetrievalService);
 
-               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
+                       Time.seconds(5L),
+                       Time.seconds(5L),
+                       Time.minutes(5L));
                SlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
                MetricRegistry metricRegistry = mock(MetricRegistry.class);
-               JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(highAvailabilityServices);
+               JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+                       highAvailabilityServices,
+                       rpcService.getScheduledExecutor(),
+                       resourceManagerConfiguration.getJobTimeout());
 
                ResourceManager resourceManager = new StandaloneResourceManager(
                        rpcService,

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 1016181..4456235 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -146,9 +146,15 @@ public class ResourceManagerTaskExecutorTest {
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
                
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
                TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
-               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
+                       Time.seconds(5L),
+                       Time.seconds(5L),
+                       Time.minutes(5L));
                MetricRegistry metricRegistry = mock(MetricRegistry.class);
-               JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(highAvailabilityServices);
+               JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+                       highAvailabilityServices,
+                       rpcService.getScheduledExecutor(),
+                       resourceManagerConfiguration.getJobTimeout());
 
 
                StandaloneResourceManager resourceManager =

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index a3ba436..1e5edbe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -107,8 +107,15 @@ public class SlotProtocolTest extends TestLogger {
                TestingLeaderElectionService rmLeaderElectionService =
                        configureHA(testingHaServices, jobID, rmAddress, 
rmLeaderID, jmAddress, jmLeaderID);
 
-               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
-               JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(testingHaServices);
+               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
+                       Time.seconds(5L),
+                       Time.seconds(5L),
+                       Time.minutes(5L));
+
+               JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+                       testingHaServices,
+                       testRpcService.getScheduledExecutor(),
+                       resourceManagerConfiguration.getJobTimeout());
 
                final TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
                SpiedResourceManager resourceManager =
@@ -208,9 +215,15 @@ public class SlotProtocolTest extends TestLogger {
                        .thenReturn(new 
FlinkCompletableFuture<TMSlotRequestReply>());
                testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
-               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
+                       Time.seconds(5L),
+                       Time.seconds(5L),
+                       Time.minutes(5L));
 
-               JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(testingHaServices);
+               JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+                       testingHaServices,
+                       testRpcService.getScheduledExecutor(),
+                       resourceManagerConfiguration.getJobTimeout());
 
                TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
                ResourceManager<ResourceID> resourceManager =

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 0f884f2..898584c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -93,9 +93,15 @@ public class TaskExecutorITCase {
                testingHAServices.setJobMasterLeaderRetriever(jobId, new 
TestingLeaderRetrievalService(jmAddress, jmLeaderId));
 
                TestingSerialRpcService rpcService = new 
TestingSerialRpcService();
-               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(Time.milliseconds(500L), Time.milliseconds(500L));
+               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
+                       Time.milliseconds(500L),
+                       Time.milliseconds(500L),
+                       Time.minutes(5L));
                SlotManagerFactory slotManagerFactory = new 
DefaultSlotManager.Factory();
-               JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(testingHAServices);
+               JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+                       testingHAServices,
+                       rpcService.getScheduledExecutor(),
+                       resourceManagerConfiguration.getJobTimeout());
                MetricRegistry metricRegistry = mock(MetricRegistry.class);
 
                final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index e2aa6ec..ddeb02e 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -187,7 +187,10 @@ public class YarnFlinkApplicationMasterRunner extends 
AbstractYarnFlinkApplicati
        private ResourceManager<?> createResourceManager(Configuration config) 
throws Exception {
                final ResourceManagerConfiguration resourceManagerConfiguration 
= ResourceManagerConfiguration.fromConfiguration(config);
                final SlotManagerFactory slotManagerFactory = new 
DefaultSlotManager.Factory();
-               final JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(haServices);
+               final JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(
+                       haServices,
+                       commonRpcService.getScheduledExecutor(),
+                       resourceManagerConfiguration.getJobTimeout());
 
                return new YarnResourceManager(config,
                                ENV,

Reply via email to