Repository: zeppelin Updated Branches: refs/heads/master 142597bcf -> b41997850
[MINOR] Use standard java API to interrupt thread ### What is this PR for? Use java `Thread.interrupt` method to stop job progress polling thread. Standard API is: * proper synchronized * able to interrupt `Thread.sleep` with any interval ### What type of PR is it? [Refactoring] ### Questions: * Does the licenses files need update? no * Is there breaking changes for older versions? no * Does this needs documentation? no Author: Igor Drozdov <igor_droz...@epam.com> Closes #2039 from DrIgor/refactor-progress-poller and squashes the following commits: 895787f [Igor Drozdov] Use standard java API to interrupt thread Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/b4199785 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/b4199785 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/b4199785 Branch: refs/heads/master Commit: b41997850990c8ed99bb8d3d67c9a64371da7b84 Parents: 142597b Author: Igor Drozdov <igor_droz...@epam.com> Authored: Mon Feb 20 10:04:12 2017 +0300 Committer: Felix Cheung <felixche...@apache.org> Committed: Tue Mar 7 23:22:06 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/zeppelin/scheduler/Job.java | 12 ++--- .../zeppelin/scheduler/JobProgressPoller.java | 49 +++++++++----------- 2 files changed, 26 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b4199785/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index 76d90b9..0e9dbeb 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -182,22 +182,16 @@ public abstract class Job { this.exception = null; errorMessage = null; dateFinished = new Date(); - progressUpdator.terminate(); - } catch (NullPointerException e) { - LOGGER.error("Job failed", e); - progressUpdator.terminate(); - this.exception = e; - setResult(e.getMessage()); - errorMessage = getStack(e); - dateFinished = new Date(); } catch (Throwable e) { LOGGER.error("Job failed", e); - progressUpdator.terminate(); this.exception = e; setResult(e.getMessage()); errorMessage = getStack(e); dateFinished = new Date(); } finally { + if (progressUpdator != null) { + progressUpdator.interrupt(); + } //aborted = false; } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b4199785/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java index 967702a..8b8cda0 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java @@ -21,48 +21,45 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** + * Polls job progress with given interval + * + * @see Job#progress() + * @see JobListener#onProgressUpdate(org.apache.zeppelin.scheduler.Job, int) + * * TODO(moon) : add description. */ public class JobProgressPoller extends Thread { public static final long DEFAULT_INTERVAL_MSEC = 500; - Logger logger = LoggerFactory.getLogger(JobProgressPoller.class); + private static final Logger logger = LoggerFactory.getLogger(JobProgressPoller.class); + private Job job; private long intervalMs; - boolean terminate = false; public JobProgressPoller(Job job, long intervalMs) { + super("JobProgressPoller, jobId=" + job.getId()); this.job = job; - this.intervalMs = intervalMs; + if (intervalMs < 0) { + throw new IllegalArgumentException("polling interval can't be " + intervalMs); + } + this.intervalMs = intervalMs == 0 ? DEFAULT_INTERVAL_MSEC : intervalMs; } @Override public void run() { - if (intervalMs < 0) { - return; - } else if (intervalMs == 0) { - intervalMs = DEFAULT_INTERVAL_MSEC; - } - - while (terminate == false) { - JobListener listener = job.getListener(); - if (listener != null) { - try { - if (job.isRunning()) { - listener.onProgressUpdate(job, job.progress()); + try { + while (!Thread.interrupted()) { + JobListener listener = job.getListener(); + if (listener != null) { + try { + if (job.isRunning()) { + listener.onProgressUpdate(job, job.progress()); + } + } catch (Exception e) { + logger.error("Can not get or update progress", e); } - } catch (Exception e) { - logger.error("Can not get or update progress", e); } - } - try { Thread.sleep(intervalMs); - } catch (InterruptedException e) { - logger.error("Exception in JobProgressPoller while run Thread.sleep", e); } - } - } - - public void terminate() { - terminate = true; + } catch (InterruptedException ignored) {} } }