[ 
https://issues.apache.org/jira/browse/FLINK-2472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709440#comment-14709440
 ] 

ASF GitHub Bot commented on FLINK-2472:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r37763050
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -49,27 +57,127 @@
        // Actor which submits a job to the JobManager via this actor
        private ActorRef submitter;
     
    +   // timeout for a message from the job manager
    +   private static FiniteDuration JOB_CLIENT_JOB_MANAGER_TIMEOUT;
    +
    +   // heartbeat interval for pinging the job manager for job status
    +   private static FiniteDuration JOB_CLIENT_HEARTBEAT_INTERVAL;
    +
    +   // initial time delay before starting pinging job manager over regular intervals
    +   private static FiniteDuration JOB_CLIENT_INITIAL_PING_DELAY;
    +
    +   // maximum waiting time for a job to go to running status (milliseconds)
    +   private static long JOB_CLIENT_JOB_STATUS_TIMEOUT;
    +
    +   // time at which the current job was created
    +   private long currentJobCreatedAt;
    +
    +   // current job id
    +   private JobID currentJobId;
    +
    +   // scheduler to ping JobManager after a time interval
    +   private Cancellable scheduler;
    +
    +   // maintain when we got our last ping from the Job Manager.
    +   private long jobManagerPinged = 0;
    +
    +   // maintain the last time we got a terminal state message
    +   private long terminalStateAt = 0;
    +
        public JobClientActor(
                        ActorRef jobManager,
                        Logger logger,
                        boolean sysoutUpdates,
    -                   Option<UUID> leaderSessionID) {
    +                   Option<UUID> leaderSessionID,
    +                   Configuration config) {
                this.jobManager = Preconditions.checkNotNull(jobManager, "The JobManager ActorRef must not be null.");
                this.logger = Preconditions.checkNotNull(logger, "The logger must not be null.");
                this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
     
                this.sysoutUpdates = sysoutUpdates;
    +           // set this to 0 to indicate the job hasn't been created yet.
    +           this.currentJobCreatedAt = 0;
    +           this.terminalStateAt = 0;
    +           parseTimes(config);
        }
        
        @Override
        protected void handleMessage(Object message) {
    -           
    +
    +           // first see if the message was from the Job Manager
    +           if(getContext().sender() == jobManager){
    +                   this.jobManagerPinged = System.currentTimeMillis();
    +           }
    +
    +           // ======= Job status messages on regular intervals ==============
    +           if(message instanceof JobManagerMessages.CurrentJobStatus){
    +                   JobStatus statusReport = ((JobManagerMessages.CurrentJobStatus) message).status();
    +                   long timeDiff;
    +                   switch(statusReport){
    +                           case RUNNING:
    +                                   // Vincent, we happy?
    +                                   resetTimeouts();
    +                                   break;
    +                           case FINISHED:
    +                                   // Yeah! We happy!
    +                                   resetTimeouts();
    +                                   break;
    +                           case CREATED:
    +                                   // we're still at Job CREATED. Let's see if we're over the limit.
    +                                   timeDiff = (System.currentTimeMillis() - this.currentJobCreatedAt);
    +                                   if(timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT){
    +                                           failWithTimeout(timeDiff);
    +                                   } // otherwise just wait a bit longer.
    +                                   break;
    +                           case RESTARTING:
    +                                   if(this.currentJobCreatedAt == 0){
    --- End diff --
    
    Even though we have not formally defined this as a rule, the rest of the 
code inserts a space after keywords such as `if` and `else`.
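
    To illustrate the spacing convention mentioned above, here is a minimal 
sketch of the `CREATED` timeout check written with a space after `if` and 
`else`. The class and method names are hypothetical and are not part of the 
pull request; only the formatting is the point.

```java
// Sketch only: StatusTimeoutSketch and describe() are hypothetical names,
// not code from the pull request.
final class StatusTimeoutSketch {

    /**
     * Describes whether a job that is still in CREATED has exceeded the
     * allowed waiting time. All arguments are in milliseconds.
     */
    static String describe(long createdAtMillis, long nowMillis, long timeoutMillis) {
        long timeDiff = nowMillis - createdAtMillis;
        // Note the space after `if` and `else`, and before each opening brace.
        if (timeDiff > timeoutMillis) {
            return "timed out after " + timeDiff + " ms";
        } else {
            return "still waiting";
        }
    }
}
```

    With this style, `if(timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT){` in the 
diff would become `if (timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT) {`.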


> Make the JobClientActor check periodically if the submitted Job is still 
> running and if the JobManager is still alive
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-2472
>                 URL: https://issues.apache.org/jira/browse/FLINK-2472
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Till Rohrmann
>            Assignee: Sachin Goel
>
> If the {{JobManager}} dies without notifying connected 
> {{JobClientActors}}, or if the job execution finishes without sending the 
> {{SerializedJobExecutionResult}} back to the {{JobClientActor}}, a call to 
> {{JobClient.submitJobAndWait}} may never return.
> I propose to let the {{JobClientActor}} periodically check whether the 
> {{JobManager}} is still alive and whether the submitted job is still running. 
> If not, the {{JobClientActor}} should return an exception to complete 
> the waiting future.
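
The periodic liveness check proposed in the issue could look roughly like the 
sketch below. It is not the implementation from the pull request: the class 
`LivenessSketch`, its methods, and the explicit `nowMillis` parameters are 
assumptions made for illustration. The idea is to record the time of the last 
message from the JobManager and, on every periodic tick, compare the elapsed 
silence against a timeout.

```java
// Sketch only: LivenessSketch and all of its members are hypothetical names.
final class LivenessSketch {

    private final long timeoutMillis;
    private long lastHeardFromJobManager;

    LivenessSketch(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        // Treat the creation time as the first sign of life.
        this.lastHeardFromJobManager = nowMillis;
    }

    /** Call whenever any message from the JobManager arrives. */
    void onJobManagerMessage(long nowMillis) {
        this.lastHeardFromJobManager = nowMillis;
    }

    /** Call on every periodic tick; false means the JobManager has been silent too long. */
    boolean isJobManagerAlive(long nowMillis) {
        long silentFor = nowMillis - lastHeardFromJobManager;
        return silentFor <= timeoutMillis;
    }
}
```

When `isJobManagerAlive` returns false, the caller would fail the waiting 
future with an exception, as the issue suggests. Passing the current time in 
as a parameter, rather than reading the system clock inside the class, is a 
design choice of this sketch that keeps the check easy to test.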



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
