Repository: incubator-zeppelin Updated Branches: refs/heads/master 66e5433d2 -> 070e08158
ZEPPELIN-285 Abort "Pending" or "Running" paragraphs on Interpreter restart This PR addresses the following issue: https://issues.apache.org/jira/browse/ZEPPELIN-285 . The idea is to abort all the jobs which are either running or waiting to run on the interpreter that is going to be restarted. TODO - [x] initial implementation - [x] tests Author: Khalid Huseynov <[email protected]> Closes #306 from khalidhuseynov/fix-pending-issue and squashes the following commits: 45c15b3 [Khalid Huseynov] fix test, first paragraph may start running e719cca [Khalid Huseynov] add test 5287bbb [Khalid Huseynov] initial commit Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/070e0815 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/070e0815 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/070e0815 Branch: refs/heads/master Commit: 070e0815854d314d0d12eff8db77242d11fa8a0c Parents: 66e5433 Author: Khalid Huseynov <[email protected]> Authored: Wed Nov 4 14:05:45 2015 +0900 Committer: Lee moon soo <[email protected]> Committed: Sun Nov 15 20:06:35 2015 +0900 ---------------------------------------------------------------------- .../interpreter/InterpreterFactory.java | 18 +++++++ .../apache/zeppelin/notebook/NotebookTest.java | 51 +++++++++++++++++++- 2 files changed, 68 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/070e0815/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 8a1d6ff..1beebde 100644 --- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -49,6 +49,8 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.scheduler.Job.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -538,6 +540,22 @@ public class InterpreterFactory { synchronized (interpreterSettings) { InterpreterSetting intpsetting = interpreterSettings.get(id); if (intpsetting != null) { + + for (Interpreter intp : intpsetting.getInterpreterGroup()) { + for (Job job : intp.getScheduler().getJobsRunning()) { + job.abort(); + job.setStatus(Status.ABORT); + logger.info("Job " + job.getJobName() + " aborted "); + } + + for (Job job : intp.getScheduler().getJobsWaiting()) { + job.abort(); + job.setStatus(Status.ABORT); + logger.info("Job " + job.getJobName() + " aborted "); + } + } + + intpsetting.getInterpreterGroup().close(); intpsetting.getInterpreterGroup().destroy(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/070e0815/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 31f18cf..faad058 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -20,6 +20,7 @@ package org.apache.zeppelin.notebook; import static org.junit.Assert.assertEquals; import 
static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; @@ -273,6 +274,54 @@ public class NotebookTest implements JobListenerFactory{ notebook.removeNote(note.id()); } + @Test + public void testAbortParagraphStatusOnInterpreterRestart() throws InterruptedException, + IOException { + Note note = notebook.createNote(); + note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList()); + + Paragraph p1 = note.addParagraph(); + p1.setText("p1"); + Paragraph p2 = note.addParagraph(); + p2.setText("p2"); + Paragraph p3 = note.addParagraph(); + p3.setText("p3"); + Paragraph p4 = note.addParagraph(); + p4.setText("p4"); + + /* all jobs are ready to run */ + assertEquals(Job.Status.READY, p1.getStatus()); + assertEquals(Job.Status.READY, p2.getStatus()); + assertEquals(Job.Status.READY, p3.getStatus()); + assertEquals(Job.Status.READY, p4.getStatus()); + + /* run all */ + note.runAll(); + + /* all are pending in the beginning (first one possibly started)*/ + assertTrue(p1.getStatus() == Job.Status.PENDING || p1.getStatus() == Job.Status.RUNNING); + assertEquals(Job.Status.PENDING, p2.getStatus()); + assertEquals(Job.Status.PENDING, p3.getStatus()); + assertEquals(Job.Status.PENDING, p4.getStatus()); + + /* wait till first job is terminated and second starts running */ + while(p1.isTerminated() == false || (p2.getStatus() == Job.Status.PENDING)) Thread.yield(); + + assertEquals(Job.Status.FINISHED, p1.getStatus()); + assertEquals(Job.Status.RUNNING, p2.getStatus()); + assertEquals(Job.Status.PENDING, p3.getStatus()); + assertEquals(Job.Status.PENDING, p4.getStatus()); + + /* restart interpreter */ + factory.restart(note.getNoteReplLoader().getInterpreterSettings().get(0).id()); + + /* pending and running jobs have been aborted */ + assertEquals(Job.Status.FINISHED, p1.getStatus()); + assertEquals(Job.Status.ABORT, p2.getStatus()); + 
assertEquals(Job.Status.ABORT, p3.getStatus()); + assertEquals(Job.Status.ABORT, p4.getStatus()); + } + private void delete(File file){ if(file.isFile()) file.delete(); else if(file.isDirectory()){ @@ -285,7 +334,7 @@ public class NotebookTest implements JobListenerFactory{ file.delete(); } } - + @Override public JobListener getParagraphJobListener(Note note) { return new JobListener(){
