tillrohrmann commented on a change in pull request #14431:
URL: https://github.com/apache/flink/pull/14431#discussion_r546659238



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -797,52 +796,75 @@ private void startJobExecution() throws Exception {
        }
 
        private void startJobMasterServices() throws Exception {
-               // start the slot pool make sure the slot pool now accepts 
messages for this leader
-               slotPool.start(getFencingToken(), getAddress(), 
getMainThreadExecutor());
-
-               //TODO: Remove once the ZooKeeperLeaderRetrieval returns the 
stored address upon start
-               // try to reconnect to previously known leader
-               reconnectToResourceManager(new FlinkException("Starting 
JobMaster component."));
-
-               // job is ready to go, try to establish connection with 
resource manager
-               //   - activate leader retrieval for the resource manager
-               //   - on notification of the leader, the connection will be 
established and
-               //     the slot pool will start requesting slots
-               resourceManagerLeaderRetriever.start(new 
ResourceManagerLeaderListener());
+               try {
+                       // start the slot pool make sure the slot pool now 
accepts messages for this leader
+                       slotPool.start(getFencingToken(), getAddress(), 
getMainThreadExecutor());
+
+                       // job is ready to go, try to establish connection with 
resource manager
+                       //   - activate leader retrieval for the resource 
manager
+                       //   - on notification of the leader, the connection 
will be established and
+                       //     the slot pool will start requesting slots
+                       resourceManagerLeaderRetriever.start(new 
ResourceManagerLeaderListener());
+               } catch (Exception e) {
+                       handleStartJobMasterServicesError(e);
+               }
        }
 
-       /**
-        * Suspending job, all the running tasks will be cancelled, and 
communication with other components
-        * will be disposed.
-        *
-        * @param cause The reason of why this job been suspended.
-        */
-       private void suspendExecution(final Exception cause) {
-               validateRunsInMainThread();
+       private void handleStartJobMasterServicesError(Exception e) throws 
Exception {
+               try {
+                       stopJobMasterServices();
+               } catch (Exception inner) {
+                       e.addSuppressed(inner);
+               }
+
+               throw e;
+       }
+
+       private void stopJobMasterServices() throws Exception {
+               Exception resultingException = null;
 
                try {
                        resourceManagerLeaderRetriever.stop();
-                       resourceManagerAddress = null;
-               } catch (Throwable t) {
-                       log.warn("Failed to stop resource manager leader 
retriever when suspending.", t);
+               } catch (Exception e) {
+                       resultingException = e;
                }
 
-               suspendScheduler(cause);
+               // TODO: Distinguish between job termination which should free 
all slots and a loss of leadership which should keep the slots
+               slotPool.close();
+
+               stopHeartbeatServices();
+
+               ExceptionUtils.tryRethrowException(resultingException);
+       }
+
+       private void stopJobExecution(final Exception cause) throws Exception {
+               validateRunsInMainThread();
+
+               Exception resultingException = null;

Review comment:
       yes will do.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to