This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new a8b8ca4 ZEPPELIN-4483. Job stautus in zeppelin server side may be always in RUNNING while the job is finished in interpreter process a8b8ca4 is described below commit a8b8ca41210526ed4f3b7439eccc51bde104eb99 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Mon Dec 16 10:03:27 2019 +0800 ZEPPELIN-4483. Job stautus in zeppelin server side may be always in RUNNING while the job is finished in interpreter process ### What is this PR for? There are two places where a job's status gets updated: the JobStatusPoller thread and the Job itself. It is possible that the job has already finished, and only afterwards does an RPC reply arrive that reports the job as being in RUNNING state. In that case the status is overwritten with RUNNING and the job never completes in the frontend. This PR fixes the issue by letting the poller set the status to RUNNING only while the job is still in PENDING state. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4483 ### How should this be tested? * CI pass. I ran the following code in a cron job that runs every minute. Before this PR, I could reproduce the issue. With this PR, I have not seen the issue for more than 2 days. ``` sc.range(1,10).sum() ``` ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3553 from zjffdu/ZEPPELIN-4483 and squashes the following commits: 31594e0e9 [Jeff Zhang] update job status only when previous status is PENDING 6783e5cdb [Jeff Zhang] ZEPPELIN-4483.
Job stautus in zeppelin server side may be always in RUNNING while the job is finished in interpreter process --- .../remote/RemoteInterpreterServer.java | 2 - .../zeppelin/scheduler/AbstractScheduler.java | 37 +++++----- .../org/apache/zeppelin/socket/NotebookServer.java | 4 - .../apache/zeppelin/scheduler/RemoteScheduler.java | 86 ++++++++-------------- 4 files changed, 49 insertions(+), 80 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index b15d3de..c627d07 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -889,7 +889,6 @@ public class RemoteInterpreterServer extends Thread synchronized (interpreterGroup) { List<Interpreter> interpreters = interpreterGroup.get(sessionId); if (interpreters == null) { - logger.info("getStatus:" + Status.UNKNOWN.name()); return Status.UNKNOWN.name(); } @@ -902,7 +901,6 @@ public class RemoteInterpreterServer extends Thread } } } - logger.info("getStatus:" + Status.UNKNOWN.name()); return Status.UNKNOWN.name(); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java index 85680ed..175dc6a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java @@ -120,25 +120,26 @@ public abstract class AbstractScheduler implements Scheduler { } runningJob.run(); Object jobResult = runningJob.getReturn(); - if (runningJob.isAborted()) { - runningJob.setStatus(Job.Status.ABORT); - LOGGER.debug("Job Aborted, " + 
runningJob.getId() + ", " + - runningJob.getErrorMessage()); - } else if (runningJob.getException() != null) { - LOGGER.debug("Job Error, " + runningJob.getId() + ", " + - runningJob.getReturn()); - runningJob.setStatus(Job.Status.ERROR); - } else if (jobResult != null && jobResult instanceof InterpreterResult - && ((InterpreterResult) jobResult).code() == InterpreterResult.Code.ERROR) { - LOGGER.debug("Job Error, " + runningJob.getId() + ", " + - runningJob.getReturn()); - runningJob.setStatus(Job.Status.ERROR); - } else { - LOGGER.debug("Job Finished, " + runningJob.getId() + ", Result: " + - runningJob.getReturn()); - runningJob.setStatus(Job.Status.FINISHED); + synchronized (runningJob) { + if (runningJob.isAborted()) { + runningJob.setStatus(Job.Status.ABORT); + LOGGER.debug("Job Aborted, " + runningJob.getId() + ", " + + runningJob.getErrorMessage()); + } else if (runningJob.getException() != null) { + LOGGER.debug("Job Error, " + runningJob.getId() + ", " + + runningJob.getReturn()); + runningJob.setStatus(Job.Status.ERROR); + } else if (jobResult != null && jobResult instanceof InterpreterResult + && ((InterpreterResult) jobResult).code() == InterpreterResult.Code.ERROR) { + LOGGER.debug("Job Error, " + runningJob.getId() + ", " + + runningJob.getReturn()); + runningJob.setStatus(Job.Status.ERROR); + } else { + LOGGER.debug("Job Finished, " + runningJob.getId() + ", Result: " + + runningJob.getReturn()); + runningJob.setStatus(Job.Status.FINISHED); + } } - LOGGER.info("Job " + runningJob.getId() + " finished by scheduler " + name); // reset aborted flag to allow retry runningJob.aborted = false; diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 702efd5..356f7be 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ 
-1831,10 +1831,6 @@ public class NotebookServer extends WebSocketServlet p.setStatusToUserParagraph(p.getStatus()); broadcastParagraph(p.getNote(), p); - // for (NoteEventListener listener : notebook.getNoteEventListeners()) { - // listener.onParagraphStatusChange(p, after); - // } - try { broadcastUpdateNoteJobInfo(System.currentTimeMillis() - 5000); } catch (IOException e) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index 34f1d5d..5f19df1 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -50,7 +50,7 @@ public class RemoteScheduler extends AbstractScheduler { // wait until it is submitted to the remote while (!jobRunner.isJobSubmittedInRemote()) { try { - Thread.sleep(500); + Thread.sleep(100); } catch (InterruptedException e) { LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " + "queue.wait", e); @@ -59,24 +59,20 @@ public class RemoteScheduler extends AbstractScheduler { } /** - * Role of the class is get status info from remote process from PENDING to - * RUNNING status. + * Role of the class is getting status info from remote process from PENDING to + * RUNNING status. This thread will exist after job is in RUNNING/FINISHED state. 
*/ private class JobStatusPoller extends Thread { - private long initialPeriodMsec; - private long initialPeriodCheckIntervalMsec; private long checkIntervalMsec; private volatile boolean terminate; private JobListener listener; private Job job; - volatile Status lastStatus; + private volatile Status lastStatus; - public JobStatusPoller(long initialPeriodMsec, - long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job, - JobListener listener) { + public JobStatusPoller(Job job, + JobListener listener, + long checkIntervalMsec) { setName("JobStatusPoller-" + job.getId()); - this.initialPeriodMsec = initialPeriodMsec; - this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec; this.checkIntervalMsec = checkIntervalMsec; this.job = job; this.listener = listener; @@ -85,19 +81,10 @@ public class RemoteScheduler extends AbstractScheduler { @Override public void run() { - long started = System.currentTimeMillis(); while (terminate == false) { - long current = System.currentTimeMillis(); - long interval; - if (current - started < initialPeriodMsec) { - interval = initialPeriodCheckIntervalMsec; - } else { - interval = checkIntervalMsec; - } - synchronized (this) { try { - this.wait(interval); + this.wait(checkIntervalMsec); } catch (InterruptedException e) { LOGGER.error("Exception in RemoteScheduler while run this.wait", e); } @@ -109,12 +96,11 @@ public class RemoteScheduler extends AbstractScheduler { } Status newStatus = getStatus(); - if (newStatus == Status.UNKNOWN) { // unknown - continue; - } - - if (newStatus != Status.READY && newStatus != Status.PENDING) { - // we don't need more + if (newStatus == Status.RUNNING || + newStatus == Status.FINISHED || + newStatus == Status.ERROR || + newStatus == Status.ABORT) { + // Exit this thread when job is in RUNNING/FINISHED/ERROR/ABORT state. 
break; } } @@ -128,26 +114,13 @@ public class RemoteScheduler extends AbstractScheduler { } } - - private Status getLastStatus() { - if (terminate == true) { - if (job.getErrorMessage() != null) { - return Status.ERROR; - } else if (lastStatus != Status.FINISHED && - lastStatus != Status.ERROR && - lastStatus != Status.ABORT) { - return Status.FINISHED; + public Status getStatus() { + if (!remoteInterpreter.isOpened()) { + if (lastStatus != null) { + return lastStatus; } else { - return (lastStatus == null) ? Status.FINISHED : lastStatus; + return job.getStatus(); } - } else { - return (lastStatus == null) ? Status.UNKNOWN : lastStatus; - } - } - - public synchronized Status getStatus() { - if (!remoteInterpreter.isOpened()) { - return getLastStatus(); } Status status = Status.valueOf(remoteInterpreter.getStatus(job.getId())); if (status == Status.UNKNOWN) { @@ -155,8 +128,8 @@ public class RemoteScheduler extends AbstractScheduler { // maybe not submitted, maybe already finished return job.getStatus(); } + listener.onStatusChange(job, lastStatus, status); lastStatus = status; - listener.onStatusChange(job, null, status); return status; } } @@ -166,7 +139,7 @@ public class RemoteScheduler extends AbstractScheduler { private RemoteScheduler scheduler; private Job job; private volatile boolean jobExecuted; - volatile boolean jobSubmittedRemotely; + private volatile boolean jobSubmittedRemotely; public JobRunner(RemoteScheduler scheduler, Job job) { this.scheduler = scheduler; @@ -181,8 +154,7 @@ public class RemoteScheduler extends AbstractScheduler { @Override public void run() { - JobStatusPoller jobStatusPoller = new JobStatusPoller(1500, 100, 500, - job, this); + JobStatusPoller jobStatusPoller = new JobStatusPoller(job, this, 100); jobStatusPoller.start(); scheduler.runJob(job); jobExecuted = true; @@ -199,12 +171,12 @@ public class RemoteScheduler extends AbstractScheduler { public void onProgressUpdate(Job job, int progress) { } + // Call by JobStatusPoller 
thread, update status when JobStatusPoller get new status. @Override public void onStatusChange(Job job, Status before, Status after) { - // Update remoteStatus if (jobExecuted == false) { if (after == Status.FINISHED || after == Status.ABORT - || after == Status.ERROR) { + || after == Status.ERROR) { // it can be status of last run. // so not updating the remoteStatus return; @@ -216,12 +188,14 @@ public class RemoteScheduler extends AbstractScheduler { jobSubmittedRemotely = true; } - // only set status when it is RUNNING - // We would set other status based on the interpret result - if (after == Status.RUNNING) { - job.setStatus(Status.RUNNING); + // only set status when the status fetched from JobStatusPoller is RUNNING, + // the status of job itself is still in PENDING. + // Because the status from JobStatusPoller may happen after the job is finished. + synchronized (job) { + if (after == Status.RUNNING && job.getStatus() == Status.PENDING) { + job.setStatus(Status.RUNNING); + } } } } - }