[ https://issues.apache.org/jira/browse/FLINK-2472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709516#comment-14709516 ]
ASF GitHub Bot commented on FLINK-2472:
---------------------------------------
Github user sachingoel0101 commented on a diff in the pull request:
https://github.com/apache/flink/pull/979#discussion_r37768306
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
@@ -49,27 +57,127 @@
// Actor which submits a job to the JobManager via this actor
private ActorRef submitter;
+ // timeout for a message from the job manager
+ private static FiniteDuration JOB_CLIENT_JOB_MANAGER_TIMEOUT;
+
+ // heartbeat interval for pinging the job manager for job status
+ private static FiniteDuration JOB_CLIENT_HEARTBEAT_INTERVAL;
+
+ // initial time delay before starting pinging job manager over regular intervals
+ private static FiniteDuration JOB_CLIENT_INITIAL_PING_DELAY;
+
+ // maximum waiting time for a job to go to running status (milliseconds)
+ private static long JOB_CLIENT_JOB_STATUS_TIMEOUT;
+
+ // time at which the current job was created
+ private long currentJobCreatedAt;
+
+ // current job id
+ private JobID currentJobId;
+
+ // scheduler to ping JobManager after a time interval
+ private Cancellable scheduler;
+
+ // maintain when we got our last ping from the Job Manager.
+ private long jobManagerPinged = 0;
+
+ // maintain the last time we got a terminal state message
+ private long terminalStateAt = 0;
+
public JobClientActor(
ActorRef jobManager,
Logger logger,
boolean sysoutUpdates,
- Option<UUID> leaderSessionID) {
+ Option<UUID> leaderSessionID,
+ Configuration config) {
this.jobManager = Preconditions.checkNotNull(jobManager, "The JobManager ActorRef must not be null.");
this.logger = Preconditions.checkNotNull(logger, "The logger must not be null.");
this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
this.sysoutUpdates = sysoutUpdates;
+ // set this to 0 to indicate the job hasn't been created yet.
+ this.currentJobCreatedAt = 0;
+ this.terminalStateAt = 0;
+ parseTimes(config);
}
@Override
protected void handleMessage(Object message) {
-
+
+ // first see if the message was from the Job Manager
+ if(getContext().sender() == jobManager){
+ this.jobManagerPinged = System.currentTimeMillis();
+ }
+
+ // ======= Job status messages on regular intervals ==============
+ if(message instanceof JobManagerMessages.CurrentJobStatus){
+ JobStatus statusReport = ((JobManagerMessages.CurrentJobStatus) message).status();
+ long timeDiff;
+ switch(statusReport){
+ case RUNNING:
+ // Vincent, we happy?
+ resetTimeouts();
+ break;
+ case FINISHED:
+ // Yeah! We happy!
+ resetTimeouts();
+ break;
+ case CREATED:
+ // we're still at Job CREATED. Let's see if we're over the limit.
+ timeDiff = (System.currentTimeMillis() - this.currentJobCreatedAt);
+ if(timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT){
+ failWithTimeout(timeDiff);
+ } // otherwise just wait a bit longer.
+ break;
+ case RESTARTING:
+ if(this.currentJobCreatedAt == 0){
--- End diff --
Will do.
> Make the JobClientActor check periodically if the submitted Job is still
> running and if the JobManager is still alive
> ---------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-2472
> URL: https://issues.apache.org/jira/browse/FLINK-2472
> Project: Flink
> Issue Type: Improvement
> Reporter: Till Rohrmann
> Assignee: Sachin Goel
>
> In case that the {{JobManager}} dies without notifying possibly connected
> {{JobClientActors}} or if the job execution finishes without sending the
> {{SerializedJobExecutionResult}} back to the {{JobClientActor}}, it might
> happen that a {{JobClient.submitJobAndWait}} never returns.
> I propose to let the {{JobClientActor}} periodically check whether the
> {{JobManager}} is still alive and whether the submitted job is still running.
> If not, then the {{JobClientActor}} should return an exception to complete
> the waiting future.
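
The proposal in the issue description can be illustrated with a minimal, framework-free sketch. This is not the code from the pull request and it does not use Akka or any Flink API: the class name LivenessWatchdog, its methods, and the use of a CompletableFuture are all assumptions made for illustration. The idea is only that the client records when it last heard from the manager and, on each periodic check, fails the waiting future if the silence exceeds a timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; names do not come from Flink or the PR.
public class LivenessWatchdog {
    // Maximum tolerated silence from the manager, in milliseconds.
    private final long timeoutMillis;
    // Future the submitting caller waits on; completed with a result or an exception.
    private final CompletableFuture<String> result = new CompletableFuture<>();
    // Timestamp of the last message received from the manager.
    private long lastHeardFromManagerAt;

    public LivenessWatchdog(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastHeardFromManagerAt = nowMillis;
    }

    // Call whenever any message from the manager arrives.
    public void onManagerMessage(long nowMillis) {
        lastHeardFromManagerAt = nowMillis;
    }

    // Call when the job result arrives; completes the waiting future normally.
    public void onJobResult(String jobResult) {
        result.complete(jobResult);
    }

    // Call on every periodic tick; fails the future if the manager has been silent too long.
    public void check(long nowMillis) {
        long silence = nowMillis - lastHeardFromManagerAt;
        if (!result.isDone() && silence > timeoutMillis) {
            result.completeExceptionally(
                new TimeoutException("No message from the manager for " + silence + " ms"));
        }
    }

    public CompletableFuture<String> result() {
        return result;
    }
}
```

Timestamps are passed in explicitly so the behaviour is deterministic; a real client would presumably read the clock and drive check() from a scheduler.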