Repository: incubator-zeppelin Updated Branches: refs/heads/master 77c59e4f0 -> ff85f793b
Run sequence is not preserved This PR came from https://github.com/NFLabs/zeppelin/pull/397 After https://github.com/NFLabs/zeppelin/pull/364, the order of jobs submitted to the remote interpreter is not preserved. As a result, when a user runs many paragraphs in a short time (for example, by clicking 'Run all paragraphs'), they run in random order. https://github.com/NFLabs/zeppelin/issues/395 is one of the possible resulting problems. This PR adds some unit tests and fixes the problem. Ready to merge. Author: Lee moon soo <[email protected]> Author: Lee moon soo <[email protected]> Closes #8 from Leemoonsoo/fix/run_order and squashes the following commits: 4a6a230 [Lee moon soo] Prevent status remaining RUNNING after job executed 1fb3715 [Lee moon soo] Fix daedlock 6405abe [Lee moon soo] Update RemoteSchedulerTest c31a807 [Lee moon soo] Make RemoteScheduler keep order of job submitted Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/ff85f793 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/ff85f793 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/ff85f793 Branch: refs/heads/master Commit: ff85f793bb35c43352432b47ee5e44e2ef650a87 Parents: 77c59e4 Author: Lee moon soo <[email protected]> Authored: Sat Mar 28 18:05:56 2015 +0900 Committer: Lee moon soo <[email protected]> Committed: Tue Mar 31 08:39:27 2015 +0900 ---------------------------------------------------------------------- .../interpreter/LazyOpenInterpreter.java | 1 - .../interpreter/remote/RemoteInterpreter.java | 35 +-- .../zeppelin/scheduler/JobProgressPoller.java | 7 +- .../zeppelin/scheduler/RemoteScheduler.java | 179 ++++++++---- .../remote/RemoteInterpreterTest.java | 288 ++++++++++++++++++- .../remote/mock/MockInterpreterA.java | 16 +- .../remote/mock/MockInterpreterB.java | 27 +- .../zeppelin/scheduler/RemoteSchedulerTest.java | 25 +- 8 files changed, 482 insertions(+), 96 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ff85f793/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java index 0703320..753adc9 100644 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java +++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java @@ -84,7 +84,6 @@ public class LazyOpenInterpreter @Override public FormType getFormType() { - open(); return intp.getFormType(); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ff85f793/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java index 240e861..ccae0f7 100644 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -34,6 +34,7 @@ public class RemoteInterpreter extends Interpreter { private String interpreterPath; private String className; FormType formType; + boolean initialized; private Map<String, String> env; static Map<String, RemoteInterpreterProcess> interpreterGroupReference = new HashMap<String, RemoteInterpreterProcess>(); @@ -45,6 +46,7 @@ public class RemoteInterpreter extends Interpreter { super(property); this.className = className; + 
initialized = false; this.interpreterRunner = interpreterRunner; this.interpreterPath = interpreterPath; env = new HashMap<String, String>(); @@ -84,7 +86,11 @@ public class RemoteInterpreter extends Interpreter { } } - private void init() { + private synchronized void init() { + if (initialized == true) { + return; + } + RemoteInterpreterProcess interpreterProcess = null; synchronized (interpreterGroupReference) { @@ -122,6 +128,7 @@ public class RemoteInterpreter extends Interpreter { } } } + initialized = true; } @@ -129,24 +136,6 @@ public class RemoteInterpreter extends Interpreter { @Override public void open() { init(); - - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - try { - logger.info("open remote interpreter {}", className); - client.open(className); - } catch (TException e) { - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client); - } } @Override @@ -228,6 +217,8 @@ public class RemoteInterpreter extends Interpreter { @Override public FormType getFormType() { + init(); + if (formType != null) { return formType; } @@ -292,9 +283,11 @@ public class RemoteInterpreter extends Interpreter { @Override public Scheduler getScheduler() { int maxConcurrency = 10; - + RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); return SchedulerFactory.singleton().createOrGetRemoteScheduler( - "remoteinterpreter_" + this.hashCode(), getInterpreterProcess(), maxConcurrency); + "remoteinterpreter_" + interpreterProcess.hashCode(), + getInterpreterProcess(), + maxConcurrency); } @Override http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ff85f793/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java index bcda22d..142842a 100644 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java +++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java @@ -5,7 +5,7 @@ import org.slf4j.LoggerFactory; /** * TODO(moon) : add description. - * + * * @author Leemoonsoo * */ @@ -21,6 +21,7 @@ public class JobProgressPoller extends Thread { this.intervalMs = intervalMs; } + @Override public void run() { if (intervalMs < 0) { return; @@ -32,7 +33,9 @@ public class JobProgressPoller extends Thread { JobListener listener = job.getListener(); if (listener != null) { try { - listener.onProgressUpdate(job, job.progress()); + if (job.isRunning()) { + listener.onProgressUpdate(job, job.progress()); + } } catch (Exception e) { logger.error("Can not get or update progress", e); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ff85f793/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java index bb02f13..14baa9b 100644 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java @@ -28,10 +28,8 @@ public class RemoteScheduler implements Scheduler { private int maxConcurrency; private RemoteInterpreterProcess interpreterProcess; - public RemoteScheduler(String name, - ExecutorService executor, - RemoteInterpreterProcess interpreterProcess, - SchedulerListener listener, + public RemoteScheduler(String name, ExecutorService 
executor, + RemoteInterpreterProcess interpreterProcess, SchedulerListener listener, int maxConcurrency) { this.name = name; this.executor = executor; @@ -42,8 +40,10 @@ public class RemoteScheduler implements Scheduler { @Override public void run() { - synchronized (queue) { - while (terminate == false) { + while (terminate == false) { + Job job = null; + + synchronized (queue) { if (running.size() >= maxConcurrency || queue.isEmpty() == true) { try { queue.wait(500); @@ -52,13 +52,23 @@ public class RemoteScheduler implements Scheduler { continue; } - - Job job = queue.remove(0); + job = queue.remove(0); running.add(job); + } - // run and - Scheduler scheduler = this; - executor.execute(new JobRunner(scheduler, job)); + // run + Scheduler scheduler = this; + JobRunner jobRunner = new JobRunner(scheduler, job); + executor.execute(jobRunner); + + // wait until it is submitted to the remote + while (!jobRunner.isJobSubmittedInRemote()) { + synchronized (queue) { + try { + queue.wait(500); + } catch (InterruptedException e) { + } + } } } } @@ -70,12 +80,24 @@ public class RemoteScheduler implements Scheduler { @Override public Collection<Job> getJobsWaiting() { - return null; + List<Job> ret = new LinkedList<Job>(); + synchronized (queue) { + for (Job job : queue) { + ret.add(job); + } + } + return ret; } @Override public Collection<Job> getJobsRunning() { - return null; + List<Job> ret = new LinkedList<Job>(); + synchronized (queue) { + for (Job job : running) { + ret.add(job); + } + } + return ret; } @Override @@ -96,8 +118,8 @@ public class RemoteScheduler implements Scheduler { } /** - * Role of the class is get status info from remote process - * from PENDING to RUNNING status. + * Role of the class is get status info from remote process from PENDING to + * RUNNING status. 
*/ private class JobStatusPoller extends Thread { private long initialPeriodMsec; @@ -106,14 +128,11 @@ public class RemoteScheduler implements Scheduler { private boolean terminate; private JobListener listener; private Job job; + Status lastStatus; - public JobStatusPoller( - long initialPeriodMsec, - long initialPeriodCheckIntervalMsec, - long checkIntervalMsec, - Job job, - JobListener listener - ) { + public JobStatusPoller(long initialPeriodMsec, + long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job, + JobListener listener) { this.initialPeriodMsec = initialPeriodMsec; this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec; this.checkIntervalMsec = checkIntervalMsec; @@ -141,19 +160,13 @@ public class RemoteScheduler implements Scheduler { } } + Status newStatus = getStatus(); - if (newStatus == null) { + if (newStatus == null) { // unknown continue; } - // update only RUNNING - if (newStatus == Status.RUNNING) { - listener.afterStatusChange(job, null, newStatus); - break; - } - - if (newStatus != Status.READY && - newStatus != Status.PENDING) { + if (newStatus != Status.READY && newStatus != Status.PENDING) { // we don't need more continue; } @@ -167,9 +180,24 @@ public class RemoteScheduler implements Scheduler { } } + + private Status getLastStatus() { + if (terminate == true) { + if (lastStatus != Status.FINISHED && + lastStatus != Status.ERROR && + lastStatus != Status.ABORT) { + return Status.FINISHED; + } else { + return (lastStatus == null) ? Status.FINISHED : lastStatus; + } + } else { + return (lastStatus == null) ? 
Status.FINISHED : lastStatus; + } + } + public synchronized Job.Status getStatus() { if (interpreterProcess.referenceCount() <= 0) { - return null; + return getLastStatus(); } Client client; @@ -177,33 +205,52 @@ public class RemoteScheduler implements Scheduler { client = interpreterProcess.getClient(); } catch (Exception e) { logger.error("Can't get status information", e); - return Status.FINISHED; + lastStatus = Status.ERROR; + return Status.ERROR; } try { - Status status = Status.valueOf(client.getStatus(job.getId())); - logger.info("getStatus from remote {}", status); + String statusStr = client.getStatus(job.getId()); + if ("Unknown".equals(statusStr)) { + // not found this job in the remote schedulers. + // maybe not submitted, maybe already finished + Status status = getLastStatus(); + listener.afterStatusChange(job, null, status); + return status; + } + Status status = Status.valueOf(statusStr); + lastStatus = status; + listener.afterStatusChange(job, null, status); return status; } catch (TException e) { logger.error("Can't get status information", e); - return Status.FINISHED; + lastStatus = Status.ERROR; + return Status.ERROR; } catch (Exception e) { - // unknown status - return Status.FINISHED; + logger.error("Unknown status", e); + lastStatus = Status.ERROR; + return Status.ERROR; } finally { interpreterProcess.releaseClient(client); } } } - private class JobRunner implements Runnable, JobListener { private Scheduler scheduler; private Job job; + private boolean jobExecuted; + boolean jobSubmittedRemotely; public JobRunner(Scheduler scheduler, Job job) { this.scheduler = scheduler; this.job = job; + jobExecuted = false; + jobSubmittedRemotely = false; + } + + public boolean isJobSubmittedInRemote() { + return jobSubmittedRemotely; } @Override @@ -220,26 +267,25 @@ public class RemoteScheduler implements Scheduler { return; } - - JobStatusPoller jobStatusPoller = new JobStatusPoller( - 1500, - 100, - 500, - job, - this - ); - logger.info("*********** 
Start job status poller"); + JobStatusPoller jobStatusPoller = new JobStatusPoller(1500, 100, 500, + job, this); jobStatusPoller.start(); if (listener != null) { listener.jobStarted(scheduler, job); } job.run(); + jobExecuted = true; + jobSubmittedRemotely = true; jobStatusPoller.shutdown(); + try { + jobStatusPoller.join(); + } catch (InterruptedException e) { + logger.error("JobStatusPoller interrupted", e); + } job.setStatus(jobStatusPoller.getStatus()); - if (listener != null) { listener.jobFinished(scheduler, job); } @@ -263,16 +309,42 @@ public class RemoteScheduler implements Scheduler { @Override public void afterStatusChange(Job job, Status before, Status after) { - // status polled by status poller - if (job.getStatus() == after) { + if (after == null) { // unknown. maybe before sumitted remotely, maybe already finished. + if (jobExecuted) { + jobSubmittedRemotely = true; + if (job.isAborted()) { + job.setStatus(Status.ABORT); + } else if (job.getException() != null) { + job.setStatus(Status.ERROR); + } else { + job.setStatus(Status.FINISHED); + } + } return; } - job.setStatus(after); + + // Update remoteStatus + if (jobExecuted == false) { + if (after == Status.FINISHED || after == Status.ABORT + || after == Status.ERROR) { + // it can be status of last run. 
+ // so not updating the remoteStatus + return; + } else if (after == Status.RUNNING) { + jobSubmittedRemotely = true; + } + } else { + jobSubmittedRemotely = true; + } + + // status polled by status poller + if (job.getStatus() != after) { + job.setStatus(after); + } } } - @Override public void stop() { terminate = true; @@ -280,7 +352,6 @@ public class RemoteScheduler implements Scheduler { queue.notify(); } - } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ff85f793/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java index 5ec6a32..dcee6aa 100644 --- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -7,6 +7,8 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Properties; @@ -18,26 +20,35 @@ import org.junit.Test; import com.nflabs.zeppelin.display.GUI; import com.nflabs.zeppelin.interpreter.InterpreterContext; import com.nflabs.zeppelin.interpreter.InterpreterGroup; +import com.nflabs.zeppelin.interpreter.InterpreterResult; import com.nflabs.zeppelin.interpreter.remote.mock.MockInterpreterA; import com.nflabs.zeppelin.interpreter.remote.mock.MockInterpreterB; +import com.nflabs.zeppelin.scheduler.Job; +import com.nflabs.zeppelin.scheduler.Job.Status; +import com.nflabs.zeppelin.scheduler.Scheduler; public class RemoteInterpreterTest { + private InterpreterGroup intpGroup; + private 
HashMap<String, String> env; + @Before public void setUp() throws Exception { + intpGroup = new InterpreterGroup(); + env = new HashMap<String, String>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); } @After public void tearDown() throws Exception { + intpGroup.clone(); + intpGroup.destroy(); } @Test public void testRemoteInterperterCall() throws TTransportException, IOException { Properties p = new Properties(); - InterpreterGroup intpGroup = new InterpreterGroup(); - Map<String, String> env = new HashMap<String, String>(); - env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); RemoteInterpreter intpA = new RemoteInterpreter( p, @@ -97,9 +108,6 @@ public class RemoteInterpreterTest { @Test public void testRemoteSchedulerSharing() throws TTransportException, IOException { Properties p = new Properties(); - InterpreterGroup intpGroup = new InterpreterGroup(); - Map<String, String> env = new HashMap<String, String>(); - env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); RemoteInterpreter intpA = new RemoteInterpreter( p, @@ -127,30 +135,294 @@ public class RemoteInterpreterTest { intpB.open(); long start = System.currentTimeMillis(); - intpA.interpret("500", + InterpreterResult ret = intpA.interpret("500", new InterpreterContext( "id", "title", "text", new HashMap<String, Object>(), new GUI())); + assertEquals("500", ret.message()); - intpB.interpret("500", + ret = intpB.interpret("500", new InterpreterContext( "id", "title", "text", new HashMap<String, Object>(), new GUI())); + assertEquals("1000", ret.message()); + long end = System.currentTimeMillis(); + assertTrue(end - start >= 1000); + + + intpA.close(); + intpB.close(); + + RemoteInterpreterProcess process = intpA.getInterpreterProcess(); + assertFalse(process.isRunning()); + } + + @Test + public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException { + 
Properties p = new Properties(); + + final RemoteInterpreter intpA = new RemoteInterpreter( + p, + MockInterpreterA.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env + ); + + intpGroup.add(intpA); + intpA.setInterpreterGroup(intpGroup); + + final RemoteInterpreter intpB = new RemoteInterpreter( + p, + MockInterpreterB.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env + ); + + intpGroup.add(intpB); + intpB.setInterpreterGroup(intpGroup); + + intpA.open(); + intpB.open(); + + long start = System.currentTimeMillis(); + Job jobA = new Job("jobA", null) { + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + return intpA.interpret("500", + new InterpreterContext( + "jobA", + "title", + "text", + new HashMap<String, Object>(), + new GUI())); + } + + @Override + protected boolean jobAbort() { + return false; + } + + }; + intpA.getScheduler().submit(jobA); + + Job jobB = new Job("jobB", null) { + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + return intpB.interpret("500", + new InterpreterContext( + "jobB", + "title", + "text", + new HashMap<String, Object>(), + new GUI())); + } + + @Override + protected boolean jobAbort() { + return false; + } + + }; + intpB.getScheduler().submit(jobB); + + // wait until both job finished + while (jobA.getStatus() != Status.FINISHED || + jobB.getStatus() != Status.FINISHED) { + Thread.sleep(100); + } + long end = System.currentTimeMillis(); assertTrue(end - start >= 1000); + assertEquals("1000", ((InterpreterResult) jobB.getReturn()).message()); intpA.close(); intpB.close(); RemoteInterpreterProcess process = intpA.getInterpreterProcess(); assertFalse(process.isRunning()); + } + + @Test + 
public void testRunOrderPreserved() throws InterruptedException { + Properties p = new Properties(); + + final RemoteInterpreter intpA = new RemoteInterpreter( + p, + MockInterpreterA.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env + ); + intpGroup.add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + + int concurrency = 3; + final List<String> results = new LinkedList<String>(); + + Scheduler scheduler = intpA.getScheduler(); + for (int i = 0; i < concurrency; i++) { + final String jobId = Integer.toString(i); + scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) { + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + InterpreterResult ret = intpA.interpret(getJobName(), new InterpreterContext( + jobId, + "title", + "text", + new HashMap<String, Object>(), + new GUI())); + + synchronized (results) { + results.add(ret.message()); + results.notify(); + } + return null; + } + + @Override + protected boolean jobAbort() { + return false; + } + + }); + } + + // wait for job finished + synchronized (results) { + while (results.size() != concurrency) { + results.wait(300); + } + } + + int i = 0; + for (String result : results) { + assertEquals(Integer.toString(i++), result); + } + assertEquals(concurrency, i); + + intpA.close(); + } + + + @Test + public void testRunParallel() throws InterruptedException { + Properties p = new Properties(); + p.put("parallel", "true"); + + final RemoteInterpreter intpA = new RemoteInterpreter( + p, + MockInterpreterA.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env + ); + + intpGroup.add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + + int concurrency = 4; + final int timeToSleep = 1000; + final List<String> results = new LinkedList<String>(); + long start = 
System.currentTimeMillis(); + + Scheduler scheduler = intpA.getScheduler(); + for (int i = 0; i < concurrency; i++) { + final String jobId = Integer.toString(i); + scheduler.submit(new Job(jobId, Integer.toString(i), null, 300) { + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + String stmt = Integer.toString(timeToSleep); + InterpreterResult ret = intpA.interpret(stmt, new InterpreterContext( + jobId, + "title", + "text", + new HashMap<String, Object>(), + new GUI())); + + synchronized (results) { + results.add(ret.message()); + results.notify(); + } + return stmt; + } + + @Override + protected boolean jobAbort() { + return false; + } + + }); + } + + // wait for job finished + synchronized (results) { + while (results.size() != concurrency) { + results.wait(300); + } + } + + long end = System.currentTimeMillis(); + + assertTrue(end - start < timeToSleep * concurrency); + + intpA.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ff85f793/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java index 10b6ec9..1df3979 100644 --- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java +++ b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java @@ -22,23 +22,31 @@ public class MockInterpreterA extends Interpreter { .add("p1", "v1", "property1").build()); } + + private String lastSt; + public MockInterpreterA(Properties property) { super(property); } @Override public void open() { - + 
//new RuntimeException().printStackTrace(); } @Override public void close() { } + public String getLastStatement() { + return lastSt; + } + @Override public InterpreterResult interpret(String st, InterpreterContext context) { try { Thread.sleep(Long.parseLong(st)); + this.lastSt = st; } catch (NumberFormatException | InterruptedException e) { throw new InterpreterException(e); } @@ -67,6 +75,10 @@ public class MockInterpreterA extends Interpreter { @Override public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); + if (getProperty("parallel") != null && getProperty("parallel").equals("true")) { + return SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + this.hashCode(), 10); + } else { + return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); + } } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ff85f793/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java index d8b2ce9..39f2ab8 100644 --- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java +++ b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java @@ -10,6 +10,7 @@ import com.nflabs.zeppelin.interpreter.InterpreterGroup; import com.nflabs.zeppelin.interpreter.InterpreterPropertyBuilder; import com.nflabs.zeppelin.interpreter.InterpreterResult; import com.nflabs.zeppelin.interpreter.InterpreterResult.Code; +import com.nflabs.zeppelin.interpreter.WrappedInterpreter; import com.nflabs.zeppelin.scheduler.Scheduler; public class 
MockInterpreterB extends Interpreter { @@ -28,7 +29,7 @@ public class MockInterpreterB extends Interpreter { @Override public void open() { - + //new RuntimeException().printStackTrace(); } @Override @@ -37,12 +38,18 @@ public class MockInterpreterB extends Interpreter { @Override public InterpreterResult interpret(String st, InterpreterContext context) { + MockInterpreterA intpA = getInterpreterA(); + String intpASt = intpA.getLastStatement(); + long timeToSleep = Long.parseLong(st); + if (intpASt != null) { + timeToSleep += Long.parseLong(intpASt); + } try { - Thread.sleep(Long.parseLong(st)); + Thread.sleep(timeToSleep); } catch (NumberFormatException | InterruptedException e) { throw new InterpreterException(e); } - return new InterpreterResult(Code.SUCCESS, st); + return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep)); } @Override @@ -65,6 +72,20 @@ public class MockInterpreterB extends Interpreter { return null; } + public MockInterpreterA getInterpreterA() { + InterpreterGroup interpreterGroup = getInterpreterGroup(); + for (Interpreter intp : interpreterGroup) { + if (intp.getClassName().equals(MockInterpreterA.class.getName())) { + Interpreter p = intp; + while (p instanceof WrappedInterpreter) { + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + return (MockInterpreterA) p; + } + } + return null; + } + @Override public Scheduler getScheduler() { InterpreterGroup interpreterGroup = getInterpreterGroup(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ff85f793/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java index 8695038..35aa1d3 100644 --- 
a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java @@ -1,5 +1,7 @@ package com.nflabs.zeppelin.scheduler; +import static org.junit.Assert.assertEquals; + import java.io.File; import java.util.HashMap; import java.util.Map; @@ -53,7 +55,7 @@ public class RemoteSchedulerTest { intpA.getInterpreterProcess(), 10); - scheduler.submit(new Job("jobName", null) { + Job job = new Job("jobId", "jobName", null, 200) { @Override public int progress() { @@ -67,21 +69,34 @@ public class RemoteSchedulerTest { @Override protected Object jobRun() throws Throwable { - intpA.interpret("500", new InterpreterContext( - "id", + intpA.interpret("1000", new InterpreterContext( + "jobId", "title", "text", new HashMap<String, Object>(), new GUI())); - return "500"; + return "1000"; } @Override protected boolean jobAbort() { return false; } + }; + scheduler.submit(job); + + while (job.isRunning() == false) { + Thread.sleep(100); + } + + Thread.sleep(500); + assertEquals(0, scheduler.getJobsWaiting().size()); + assertEquals(1, scheduler.getJobsRunning().size()); + + Thread.sleep(500); - }); + assertEquals(0, scheduler.getJobsWaiting().size()); + assertEquals(0, scheduler.getJobsRunning().size()); intpA.close(); schedulerSvc.removeScheduler("test");
