This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new a8b8ca4  ZEPPELIN-4483. Job status in zeppelin server side may be 
always in RUNNING while the job is finished in interpreter process
a8b8ca4 is described below

commit a8b8ca41210526ed4f3b7439eccc51bde104eb99
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Mon Dec 16 10:03:27 2019 +0800

    ZEPPELIN-4483. Job status in zeppelin server side may be always in RUNNING 
while the job is finished in interpreter process
    
    ### What is this PR for?
    
    There are 2 places where a job's status is updated: one is the 
JobStatusPoller thread, the other is the Job itself. It is possible that the 
job has finished and it then gets an rpc request that says the job is in 
RUNNING state. In this case, the job would never complete in the frontend. 
This PR fixes this issue.
    
    ### What type of PR is it?
    [Bug Fix ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4483
    
    ### How should this be tested?
    * CI passes. I ran the following code in a cron job which runs every 1 
minute. Before this PR, I could reproduce this issue. With this PR, I haven't 
seen this issue for more than 2 days.
    
    ```
    sc.range(1,10).sum()
    ```
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3553 from zjffdu/ZEPPELIN-4483 and squashes the following commits:
    
    31594e0e9 [Jeff Zhang] update job status only when previous status is 
PENDING
    6783e5cdb [Jeff Zhang] ZEPPELIN-4483. Job status in zeppelin server side 
may be always in RUNNING while the job is finished in interpreter process
---
 .../remote/RemoteInterpreterServer.java            |  2 -
 .../zeppelin/scheduler/AbstractScheduler.java      | 37 +++++-----
 .../org/apache/zeppelin/socket/NotebookServer.java |  4 -
 .../apache/zeppelin/scheduler/RemoteScheduler.java | 86 ++++++++--------------
 4 files changed, 49 insertions(+), 80 deletions(-)

diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index b15d3de..c627d07 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -889,7 +889,6 @@ public class RemoteInterpreterServer extends Thread
     synchronized (interpreterGroup) {
       List<Interpreter> interpreters = interpreterGroup.get(sessionId);
       if (interpreters == null) {
-        logger.info("getStatus:" + Status.UNKNOWN.name());
         return Status.UNKNOWN.name();
       }
 
@@ -902,7 +901,6 @@ public class RemoteInterpreterServer extends Thread
         }
       }
     }
-    logger.info("getStatus:" + Status.UNKNOWN.name());
     return Status.UNKNOWN.name();
   }
 
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
index 85680ed..175dc6a 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
@@ -120,25 +120,26 @@ public abstract class AbstractScheduler implements 
Scheduler {
     }
     runningJob.run();
     Object jobResult = runningJob.getReturn();
-    if (runningJob.isAborted()) {
-      runningJob.setStatus(Job.Status.ABORT);
-      LOGGER.debug("Job Aborted, " + runningJob.getId() + ", " +
-          runningJob.getErrorMessage());
-    } else if (runningJob.getException() != null) {
-      LOGGER.debug("Job Error, " + runningJob.getId() + ", " +
-          runningJob.getReturn());
-      runningJob.setStatus(Job.Status.ERROR);
-    } else if (jobResult != null && jobResult instanceof InterpreterResult
-        && ((InterpreterResult) jobResult).code() == 
InterpreterResult.Code.ERROR) {
-      LOGGER.debug("Job Error, " + runningJob.getId() + ", " +
-          runningJob.getReturn());
-      runningJob.setStatus(Job.Status.ERROR);
-    } else {
-      LOGGER.debug("Job Finished, " + runningJob.getId() + ", Result: " +
-          runningJob.getReturn());
-      runningJob.setStatus(Job.Status.FINISHED);
+    synchronized (runningJob) {
+      if (runningJob.isAborted()) {
+        runningJob.setStatus(Job.Status.ABORT);
+        LOGGER.debug("Job Aborted, " + runningJob.getId() + ", " +
+                runningJob.getErrorMessage());
+      } else if (runningJob.getException() != null) {
+        LOGGER.debug("Job Error, " + runningJob.getId() + ", " +
+                runningJob.getReturn());
+        runningJob.setStatus(Job.Status.ERROR);
+      } else if (jobResult != null && jobResult instanceof InterpreterResult
+              && ((InterpreterResult) jobResult).code() == 
InterpreterResult.Code.ERROR) {
+        LOGGER.debug("Job Error, " + runningJob.getId() + ", " +
+                runningJob.getReturn());
+        runningJob.setStatus(Job.Status.ERROR);
+      } else {
+        LOGGER.debug("Job Finished, " + runningJob.getId() + ", Result: " +
+                runningJob.getReturn());
+        runningJob.setStatus(Job.Status.FINISHED);
+      }
     }
-
     LOGGER.info("Job " + runningJob.getId() + " finished by scheduler " + 
name);
     // reset aborted flag to allow retry
     runningJob.aborted = false;
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 702efd5..356f7be 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -1831,10 +1831,6 @@ public class NotebookServer extends WebSocketServlet
 
     p.setStatusToUserParagraph(p.getStatus());
     broadcastParagraph(p.getNote(), p);
-    //    for (NoteEventListener listener : notebook.getNoteEventListeners()) {
-    //      listener.onParagraphStatusChange(p, after);
-    //    }
-
     try {
       broadcastUpdateNoteJobInfo(System.currentTimeMillis() - 5000);
     } catch (IOException e) {
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index 34f1d5d..5f19df1 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -50,7 +50,7 @@ public class RemoteScheduler extends AbstractScheduler {
     // wait until it is submitted to the remote
     while (!jobRunner.isJobSubmittedInRemote()) {
       try {
-        Thread.sleep(500);
+        Thread.sleep(100);
       } catch (InterruptedException e) {
         LOGGER.error("Exception in RemoteScheduler while 
jobRunner.isJobSubmittedInRemote " +
             "queue.wait", e);
@@ -59,24 +59,20 @@ public class RemoteScheduler extends AbstractScheduler {
   }
 
   /**
-   * Role of the class is get status info from remote process from PENDING to
-   * RUNNING status.
+   * Role of the class is getting status info from remote process from PENDING 
to
+   * RUNNING status. This thread will exist after job is in RUNNING/FINISHED 
state.
    */
   private class JobStatusPoller extends Thread {
-    private long initialPeriodMsec;
-    private long initialPeriodCheckIntervalMsec;
     private long checkIntervalMsec;
     private volatile boolean terminate;
     private JobListener listener;
     private Job job;
-    volatile Status lastStatus;
+    private volatile Status lastStatus;
 
-    public JobStatusPoller(long initialPeriodMsec,
-                           long initialPeriodCheckIntervalMsec, long 
checkIntervalMsec, Job job,
-                           JobListener listener) {
+    public JobStatusPoller(Job job,
+                           JobListener listener,
+                           long checkIntervalMsec) {
       setName("JobStatusPoller-" + job.getId());
-      this.initialPeriodMsec = initialPeriodMsec;
-      this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec;
       this.checkIntervalMsec = checkIntervalMsec;
       this.job = job;
       this.listener = listener;
@@ -85,19 +81,10 @@ public class RemoteScheduler extends AbstractScheduler {
 
     @Override
     public void run() {
-      long started = System.currentTimeMillis();
       while (terminate == false) {
-        long current = System.currentTimeMillis();
-        long interval;
-        if (current - started < initialPeriodMsec) {
-          interval = initialPeriodCheckIntervalMsec;
-        } else {
-          interval = checkIntervalMsec;
-        }
-
         synchronized (this) {
           try {
-            this.wait(interval);
+            this.wait(checkIntervalMsec);
           } catch (InterruptedException e) {
             LOGGER.error("Exception in RemoteScheduler while run this.wait", 
e);
           }
@@ -109,12 +96,11 @@ public class RemoteScheduler extends AbstractScheduler {
         }
 
         Status newStatus = getStatus();
-        if (newStatus == Status.UNKNOWN) { // unknown
-          continue;
-        }
-
-        if (newStatus != Status.READY && newStatus != Status.PENDING) {
-          // we don't need more
+        if (newStatus == Status.RUNNING ||
+                newStatus == Status.FINISHED ||
+                newStatus == Status.ERROR ||
+                newStatus == Status.ABORT) {
+          // Exit this thread when job is in RUNNING/FINISHED/ERROR/ABORT 
state.
           break;
         }
       }
@@ -128,26 +114,13 @@ public class RemoteScheduler extends AbstractScheduler {
       }
     }
 
-
-    private Status getLastStatus() {
-      if (terminate == true) {
-        if (job.getErrorMessage() != null) {
-          return Status.ERROR;
-        } else if (lastStatus != Status.FINISHED &&
-            lastStatus != Status.ERROR &&
-            lastStatus != Status.ABORT) {
-          return Status.FINISHED;
+    public Status getStatus() {
+      if (!remoteInterpreter.isOpened()) {
+        if (lastStatus != null) {
+          return lastStatus;
         } else {
-          return (lastStatus == null) ? Status.FINISHED : lastStatus;
+          return job.getStatus();
         }
-      } else {
-        return (lastStatus == null) ? Status.UNKNOWN : lastStatus;
-      }
-    }
-
-    public synchronized Status getStatus() {
-      if (!remoteInterpreter.isOpened()) {
-        return getLastStatus();
       }
       Status status = Status.valueOf(remoteInterpreter.getStatus(job.getId()));
       if (status == Status.UNKNOWN) {
@@ -155,8 +128,8 @@ public class RemoteScheduler extends AbstractScheduler {
         // maybe not submitted, maybe already finished
         return job.getStatus();
       }
+      listener.onStatusChange(job, lastStatus, status);
       lastStatus = status;
-      listener.onStatusChange(job, null, status);
       return status;
     }
   }
@@ -166,7 +139,7 @@ public class RemoteScheduler extends AbstractScheduler {
     private RemoteScheduler scheduler;
     private Job job;
     private volatile boolean jobExecuted;
-    volatile boolean jobSubmittedRemotely;
+    private volatile boolean jobSubmittedRemotely;
 
     public JobRunner(RemoteScheduler scheduler, Job job) {
       this.scheduler = scheduler;
@@ -181,8 +154,7 @@ public class RemoteScheduler extends AbstractScheduler {
 
     @Override
     public void run() {
-      JobStatusPoller jobStatusPoller = new JobStatusPoller(1500, 100, 500,
-          job, this);
+      JobStatusPoller jobStatusPoller = new JobStatusPoller(job, this, 100);
       jobStatusPoller.start();
       scheduler.runJob(job);
       jobExecuted = true;
@@ -199,12 +171,12 @@ public class RemoteScheduler extends AbstractScheduler {
     public void onProgressUpdate(Job job, int progress) {
     }
 
+    // Call by JobStatusPoller thread, update status when JobStatusPoller get 
new status.
     @Override
     public void onStatusChange(Job job, Status before, Status after) {
-      // Update remoteStatus
       if (jobExecuted == false) {
         if (after == Status.FINISHED || after == Status.ABORT
-            || after == Status.ERROR) {
+                || after == Status.ERROR) {
           // it can be status of last run.
           // so not updating the remoteStatus
           return;
@@ -216,12 +188,14 @@ public class RemoteScheduler extends AbstractScheduler {
         jobSubmittedRemotely = true;
       }
 
-      // only set status when it is RUNNING
-      // We would set other status based on the interpret result
-      if (after == Status.RUNNING) {
-        job.setStatus(Status.RUNNING);
+      // only set status when the status fetched from JobStatusPoller is 
RUNNING,
+      // the status of job itself is still in PENDING.
+      // Because the status from JobStatusPoller may happen after the job is 
finished.
+      synchronized (job) {
+        if (after == Status.RUNNING && job.getStatus() == Status.PENDING) {
+          job.setStatus(Status.RUNNING);
+        }
       }
     }
   }
-
 }

Reply via email to