http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index d0025d8..191902a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -41,6 +41,7 @@ public abstract class Job { /** * Job status. * + * UNKNOWN - Job is not found in remote * READY - Job is not running, ready to run. * PENDING - Job is submitted to scheduler. but not running yet * RUNNING - Job is running. @@ -48,8 +49,8 @@ public abstract class Job { * ERROR - Job finished run. with error * ABORT - Job finished by abort */ - public static enum Status { - READY, PENDING, RUNNING, FINISHED, ERROR, ABORT; + public enum Status { + UNKNOWN, READY, PENDING, RUNNING, FINISHED, ERROR, ABORT; public boolean isReady() { return this == READY; @@ -70,14 +71,14 @@ public abstract class Job { Date dateCreated; Date dateStarted; Date dateFinished; - Status status; + volatile Status status; static Logger LOGGER = LoggerFactory.getLogger(Job.class); transient boolean aborted = false; - private String errorMessage; - private transient Throwable exception; + private volatile String errorMessage; + private transient volatile Throwable exception; private transient JobListener listener; private long progressUpdateIntervalMs;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java deleted file mode 100644 index f9ddc4e..0000000 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ /dev/null @@ -1,426 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scheduler; - -import org.apache.thrift.TException; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; -import org.apache.zeppelin.scheduler.Job.Status; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ExecutorService; - -/** - * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on RemoteInterpreter - */ -public class RemoteScheduler implements Scheduler { - Logger logger = LoggerFactory.getLogger(RemoteScheduler.class); - - List<Job> queue = new LinkedList<>(); - List<Job> running = new LinkedList<>(); - private ExecutorService executor; - private SchedulerListener listener; - boolean terminate = false; - private String name; - private int maxConcurrency; - private final String noteId; - private RemoteInterpreterProcess interpreterProcess; - - public RemoteScheduler(String name, ExecutorService executor, String noteId, - RemoteInterpreterProcess interpreterProcess, SchedulerListener listener, - int maxConcurrency) { - this.name = name; - this.executor = executor; - this.listener = listener; - this.noteId = noteId; - this.interpreterProcess = interpreterProcess; - this.maxConcurrency = maxConcurrency; - } - - @Override - public void run() { - while (terminate == false) { - Job job = null; - - synchronized (queue) { - if (running.size() >= maxConcurrency || queue.isEmpty() == true) { - try { - queue.wait(500); - } catch (InterruptedException e) { - logger.error("Exception in RemoteScheduler while run queue.wait", e); - } - continue; - } - - job = queue.remove(0); - running.add(job); - } - - // run - Scheduler scheduler = this; - JobRunner jobRunner = new JobRunner(scheduler, job); - executor.execute(jobRunner); - - // wait until it is submitted to the remote - while (!jobRunner.isJobSubmittedInRemote()) { - synchronized (queue) { - try { - queue.wait(500); - } catch (InterruptedException e) { - logger.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " + - "queue.wait", e); - } - } - } - } - } - - @Override - public String getName() { - return name; - } - - @Override - public Collection<Job> getJobsWaiting() { - List<Job> ret = new LinkedList<>(); - synchronized (queue) { - for (Job job : queue) { - ret.add(job); - } - } - return ret; - } - - @Override - public Job removeFromWaitingQueue(String jobId) { - synchronized (queue) { - Iterator<Job> it = queue.iterator(); - while (it.hasNext()) { - Job job = it.next(); - if (job.getId().equals(jobId)) { - it.remove(); - return job; - } - } - } - return null; - } - - @Override - public Collection<Job> getJobsRunning() { - List<Job> ret = new LinkedList<>(); - synchronized (queue) { - for (Job job : running) { - ret.add(job); - } - } - return ret; - } - - @Override - public void submit(Job job) { - if (terminate) { - throw new RuntimeException("Scheduler already terminated"); - } - job.setStatus(Status.PENDING); - - synchronized (queue) { - queue.add(job); - queue.notify(); - } - } - - public void setMaxConcurrency(int maxConcurrency) { - this.maxConcurrency = maxConcurrency; - synchronized (queue) { - queue.notify(); - } - } - - /** - * Role of the class is get status info from remote process from PENDING to - * RUNNING status. - */ - private class JobStatusPoller extends Thread { - private long initialPeriodMsec; - private long initialPeriodCheckIntervalMsec; - private long checkIntervalMsec; - private boolean terminate; - private JobListener listener; - private Job job; - Status lastStatus; - - public JobStatusPoller(long initialPeriodMsec, - long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job, - JobListener listener) { - this.initialPeriodMsec = initialPeriodMsec; - this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec; - this.checkIntervalMsec = checkIntervalMsec; - this.job = job; - this.listener = listener; - this.terminate = false; - } - - @Override - public void run() { - long started = System.currentTimeMillis(); - while (terminate == false) { - long current = System.currentTimeMillis(); - long interval; - if (current - started < initialPeriodMsec) { - interval = initialPeriodCheckIntervalMsec; - } else { - interval = checkIntervalMsec; - } - - synchronized (this) { - try { - this.wait(interval); - } catch (InterruptedException e) { - logger.error("Exception in RemoteScheduler while run this.wait", e); - } - } - - if (terminate) { - // terminated by shutdown - break; - } - - Status newStatus = getStatus(); - if (newStatus == null) { // unknown - continue; - } - - if (newStatus != Status.READY && newStatus != Status.PENDING) { - // we don't need more - break; - } - } - terminate = true; - } - - public void shutdown() { - terminate = true; - synchronized (this) { - this.notify(); - } - } - - - private Status getLastStatus() { - if (terminate == true) { - if (lastStatus != Status.FINISHED && - lastStatus != Status.ERROR && - lastStatus != Status.ABORT) { - return Status.FINISHED; - } else { - return (lastStatus == null) ? Status.FINISHED : lastStatus; - } - } else { - return (lastStatus == null) ? Status.FINISHED : lastStatus; - } - } - - public synchronized Job.Status getStatus() { - if (interpreterProcess.referenceCount() <= 0) { - return getLastStatus(); - } - - Client client; - try { - client = interpreterProcess.getClient(); - } catch (Exception e) { - logger.error("Can't get status information", e); - lastStatus = Status.ERROR; - return Status.ERROR; - } - - boolean broken = false; - try { - String statusStr = client.getStatus(noteId, job.getId()); - if ("Unknown".equals(statusStr)) { - // not found this job in the remote schedulers. - // maybe not submitted, maybe already finished - //Status status = getLastStatus(); - listener.afterStatusChange(job, null, null); - return job.getStatus(); - } - Status status = Status.valueOf(statusStr); - lastStatus = status; - listener.afterStatusChange(job, null, status); - return status; - } catch (TException e) { - broken = true; - logger.error("Can't get status information", e); - lastStatus = Status.ERROR; - return Status.ERROR; - } catch (Exception e) { - logger.error("Unknown status", e); - lastStatus = Status.ERROR; - return Status.ERROR; - } finally { - interpreterProcess.releaseClient(client, broken); - } - } - } - - private class JobRunner implements Runnable, JobListener { - private Scheduler scheduler; - private Job job; - private boolean jobExecuted; - boolean jobSubmittedRemotely; - - public JobRunner(Scheduler scheduler, Job job) { - this.scheduler = scheduler; - this.job = job; - jobExecuted = false; - jobSubmittedRemotely = false; - } - - public boolean isJobSubmittedInRemote() { - return jobSubmittedRemotely; - } - - @Override - public void run() { - if (job.isAborted()) { - synchronized (queue) { - job.setStatus(Status.ABORT); - job.aborted = false; - - running.remove(job); - queue.notify(); - } - jobSubmittedRemotely = true; - - return; - } - - JobStatusPoller jobStatusPoller = new JobStatusPoller(1500, 100, 500, - job, this); - jobStatusPoller.start(); - - if (listener != null) { - listener.jobStarted(scheduler, job); - } - job.run(); - - jobExecuted = true; - jobSubmittedRemotely = true; - - jobStatusPoller.shutdown(); - try { - jobStatusPoller.join(); - } catch (InterruptedException e) { - logger.error("JobStatusPoller interrupted", e); - } - - // set job status based on result. - Status lastStatus = jobStatusPoller.getStatus(); - Object jobResult = job.getReturn(); - if (jobResult != null && jobResult instanceof InterpreterResult) { - if (((InterpreterResult) jobResult).code() == Code.ERROR) { - lastStatus = Status.ERROR; - } - } - if (job.getException() != null) { - lastStatus = Status.ERROR; - } - - synchronized (queue) { - job.setStatus(lastStatus); - - if (listener != null) { - listener.jobFinished(scheduler, job); - } - - // reset aborted flag to allow retry - job.aborted = false; - - running.remove(job); - queue.notify(); - } - } - - @Override - public void onProgressUpdate(Job job, int progress) { - } - - @Override - public void beforeStatusChange(Job job, Status before, Status after) { - } - - @Override - public void afterStatusChange(Job job, Status before, Status after) { - if (after == null) { // unknown. maybe before sumitted remotely, maybe already finished. - if (jobExecuted) { - jobSubmittedRemotely = true; - Object jobResult = job.getReturn(); - if (job.isAborted()) { - job.setStatus(Status.ABORT); - } else if (job.getException() != null) { - job.setStatus(Status.ERROR); - } else if (jobResult != null && jobResult instanceof InterpreterResult - && ((InterpreterResult) jobResult).code() == Code.ERROR) { - job.setStatus(Status.ERROR); - } else { - job.setStatus(Status.FINISHED); - } - } - return; - } - - - // Update remoteStatus - if (jobExecuted == false) { - if (after == Status.FINISHED || after == Status.ABORT - || after == Status.ERROR) { - // it can be status of last run. - // so not updating the remoteStatus - return; - } else if (after == Status.RUNNING) { - jobSubmittedRemotely = true; - } - } else { - jobSubmittedRemotely = true; - } - - // status polled by status poller - if (job.getStatus() != after) { - job.setStatus(after); - } - } - } - - @Override - public void stop() { - terminate = true; - synchronized (queue) { - queue.notify(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java index af52dec..b629ef7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java @@ -17,24 +17,21 @@ package org.apache.zeppelin.scheduler; -import java.util.Collection; import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * TODO(moon) : add description. + * Factory class for creating schedulers + * */ public class SchedulerFactory implements SchedulerListener { private static final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class); - ExecutorService executor; - Map<String, Scheduler> schedulers = new LinkedHashMap<>(); + protected ExecutorService executor; + protected Map<String, Scheduler> schedulers = new LinkedHashMap<>(); private static SchedulerFactory singleton; private static Long singletonLock = new Long(0); @@ -54,17 +51,17 @@ public class SchedulerFactory implements SchedulerListener { return singleton; } - public SchedulerFactory() throws Exception { - executor = ExecutorFactory.singleton().createOrGet("schedulerFactory", 100); + SchedulerFactory() throws Exception { + executor = ExecutorFactory.singleton().createOrGet("SchedulerFactory", 100); } public void destroy() { - ExecutorFactory.singleton().shutdown("schedulerFactory"); + ExecutorFactory.singleton().shutdown("SchedulerFactory"); } public Scheduler createOrGetFIFOScheduler(String name) { synchronized (schedulers) { - if (schedulers.containsKey(name) == false) { + if (!schedulers.containsKey(name)) { Scheduler s = new FIFOScheduler(name, executor, this); schedulers.put(name, s); executor.execute(s); @@ -75,7 +72,7 @@ public class SchedulerFactory implements SchedulerListener { public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) { synchronized (schedulers) { - if (schedulers.containsKey(name) == false) { + if (!schedulers.containsKey(name)) { Scheduler s = new ParallelScheduler(name, executor, this, maxConcurrency); schedulers.put(name, s); executor.execute(s); @@ -84,60 +81,38 @@ public class SchedulerFactory implements SchedulerListener { } } - public Scheduler createOrGetRemoteScheduler( - String name, - String noteId, - RemoteInterpreterProcess interpreterProcess, - int maxConcurrency) { - + public Scheduler createOrGetScheduler(Scheduler scheduler) { synchronized (schedulers) { - if (schedulers.containsKey(name) == false) { - Scheduler s = new RemoteScheduler( - name, - executor, - noteId, - interpreterProcess, - this, - maxConcurrency); - schedulers.put(name, s); - executor.execute(s); + if (!schedulers.containsKey(scheduler.getName())) { + schedulers.put(scheduler.getName(), scheduler); + executor.execute(scheduler); } - return schedulers.get(name); + return schedulers.get(scheduler.getName()); } } - public Scheduler removeScheduler(String name) { + public void removeScheduler(String name) { synchronized (schedulers) { Scheduler s = schedulers.remove(name); if (s != null) { s.stop(); } } - return null; } - public Collection<Scheduler> listScheduler(String name) { - List<Scheduler> s = new LinkedList<>(); - synchronized (schedulers) { - for (Scheduler ss : schedulers.values()) { - s.add(ss); - } - } - return s; + public ExecutorService getExecutor() { + return executor; } @Override public void jobStarted(Scheduler scheduler, Job job) { - logger.info("Job " + job.getJobName() + " started by scheduler " + scheduler.getName()); + logger.info("Job " + job.getId() + " started by scheduler " + scheduler.getName()); } @Override public void jobFinished(Scheduler scheduler, Job job) { - logger.info("Job " + job.getJobName() + " finished by scheduler " + scheduler.getName()); + logger.info("Job " + job.getId() + " finished by scheduler " + scheduler.getName()); } - - - } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java index 8673476..1926528 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java @@ -17,7 +17,6 @@ package org.apache.zeppelin.tabledata; import org.apache.zeppelin.resource.Resource; -import org.apache.zeppelin.resource.ResourcePoolUtils; import java.util.Iterator; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java new file mode 100644 index 0000000..14c03a1 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.util; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +/** + * Generate Tiny ID. + */ +public class IdHashes { + private static final char[] DICTIONARY = new char[] {'1', '2', '3', '4', '5', '6', '7', '8', '9', + 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', + 'W', 'X', 'Y', 'Z'}; + + /** + * encodes the given string into the base of the dictionary provided in the constructor. + * + * @param value the number to encode. + * @return the encoded string. + */ + private static String encode(Long value) { + + List<Character> result = new ArrayList<>(); + BigInteger base = new BigInteger("" + DICTIONARY.length); + int exponent = 1; + BigInteger remaining = new BigInteger(value.toString()); + while (true) { + BigInteger a = base.pow(exponent); // 16^1 = 16 + BigInteger b = remaining.mod(a); // 119 % 16 = 7 | 112 % 256 = 112 + BigInteger c = base.pow(exponent - 1); + BigInteger d = b.divide(c); + + // if d > dictionary.length, we have a problem. but BigInteger doesnt have + // a greater than method :-( hope for the best. theoretically, d is always + // an index of the dictionary! + result.add(DICTIONARY[d.intValue()]); + remaining = remaining.subtract(b); // 119 - 7 = 112 | 112 - 112 = 0 + + // finished? + if (remaining.equals(BigInteger.ZERO)) { + break; + } + + exponent++; + } + + // need to reverse it, since the start of the list contains the least significant values + StringBuffer sb = new StringBuffer(); + for (int i = result.size() - 1; i >= 0; i--) { + sb.append(result.get(i)); + } + return sb.toString(); + } + + public static String generateId() { + return encode(System.currentTimeMillis() + new Random().nextInt()); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java new file mode 100644 index 0000000..6153f49 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.util; + +import org.apache.commons.lang.StringUtils; + +import java.io.IOException; +import java.util.Properties; + +/** + * TODO(moon) : add description. + */ +public class Util { + private static final String PROJECT_PROPERTIES_VERSION_KEY = "version"; + private static final String GIT_PROPERTIES_COMMIT_ID_KEY = "git.commit.id.abbrev"; + private static final String GIT_PROPERTIES_COMMIT_TS_KEY = "git.commit.time"; + + private static Properties projectProperties; + private static Properties gitProperties; + + static { + projectProperties = new Properties(); + gitProperties = new Properties(); + try { + projectProperties.load(Util.class.getResourceAsStream("/project.properties")); + gitProperties.load(Util.class.getResourceAsStream("/git.properties")); + } catch (IOException e) { + //Fail to read project.properties + } + } + + /** + * Get Zeppelin version + * + * @return Current Zeppelin version + */ + public static String getVersion() { + return StringUtils.defaultIfEmpty(projectProperties.getProperty(PROJECT_PROPERTIES_VERSION_KEY), + StringUtils.EMPTY); + } + + /** + * Get Zeppelin Git latest commit id + * + * @return Latest Zeppelin commit id + */ + public static String getGitCommitId() { + return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_ID_KEY), + StringUtils.EMPTY); + } + + /** + * Get Zeppelin Git latest commit timestamp + * + * @return Latest Zeppelin commit timestamp + */ + public static String getGitTimestamp() { + return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_TS_KEY), + StringUtils.EMPTY); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java deleted file mode 100644 index a7a6eb9..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.apache.zeppelin.interpreter; - -import java.util.Properties; - -/** - * - */ -public class DummyInterpreter extends Interpreter { - - public DummyInterpreter(Properties property) { - super(property); - } - - @Override - public void open() { - - } - - @Override - public void close() { - - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - return null; - } - - @Override - public void cancel(InterpreterContext context) { - - } - - @Override - public FormType getFormType() { - return null; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java index e376809..f3a30fb 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.*; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.Before; @@ -29,7 +30,7 @@ import org.junit.Test; public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChangeListener { private File tmpDir; private File fileChanged; - private int numChanged; + private AtomicInteger numChanged; private InterpreterOutputChangeWatcher watcher; @Before @@ -40,7 +41,7 @@ public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChan tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis()); tmpDir.mkdirs(); fileChanged = null; - numChanged = 0; + numChanged = new AtomicInteger(0); } @After @@ -66,7 +67,7 @@ public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChan @Test public void test() throws IOException, InterruptedException { assertNull(fileChanged); - assertEquals(0, numChanged); + assertEquals(0, numChanged.get()); Thread.sleep(1000); // create new file @@ -92,14 +93,14 @@ public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChan } assertNotNull(fileChanged); - assertEquals(1, numChanged); + assertEquals(1, numChanged.get()); } @Override public void fileChanged(File file) { fileChanged = file; - numChanged++; + numChanged.incrementAndGet(); synchronized(this) { notify(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java index 305268c..31c9225 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java @@ -24,6 +24,7 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +//TODO(zjffdu) add more test for Interpreter which is a very important class public class InterpreterTest { @Test @@ -85,4 +86,41 @@ public class InterpreterTest { ); } + public static class DummyInterpreter extends Interpreter { + + public DummyInterpreter(Properties property) { + super(property); + } + + @Override + public void open() { + + } + + @Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + return null; + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return null; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java new file mode 100644 index 0000000..5f7426a --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertTrue; + +public class RemoteInterpreterUtilsTest { + + @Test + public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException { + assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0); + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/resources/conf/interpreter.json ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/resources/conf/interpreter.json b/zeppelin-interpreter/src/test/resources/conf/interpreter.json new file mode 100644 index 0000000..45e1d60 --- /dev/null +++ b/zeppelin-interpreter/src/test/resources/conf/interpreter.json @@ -0,0 +1,115 @@ +{ + "interpreterSettings": { + "2C3RWCVAG": { + "id": "2C3RWCVAG", + "name": "test", + "group": "test", + "properties": { + "property_1": "value_1", + "property_2": "new_value_2", + "property_3": "value_3" + }, + "status": "READY", + "interpreterGroup": [ + { + "name": "echo", + "class": "org.apache.zeppelin.interpreter.EchoInterpreter", + "defaultInterpreter": true, + "editor": { + "language": "java", + "editOnDblClick": false + } + } + ], + "dependencies": [], + "option": { + "remote": true, + "port": -1, + "perNote": "shared", + "perUser": "shared", + "isExistingProcess": false, + "setPermission": false, + "users": [], + "isUserImpersonate": false + } + }, + + "2CKWE7B19": { + "id": "2CKWE7B19", + "name": "test2", + "group": "test", + "properties": { + "property_1": "value_1", + "property_2": "new_value_2", + "property_3": "value_3" + }, + "status": "READY", + "interpreterGroup": [ + { + "name": "echo", + "class": "org.apache.zeppelin.interpreter.EchoInterpreter", + "defaultInterpreter": true, + "editor": { + "language": "java", + "editOnDblClick": false + } + } + ], + "dependencies": [], + "option": { + "remote": true, + "port": -1, + "perNote": "shared", + "perUser": "shared", + "isExistingProcess": false, + "setPermission": false, + "users": [], + "isUserImpersonate": false + } + } + }, + "interpreterBindings": { + "2C6793KRV": [ + "2C48Y7FSJ", + "2C63XW4XE", + "2C66GE1VB", + "2C5VH924X", + "2C4BJDRRZ", + "2C3SQSB7V", + "2C4HKDCQW", + "2C3DR183X", + "2C66Z9XPQ", + "2C3PTPMUH", + "2C69WE69N", + "2C5SRRXHM", + "2C4ZD49PF", + "2C6V3D44K", + "2C4UB1UZA", + "2C5S1R21W", + "2C5DCRVGM", + "2C686X8ZH", + "2C3RWCVAG", + "2C3JKFMJU", + "2C3VECEG2" + ] + }, + "interpreterRepositories": [ + { + "id": "central", + "type": "default", + "url": "http://repo1.maven.org/maven2/", + "releasePolicy": { + "enabled": true, + "updatePolicy": "daily", + "checksumPolicy": "warn" + }, + "snapshotPolicy": { + "enabled": true, + "updatePolicy": "daily", + "checksumPolicy": "warn" + }, + "mirroredRepositories": [], + "repositoryManager": false + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json b/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json new file mode 100644 index 0000000..1ba1b94 --- /dev/null +++ b/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json @@ -0,0 +1,42 @@ +[ + { + "group": "test", + "name": "double_echo", + "className": "org.apache.zeppelin.interpreter.DoubleEchoInterpreter", + "properties": { + "property_1": { + "envName": "PROPERTY_1", + "propertyName": "property_1", + "defaultValue": "value_1", + "description": "desc_1" + }, + "property_2": { + "envName": "PROPERTY_2", + "propertyName": "property_2", + "defaultValue": "value_2", + "description": "desc_2" + } + } + }, + + { + "group": "test", + "name": "echo", + "defaultInterpreter": true, + "className": "org.apache.zeppelin.interpreter.EchoInterpreter", + "properties": { + "property_1": { + "envName": "PROPERTY_1", + "propertyName": "property_1", + "defaultValue": "value_1", + "description": "desc_1" + }, + "property_2": { + "envName": "PROPERTY_2", + "propertyName": "property_2", + "defaultValue": "value_2", + "description": "desc_2" + } + } + } +] http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/resources/log4j.properties b/zeppelin-interpreter/src/test/resources/log4j.properties index d8a7839..6f34691 100644 --- a/zeppelin-interpreter/src/test/resources/log4j.properties +++ b/zeppelin-interpreter/src/test/resources/log4j.properties @@ -26,4 +26,6 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n # # Root logger option -log4j.rootLogger=INFO, stdout \ No newline at end of file +log4j.rootLogger=INFO, stdout +log4j.logger.org.apache.zeppelin.interpreter=DEBUG +log4j.logger.org.apache.zeppelin.scheduler=DEBUG \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java index cd0210e..c1dba5c 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java @@ -185,7 +185,7 @@ public class InterpreterRestApi { String noteId = request == null ? null : request.getNoteId(); if (null == noteId) { - interpreterSettingManager.close(setting); + interpreterSettingManager.close(settingId); } else { interpreterSettingManager.restart(settingId, noteId, SecurityUtils.getPrincipal()); } @@ -208,7 +208,7 @@ public class InterpreterRestApi { @GET @ZeppelinApi public Response listInterpreter(String message) { - Map<String, InterpreterSetting> m = interpreterSettingManager.getAvailableInterpreterSettings(); + Map<String, InterpreterSetting> m = interpreterSettingManager.getInterpreterSettingTemplates(); return new JsonResponse<>(Status.OK, "", m).build(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 7453470..c103eeb 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -31,12 +31,10 @@ import org.apache.shiro.web.env.EnvironmentLoaderListener; import org.apache.shiro.web.servlet.ShiroFilter; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; -import org.apache.zeppelin.dep.DependencyResolver; import org.apache.zeppelin.helium.Helium; import org.apache.zeppelin.helium.HeliumApplicationFactory; import org.apache.zeppelin.helium.HeliumBundleFactory; import org.apache.zeppelin.interpreter.InterpreterFactory; -import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterSettingManager; import org.apache.zeppelin.notebook.Notebook; @@ -93,13 +91,11 @@ public class ZeppelinServer extends Application { private NotebookRepoSync notebookRepo; private NotebookAuthorization notebookAuthorization; private Credentials credentials; - private DependencyResolver depResolver; public ZeppelinServer() throws Exception { ZeppelinConfiguration conf = ZeppelinConfiguration.create(); - this.depResolver = new DependencyResolver( - conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO)); + InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT); @@ -129,13 +125,26 @@ public class ZeppelinServer extends Application { new File(conf.getRelativeDir("zeppelin-web/src/app/spell"))); } + this.schedulerFactory = SchedulerFactory.singleton(); + this.interpreterSettingManager = new InterpreterSettingManager(conf, notebookWsServer, + notebookWsServer, notebookWsServer); + this.replFactory = new InterpreterFactory(interpreterSettingManager); + this.notebookRepo = new NotebookRepoSync(conf); + this.noteSearchService = new LuceneSearch(); + this.notebookAuthorization = NotebookAuthorization.init(conf); + this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath()); + notebook = new Notebook(conf, + notebookRepo, schedulerFactory, replFactory, interpreterSettingManager, notebookWsServer, + noteSearchService, notebookAuthorization, credentials); + ZeppelinServer.helium = new Helium( conf.getHeliumConfPath(), conf.getHeliumRegistry(), new File(conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO), "helium-registry-cache"), heliumBundleFactory, - heliumApplicationFactory); + heliumApplicationFactory, + interpreterSettingManager); // create bundle try { @@ -144,20 +153,6 @@ public class ZeppelinServer extends Application { LOG.error(e.getMessage(), e); } - this.schedulerFactory = new SchedulerFactory(); - this.interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, - new InterpreterOption(true)); - this.replFactory = new InterpreterFactory(conf, notebookWsServer, - notebookWsServer, heliumApplicationFactory, depResolver, SecurityUtils.isAuthenticated(), - interpreterSettingManager); - this.notebookRepo = new NotebookRepoSync(conf); - this.noteSearchService = new LuceneSearch(); - this.notebookAuthorization = NotebookAuthorization.init(conf); - this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath()); - notebook = new Notebook(conf, - notebookRepo, schedulerFactory, replFactory, interpreterSettingManager, notebookWsServer, - noteSearchService, notebookAuthorization, credentials); - // to update notebook from application event from remote process. heliumApplicationFactory.setNotebook(notebook); // to update fire websocket event on application event. @@ -206,7 +201,7 @@ public class ZeppelinServer extends Application { LOG.info("Shutting down Zeppelin Server ... "); try { jettyWebServer.stop(); - notebook.getInterpreterSettingManager().shutdown(); + notebook.getInterpreterSettingManager().close(); notebook.close(); Thread.sleep(3000); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index f0e0bb2..2e3a5c7 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -41,12 +41,7 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.display.Input; import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.helium.HeliumPackage; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContextRunner; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResultMessage; -import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; @@ -59,7 +54,6 @@ import org.apache.zeppelin.notebook.NotebookEventListener; import org.apache.zeppelin.notebook.NotebookImportDeserializer; import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.notebook.ParagraphJobListener; -import org.apache.zeppelin.notebook.ParagraphRuntimeInfo; import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision; import org.apache.zeppelin.notebook.socket.Message; import org.apache.zeppelin.notebook.socket.Message.OP; @@ -86,8 +80,6 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Queues; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; import com.google.gson.reflect.TypeToken; /** @@ -463,7 +455,8 @@ public class NotebookServer extends WebSocketServlet Notebook notebook = notebook(); List<Note> notes = notebook.getAllNotes(); for (Note note : notes) { - List<String> ids = notebook.getInterpreterSettingManager().getInterpreters(note.getId()); + List<String> ids = notebook.getInterpreterSettingManager() + .getInterpreterBinding(note.getId()); for (String id : ids) { if (id.equals(interpreterGroupId)) { broadcast(note.getId(), m); @@ -1022,7 +1015,7 @@ public class NotebookServer extends WebSocketServlet List<String> interpreterSettingIds = new LinkedList<>(); interpreterSettingIds.add(defaultInterpreterId); for (String interpreterSettingId : notebook.getInterpreterSettingManager(). - getDefaultInterpreterSettingList()) { + getInterpreterSettingIds()) { if (!interpreterSettingId.equals(defaultInterpreterId)) { interpreterSettingIds.add(interpreterSettingId); } @@ -1382,12 +1375,13 @@ public class NotebookServer extends WebSocketServlet List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId()); for (InterpreterSetting setting : settings) { - if (setting.getInterpreterGroup(user, note.getId()) == null) { + if (setting.getOrCreateInterpreterGroup(user, note.getId()) == null) { continue; } - if (interpreterGroupId.equals(setting.getInterpreterGroup(user, note.getId()).getId())) { + if (interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, note.getId()) + .getId())) { AngularObjectRegistry angularObjectRegistry = - setting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry(); + setting.getOrCreateInterpreterGroup(user, note.getId()).getAngularObjectRegistry(); // first trying to get local registry ao = angularObjectRegistry.get(varName, noteId, paragraphId); @@ -1424,12 +1418,13 @@ public class NotebookServer extends WebSocketServlet List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId()); for (InterpreterSetting setting : settings) { - if (setting.getInterpreterGroup(user, n.getId()) == null) { + if (setting.getOrCreateInterpreterGroup(user, n.getId()) == null) { continue; } - if (interpreterGroupId.equals(setting.getInterpreterGroup(user, n.getId()).getId())) { + if (interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, n.getId()) + .getId())) { AngularObjectRegistry angularObjectRegistry = - setting.getInterpreterGroup(user, n.getId()).getAngularObjectRegistry(); + setting.getOrCreateInterpreterGroup(user, n.getId()).getAngularObjectRegistry(); this.broadcastExcept(n.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) .put("interpreterGroupId", interpreterGroupId).put("noteId", n.getId()) @@ -2302,13 +2297,13 @@ public class NotebookServer extends WebSocketServlet for (InterpreterSetting intpSetting : settings) { AngularObjectRegistry registry = - intpSetting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry(); + intpSetting.getOrCreateInterpreterGroup(user, note.getId()).getAngularObjectRegistry(); List<AngularObject> objects = registry.getAllWithGlobal(note.getId()); for (AngularObject object : objects) { conn.send(serializeMessage( new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", object) .put("interpreterGroupId", - intpSetting.getInterpreterGroup(user, note.getId()).getId()) + intpSetting.getOrCreateInterpreterGroup(user, note.getId()).getId()) .put("noteId", note.getId()).put("paragraphId", object.getParagraphId()))); } } @@ -2354,7 +2349,7 @@ public class NotebookServer extends WebSocketServlet } List<String> settingIds = - notebook.getInterpreterSettingManager().getInterpreters(note.getId()); + notebook.getInterpreterSettingManager().getInterpreterBinding(note.getId()); for (String id : settingIds) { if (interpreterGroupId.contains(id)) { broadcast(note.getId(), http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java b/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java deleted file mode 100644 index 1b1306a..0000000 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.interpreter.mock; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.scheduler.SchedulerFactory; - -public class MockInterpreter1 extends Interpreter{ - Map<String, Object> vars = new HashMap<>(); - - public MockInterpreter1(Properties property) { - super(property); - } - - @Override - public void open() { - } - - @Override - public void close() { - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: "+st); - } - - @Override - public void cancel(InterpreterContext context) { - } - - @Override - public FormType getFormType() { - return FormType.SIMPLE; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } - - @Override - public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler("test_"+this.hashCode()); - } - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index a7907db..e2f171f 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -307,10 +307,9 @@ public abstract class AbstractTestRestApi { protected static void shutDown() throws Exception { if (!wasRunning) { // restart interpreter to stop all interpreter processes - List<String> settingList = ZeppelinServer.notebook.getInterpreterSettingManager() - .getDefaultInterpreterSettingList(); - for (String setting : settingList) { - ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting); + List<InterpreterSetting> settingList = ZeppelinServer.notebook.getInterpreterSettingManager().get(); + for (InterpreterSetting setting : settingList) { + ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId()); } if (shiroIni != null) { FileUtils.deleteQuietly(shiroIni); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java index 28541bd..72dd8a7 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java @@ -80,7 +80,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi { // then assertThat(get, isAllowed()); - assertEquals(ZeppelinServer.notebook.getInterpreterSettingManager().getAvailableInterpreterSettings().size(), + assertEquals(ZeppelinServer.notebook.getInterpreterSettingManager().getInterpreterSettingTemplates().size(), body.entrySet().size()); get.releaseConnection(); } @@ -110,7 +110,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi { @Test public void testSettingsCRUD() throws IOException { // when: call create setting API - String rawRequest = "{\"name\":\"md2\",\"group\":\"md\"," + + String rawRequest = "{\"name\":\"md3\",\"group\":\"md\"," + "\"properties\":{\"propname\": {\"value\": \"propvalue\", \"name\": \"propname\", \"type\": \"textarea\"}}," + "\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}]," + "\"dependencies\":[]," + @@ -367,7 +367,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi { @Test public void testGetMetadataInfo() throws IOException { - String jsonRequest = "{\"name\":\"spark\",\"group\":\"spark\"," + + String jsonRequest = "{\"name\":\"spark_new\",\"group\":\"spark\"," + "\"properties\":{\"propname\": {\"value\": \"propvalue\", \"name\": \"propname\", \"type\": \"textarea\"}}," + "\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}]," + "\"dependencies\":[]," + http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java index 8da36a6..10d77b2 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java @@ -30,6 +30,7 @@ import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.notebook.socket.Message; import org.apache.zeppelin.notebook.socket.Message.OP; import org.apache.zeppelin.rest.AbstractTestRestApi; +import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.server.ZeppelinServer; import org.apache.zeppelin.user.AuthenticationInfo; import org.junit.AfterClass; @@ -95,7 +96,7 @@ public class NotebookServerTest extends AbstractTestRestApi { } @Test - public void testMakeSureNoAngularObjectBroadcastToWebsocketWhoFireTheEvent() throws IOException { + public void testMakeSureNoAngularObjectBroadcastToWebsocketWhoFireTheEvent() throws IOException, InterruptedException { // create a notebook Note note1 = notebook.createNote(anonymous); @@ -104,7 +105,7 @@ public class NotebookServerTest extends AbstractTestRestApi { List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().getInterpreterSettings(note1.getId()); for (InterpreterSetting setting : settings) { if (setting.getName().equals("md")) { - interpreterGroup = setting.getInterpreterGroup("anonymous", "sharedProcess"); + interpreterGroup = setting.getOrCreateInterpreterGroup("anonymous", "sharedProcess"); break; } } @@ -115,6 +116,14 @@ public class NotebookServerTest extends AbstractTestRestApi { p1.setAuthenticationInfo(anonymous); note1.run(p1.getId()); + // wait for paragraph finished + while(true) { + if (p1.getStatus() == Job.Status.FINISHED) { + break; + } + Thread.sleep(100); + } + // add angularObject interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId(), null); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/resources/log4j.properties b/zeppelin-server/src/test/resources/log4j.properties index b0d1067..8368993 100644 --- a/zeppelin-server/src/test/resources/log4j.properties +++ b/zeppelin-server/src/test/resources/log4j.properties @@ -33,7 +33,6 @@ log4j.logger.org.apache.hadoop.mapred=WARN log4j.logger.org.apache.hadoop.hive.ql=WARN log4j.logger.org.apache.hadoop.hive.metastore=WARN log4j.logger.org.apache.haadoop.hive.service.HiveServer=WARN -log4j.logger.org.apache.zeppelin.scheduler=WARN log4j.logger.org.quartz=WARN log4j.logger.DataNucleus=WARN http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index f00fe93..03cc069 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -17,22 +17,21 @@ package org.apache.zeppelin.conf; -import java.io.File; -import java.net.URL; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.configuration.tree.ConfigurationNode; import org.apache.commons.lang.StringUtils; -import org.apache.zeppelin.notebook.repo.GitNotebookRepo; import org.apache.zeppelin.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.net.URL; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /** * Zeppelin configuration. * @@ -528,7 +527,7 @@ public class ZeppelinConfiguration extends XMLConfiguration { ConfigurationKeyPredicate predicate) { Map<String, String> configurations = new HashMap<>(); - for (ZeppelinConfiguration.ConfVars v : ZeppelinConfiguration.ConfVars.values()) { + for (ConfVars v : ConfVars.values()) { String key = v.getVarName(); if (!predicate.apply(key)) { @@ -653,7 +652,8 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_NOTEBOOK_MONGO_COLLECTION("zeppelin.notebook.mongo.collection", "notes"), ZEPPELIN_NOTEBOOK_MONGO_URI("zeppelin.notebook.mongo.uri", "mongodb://localhost"), ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT("zeppelin.notebook.mongo.autoimport", false), - ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", GitNotebookRepo.class.getName()), + ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", + "org.apache.zeppelin.notebook.repo.GitNotebookRepo"), ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC("zeppelin.notebook.one.way.sync", false), // whether by default note is public or private ZEPPELIN_NOTEBOOK_PUBLIC("zeppelin.notebook.public", true), http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java index 17a3529..e5f2e3b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java @@ -16,14 +16,17 @@ */ package org.apache.zeppelin.helium; +import com.google.gson.Gson; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterSettingManager; +import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.apache.zeppelin.notebook.Paragraph; -import org.apache.zeppelin.resource.DistributedResourcePool; -import org.apache.zeppelin.resource.ResourcePool; -import org.apache.zeppelin.resource.ResourcePoolUtils; -import org.apache.zeppelin.resource.ResourceSet; +import org.apache.zeppelin.resource.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,19 +50,22 @@ public class Helium { private final HeliumBundleFactory bundleFactory; private final HeliumApplicationFactory applicationFactory; + private final InterpreterSettingManager interpreterSettingManager; public Helium( String heliumConfPath, String registryPaths, File registryCacheDir, HeliumBundleFactory bundleFactory, - HeliumApplicationFactory applicationFactory) + HeliumApplicationFactory applicationFactory, + InterpreterSettingManager interpreterSettingManager) throws IOException { this.heliumConfPath = heliumConfPath; this.registryPaths = registryPaths; this.registryCacheDir = registryCacheDir; this.bundleFactory = bundleFactory; this.applicationFactory = applicationFactory; + this.interpreterSettingManager = interpreterSettingManager; heliumConf = loadConf(heliumConfPath); allPackages = getAllPackageInfo(); } @@ -350,7 +356,7 @@ public class Helium { allResources = resourcePool.getAll(); } } else { - allResources = ResourcePoolUtils.getAllResources(); + allResources = interpreterSettingManager.getAllResources(); } for (List<HeliumPackageSearchResult> pkgs : allPackages.values()) { @@ -478,4 +484,40 @@ public class Helium { return mixed; } + + public ResourceSet getAllResources() { + return getAllResourcesExcept(null); + } + + private ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) { + ResourceSet resourceSet = new ResourceSet(); + for (ManagedInterpreterGroup intpGroup : interpreterSettingManager.getAllInterpreterGroup()) { + if (interpreterGroupExcludsion != null && + intpGroup.getId().equals(interpreterGroupExcludsion)) { + continue; + } + + RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess(); + if (remoteInterpreterProcess == null) { + ResourcePool localPool = intpGroup.getResourcePool(); + if (localPool != null) { + resourceSet.addAll(localPool.getAll()); + } + } else if (remoteInterpreterProcess.isRunning()) { + List<String> resourceList = remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<List<String>>() { + @Override + public List<String> call(RemoteInterpreterService.Client client) throws Exception { + return client.resourcePoolGetAll(); + } + } + ); + Gson gson = new Gson(); + for (String res : resourceList) { + resourceSet.add(gson.fromJson(res, Resource.class)); + } + } + } + return resourceSet; + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java index 84368a7..f4dfae3 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java @@ -16,7 +16,6 @@ */ package org.apache.zeppelin.helium; -import org.apache.thrift.TException; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; @@ -80,7 +79,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb try { // get interpreter process Interpreter intp = paragraph.getRepl(paragraph.getRequiredReplName()); - InterpreterGroup intpGroup = intp.getInterpreterGroup(); + ManagedInterpreterGroup intpGroup = (ManagedInterpreterGroup) intp.getInterpreterGroup(); RemoteInterpreterProcess intpProcess = intpGroup.getRemoteInterpreterProcess(); if (intpProcess == null) { throw new ApplicationException("Target interpreter process is not running"); @@ -105,38 +104,33 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb private void load(RemoteInterpreterProcess intpProcess, ApplicationState appState) throws Exception { - RemoteInterpreterService.Client client = null; - synchronized (appState) { if (appState.getStatus() == ApplicationState.Status.LOADED) { // already loaded return; } - try { - appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADING); - String pkgInfo = pkg.toJson(); - String appId = appState.getId(); - - client = intpProcess.getClient(); - RemoteApplicationResult ret = client.loadApplication( - appId, - pkgInfo, - paragraph.getNote().getId(), - paragraph.getId()); - - if (ret.isSuccess()) { - appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADED); - } else { - throw new ApplicationException(ret.getMsg()); - } - } catch (TException e) { - intpProcess.releaseBrokenClient(client); - throw e; - } finally { - if (client != null) { - intpProcess.releaseClient(client); - } + appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADING); + final String pkgInfo = pkg.toJson(); + final String appId = appState.getId(); + + RemoteApplicationResult ret = intpProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() { + @Override + public RemoteApplicationResult call(RemoteInterpreterService.Client client) + throws Exception { + return client.loadApplication( + appId, + pkgInfo, + paragraph.getNote().getId(), + paragraph.getId()); + } + } + ); + if (ret.isSuccess()) { + appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADED); + } else { + throw new ApplicationException(ret.getMsg()); } } } @@ -199,7 +193,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb } } - private void unload(ApplicationState appsToUnload) throws ApplicationException { + private void unload(final ApplicationState appsToUnload) throws ApplicationException { synchronized (appsToUnload) { if (appsToUnload.getStatus() != ApplicationState.Status.LOADED) { throw new ApplicationException( @@ -212,31 +206,24 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb } RemoteInterpreterProcess intpProcess = - intp.getInterpreterGroup().getRemoteInterpreterProcess(); + ((ManagedInterpreterGroup) intp.getInterpreterGroup()).getRemoteInterpreterProcess(); if (intpProcess == null) { throw new ApplicationException("Target interpreter process is not running"); } - RemoteInterpreterService.Client client; - try { - client = intpProcess.getClient(); - } catch (Exception e) { - throw new ApplicationException(e); - } - - try { - RemoteApplicationResult ret = client.unloadApplication(appsToUnload.getId()); - - if (ret.isSuccess()) { - appStatusChange(paragraph, appsToUnload.getId(), ApplicationState.Status.UNLOADED); - } else { - throw new ApplicationException(ret.getMsg()); - } - } catch (TException e) { - intpProcess.releaseBrokenClient(client); - throw new ApplicationException(e); - } finally { - intpProcess.releaseClient(client); + RemoteApplicationResult ret = intpProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() { + @Override + public RemoteApplicationResult call(RemoteInterpreterService.Client client) + throws Exception { + return client.unloadApplication(appsToUnload.getId()); + } + } + ); + if (ret.isSuccess()) { + appStatusChange(paragraph, appsToUnload.getId(), ApplicationState.Status.UNLOADED); + } else { + throw new ApplicationException(ret.getMsg()); } } } @@ -286,7 +273,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb } } - private void run(ApplicationState app) throws ApplicationException { + private void run(final ApplicationState app) throws ApplicationException { synchronized (app) { if (app.getStatus() != ApplicationState.Status.LOADED) { throw new ApplicationException( @@ -299,33 +286,23 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb } RemoteInterpreterProcess intpProcess = - intp.getInterpreterGroup().getRemoteInterpreterProcess(); + ((ManagedInterpreterGroup) intp.getInterpreterGroup()).getRemoteInterpreterProcess(); if (intpProcess == null) { throw new ApplicationException("Target interpreter process is not running"); } - RemoteInterpreterService.Client client = null; - try { - client = intpProcess.getClient(); - } catch (Exception e) { - throw new ApplicationException(e); - } - - try { - RemoteApplicationResult ret = client.runApplication(app.getId()); - - if (ret.isSuccess()) { - // success - } else { - throw new ApplicationException(ret.getMsg()); - } - } catch (TException e) { - intpProcess.releaseBrokenClient(client); - client = null; - throw new ApplicationException(e); - } finally { - if (client != null) { - intpProcess.releaseClient(client); - } + RemoteApplicationResult ret = intpProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() { + @Override + public RemoteApplicationResult call(RemoteInterpreterService.Client client) + throws Exception { + return client.runApplication(app.getId()); + } + } + ); + if (ret.isSuccess()) { + // success + } else { + throw new ApplicationException(ret.getMsg()); } } }