http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java deleted file mode 100644 index 078cd3c..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java +++ /dev/null @@ -1,134 +0,0 @@ -package com.nflabs.zeppelin.scheduler; - -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ExecutorService; - -import com.nflabs.zeppelin.scheduler.Job.Status; - -/** - * TODO(moon) : add description. - * - * @author Leemoonsoo - * - */ -public class FIFOScheduler implements Scheduler { - List<Job> queue = new LinkedList<Job>(); - private ExecutorService executor; - private SchedulerListener listener; - boolean terminate = false; - Job runningJob = null; - private String name; - - public FIFOScheduler(String name, ExecutorService executor, SchedulerListener listener) { - this.name = name; - this.executor = executor; - this.listener = listener; - } - - @Override - public String getName() { - return name; - } - - @Override - public Collection<Job> getJobsWaiting() { - List<Job> ret = new LinkedList<Job>(); - synchronized (queue) { - for (Job job : queue) { - ret.add(job); - } - } - return ret; - } - - @Override - public Collection<Job> getJobsRunning() { - List<Job> ret = new LinkedList<Job>(); - Job job = runningJob; - - if (job != null) { - ret.add(job); - } - - return ret; - } - - - - @Override - public void submit(Job job) { - job.setStatus(Status.PENDING); - synchronized (queue) { - queue.add(job); - queue.notify(); - } - } - - @Override - public void run() { - - synchronized (queue) { - while (terminate == false) { - if (runningJob != null || queue.isEmpty() == true) { - try { - queue.wait(500); - } catch (InterruptedException e) { - } - continue; - } - - runningJob = queue.remove(0); - - final Scheduler scheduler = this; - this.executor.execute(new Runnable() { - @Override - public void run() { - if (runningJob.isAborted()) { - runningJob.setStatus(Status.ABORT); - runningJob.aborted = false; - synchronized (queue) { - queue.notify(); - } - return; - } - - runningJob.setStatus(Status.RUNNING); - if (listener != null) { - listener.jobStarted(scheduler, runningJob); - } - runningJob.run(); - if (runningJob.isAborted()) { - runningJob.setStatus(Status.ABORT); - } else { - if (runningJob.getException() != null) { - runningJob.setStatus(Status.ERROR); - } else { - runningJob.setStatus(Status.FINISHED); - } - } - if (listener != null) { - listener.jobFinished(scheduler, runningJob); - } - // reset aborted flag to allow retry - runningJob.aborted = false; - runningJob = null; - synchronized (queue) { - queue.notify(); - } - } - }); - } - } - } - - @Override - public void stop() { - terminate = true; - synchronized (queue) { - queue.notify(); - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java deleted file mode 100644 index 29f72b5..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java +++ /dev/null @@ -1,246 +0,0 @@ -package com.nflabs.zeppelin.scheduler; - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Map; - -import com.nflabs.zeppelin.interpreter.InterpreterResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Skeletal implementation of the Job concept. - * - designed for inheritance - * - should be run on a separate thread - * - maintains internal state: it's status - * - supports listeners who are updated on status change - * - * Job class is serialized/deserialized and used server<->client communication - * and saving/loading jobs from disk. - * Changing/adding/deleting non transitive field name need consideration of that. - * - * @author Leemoonsoo - */ -public abstract class Job { - /** - * Job status. - * - * READY - Job is not running, ready to run. - * PENDING - Job is submitted to scheduler. but not running yet - * RUNNING - Job is running. - * FINISHED - Job finished run. with success - * ERROR - Job finished run. with error - * ABORT - Job finished by abort - * - */ - public static enum Status { - READY, - PENDING, - RUNNING, - FINISHED, - ERROR, - ABORT; - boolean isReady() { - return this == READY; - } - - boolean isRunning() { - return this == RUNNING; - } - - boolean isPending() { - return this == PENDING; - } - } - - private String jobName; - String id; - Object result; - Date dateCreated; - Date dateStarted; - Date dateFinished; - Status status; - - transient boolean aborted = false; - - String errorMessage; - private transient Throwable exception; - private transient JobListener listener; - private long progressUpdateIntervalMs; - - public Job(String jobName, JobListener listener, long progressUpdateIntervalMs) { - this.jobName = jobName; - this.listener = listener; - this.progressUpdateIntervalMs = progressUpdateIntervalMs; - - dateCreated = new Date(); - SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd-HHmmss"); - id = dateFormat.format(dateCreated) + "_" + super.hashCode(); - - setStatus(Status.READY); - } - - public Job(String jobName, JobListener listener) { - this(jobName, listener, JobProgressPoller.DEFAULT_INTERVAL_MSEC); - } - - public Job(String jobId, String jobName, JobListener listener, long progressUpdateIntervalMs) { - this.jobName = jobName; - this.listener = listener; - this.progressUpdateIntervalMs = progressUpdateIntervalMs; - - id = jobId; - - setStatus(Status.READY); - } - - public String getId() { - return id; - } - - @Override - public int hashCode() { - return id.hashCode(); - } - - @Override - public boolean equals(Object o) { - return ((Job) o).hashCode() == hashCode(); - } - - public Status getStatus() { - return status; - } - - public void setStatus(Status status) { - if (this.status == status) { - return; - } - Status before = this.status; - Status after = status; - if (listener != null) { - listener.beforeStatusChange(this, before, after); - } - this.status = status; - if (listener != null) { - listener.afterStatusChange(this, before, after); - } - } - - public void setListener(JobListener listener) { - this.listener = listener; - } - - public JobListener getListener() { - return listener; - } - - public boolean isTerminated() { - return !this.status.isReady() && !this.status.isRunning() && !this.status.isPending(); - } - - public boolean isRunning() { - return this.status.isRunning(); - } - - public void run() { - JobProgressPoller progressUpdator = null; - try { - progressUpdator = new JobProgressPoller(this, progressUpdateIntervalMs); - progressUpdator.start(); - dateStarted = new Date(); - result = jobRun(); - this.exception = null; - errorMessage = null; - dateFinished = new Date(); - progressUpdator.terminate(); - } catch (NullPointerException e) { - logger().error("Job failed", e); - progressUpdator.terminate(); - this.exception = e; - result = e.getMessage(); - errorMessage = getStack(e); - dateFinished = new Date(); - } catch (Throwable e) { - logger().error("Job failed", e); - progressUpdator.terminate(); - this.exception = e; - result = e.getMessage(); - errorMessage = getStack(e); - dateFinished = new Date(); - } finally { - //aborted = false; - } - } - - public String getStack(Throwable e) { - StackTraceElement[] stacks = e.getStackTrace(); - if (stacks == null) { - return ""; - } - String ss = ""; - for (StackTraceElement s : stacks) { - ss += s.toString() + "\n"; - } - - return ss; - } - - public Throwable getException() { - return exception; - } - - protected void setException(Throwable t) { - exception = t; - errorMessage = getStack(t); - } - - public Object getReturn() { - return result; - } - - public String getJobName() { - return jobName; - } - - public void setJobName(String jobName) { - this.jobName = jobName; - } - - public abstract int progress(); - - public abstract Map<String, Object> info(); - - protected abstract Object jobRun() throws Throwable; - - protected abstract boolean jobAbort(); - - public void abort() { - aborted = jobAbort(); - } - - public boolean isAborted() { - return aborted; - } - - public Date getDateCreated() { - return dateCreated; - } - - public Date getDateStarted() { - return dateStarted; - } - - public Date getDateFinished() { - return dateFinished; - } - - private Logger logger() { - return LoggerFactory.getLogger(Job.class); - } - - protected void setResult(Object result) { - this.result = result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobListener.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobListener.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobListener.java deleted file mode 100644 index 0e573a1..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobListener.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.nflabs.zeppelin.scheduler; - -/** - * TODO(moon) : add description. - * - * @author Leemoonsoo - * - */ -public interface JobListener { - public void onProgressUpdate(Job job, int progress); - - public void beforeStatusChange(Job job, Job.Status before, Job.Status after); - - public void afterStatusChange(Job job, Job.Status before, Job.Status after); -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java deleted file mode 100644 index 142842a..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.nflabs.zeppelin.scheduler; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * TODO(moon) : add description. - * - * @author Leemoonsoo - * - */ -public class JobProgressPoller extends Thread { - public static final long DEFAULT_INTERVAL_MSEC = 500; - Logger logger = LoggerFactory.getLogger(JobProgressPoller.class); - private Job job; - private long intervalMs; - boolean terminate = false; - - public JobProgressPoller(Job job, long intervalMs) { - this.job = job; - this.intervalMs = intervalMs; - } - - @Override - public void run() { - if (intervalMs < 0) { - return; - } else if (intervalMs == 0) { - intervalMs = DEFAULT_INTERVAL_MSEC; - } - - while (terminate == false) { - JobListener listener = job.getListener(); - if (listener != null) { - try { - if (job.isRunning()) { - listener.onProgressUpdate(job, job.progress()); - } - } catch (Exception e) { - logger.error("Can not get or update progress", e); - } - } - try { - Thread.sleep(intervalMs); - } catch (InterruptedException e) { - } - } - } - - public void terminate() { - terminate = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java deleted file mode 100644 index d28e125..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java +++ /dev/null @@ -1,162 +0,0 @@ -package com.nflabs.zeppelin.scheduler; - -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ExecutorService; - -import com.nflabs.zeppelin.scheduler.Job.Status; - -/** - * TODO(moon) : add description. - * - * @author Leemoonsoo - * - */ -public class ParallelScheduler implements Scheduler { - List<Job> queue = new LinkedList<Job>(); - List<Job> running = new LinkedList<Job>(); - private ExecutorService executor; - private SchedulerListener listener; - boolean terminate = false; - private String name; - private int maxConcurrency; - - public ParallelScheduler(String name, ExecutorService executor, SchedulerListener listener, - int maxConcurrency) { - this.name = name; - this.executor = executor; - this.listener = listener; - this.maxConcurrency = maxConcurrency; - } - - @Override - public String getName() { - return name; - } - - @Override - public Collection<Job> getJobsWaiting() { - List<Job> ret = new LinkedList<Job>(); - synchronized (queue) { - for (Job job : queue) { - ret.add(job); - } - } - return ret; - } - - @Override - public Collection<Job> getJobsRunning() { - List<Job> ret = new LinkedList<Job>(); - synchronized (queue) { - for (Job job : running) { - ret.add(job); - } - } - return ret; - } - - - - @Override - public void submit(Job job) { - job.setStatus(Status.PENDING); - synchronized (queue) { - queue.add(job); - queue.notify(); - } - } - - @Override - public void run() { - - synchronized (queue) { - while (terminate == false) { - if (running.size() >= maxConcurrency || queue.isEmpty() == true) { - try { - queue.wait(500); - } catch (InterruptedException e) { - } - continue; - } - - Job job = queue.remove(0); - running.add(job); - Scheduler scheduler = this; - - executor.execute(new JobRunner(scheduler, job)); - } - - - } - } - - public void setMaxConcurrency(int maxConcurrency) { - this.maxConcurrency = maxConcurrency; - synchronized (queue) { - queue.notify(); - } - } - - private class JobRunner implements Runnable { - private Scheduler scheduler; - private Job job; - - public JobRunner(Scheduler scheduler, Job job) { - this.scheduler = scheduler; - this.job = job; - } - - @Override - public void run() { - if (job.isAborted()) { - job.setStatus(Status.ABORT); - job.aborted = false; - - synchronized (queue) { - running.remove(job); - queue.notify(); - } - - return; - } - - job.setStatus(Status.RUNNING); - if (listener != null) { - listener.jobStarted(scheduler, job); - } - job.run(); - if (job.isAborted()) { - job.setStatus(Status.ABORT); - } else { - if (job.getException() != null) { - job.setStatus(Status.ERROR); - } else { - job.setStatus(Status.FINISHED); - } - } - - if (listener != null) { - listener.jobFinished(scheduler, job); - } - - // reset aborted flag to allow retry - job.aborted = false; - synchronized (queue) { - running.remove(job); - queue.notify(); - } - } - } - - - @Override - public void stop() { - terminate = true; - synchronized (queue) { - queue.notify(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java deleted file mode 100644 index 14baa9b..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java +++ /dev/null @@ -1,357 +0,0 @@ -package com.nflabs.zeppelin.scheduler; - -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ExecutorService; - -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.nflabs.zeppelin.interpreter.remote.RemoteInterpreterProcess; -import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; -import com.nflabs.zeppelin.scheduler.Job.Status; - -/** - * - */ -public class RemoteScheduler implements Scheduler { - Logger logger = LoggerFactory.getLogger(RemoteScheduler.class); - - List<Job> queue = new LinkedList<Job>(); - List<Job> running = new LinkedList<Job>(); - private ExecutorService executor; - private SchedulerListener listener; - boolean terminate = false; - private String name; - private int maxConcurrency; - private RemoteInterpreterProcess interpreterProcess; - - public RemoteScheduler(String name, ExecutorService executor, - RemoteInterpreterProcess interpreterProcess, SchedulerListener listener, - int maxConcurrency) { - this.name = name; - this.executor = executor; - this.listener = listener; - this.interpreterProcess = interpreterProcess; - this.maxConcurrency = maxConcurrency; - } - - @Override - public void run() { - while (terminate == false) { - Job job = null; - - synchronized (queue) { - if (running.size() >= maxConcurrency || queue.isEmpty() == true) { - try { - queue.wait(500); - } catch (InterruptedException e) { - } - continue; - } - - job = queue.remove(0); - running.add(job); - } - - // run - Scheduler scheduler = this; - JobRunner jobRunner = new JobRunner(scheduler, job); - executor.execute(jobRunner); - - // wait until it is submitted to the remote - while (!jobRunner.isJobSubmittedInRemote()) { - synchronized (queue) { - try { - queue.wait(500); - } catch (InterruptedException e) { - } - } - } - } - } - - @Override - public String getName() { - return name; - } - - @Override - public Collection<Job> getJobsWaiting() { - List<Job> ret = new LinkedList<Job>(); - synchronized (queue) { - for (Job job : queue) { - ret.add(job); - } - } - return ret; - } - - @Override - public Collection<Job> getJobsRunning() { - List<Job> ret = new LinkedList<Job>(); - synchronized (queue) { - for (Job job : running) { - ret.add(job); - } - } - return ret; - } - - @Override - public void submit(Job job) { - job.setStatus(Status.PENDING); - - synchronized (queue) { - queue.add(job); - queue.notify(); - } - } - - public void setMaxConcurrency(int maxConcurrency) { - this.maxConcurrency = maxConcurrency; - synchronized (queue) { - queue.notify(); - } - } - - /** - * Role of the class is get status info from remote process from PENDING to - * RUNNING status. - */ - private class JobStatusPoller extends Thread { - private long initialPeriodMsec; - private long initialPeriodCheckIntervalMsec; - private long checkIntervalMsec; - private boolean terminate; - private JobListener listener; - private Job job; - Status lastStatus; - - public JobStatusPoller(long initialPeriodMsec, - long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job, - JobListener listener) { - this.initialPeriodMsec = initialPeriodMsec; - this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec; - this.checkIntervalMsec = checkIntervalMsec; - this.job = job; - this.listener = listener; - this.terminate = false; - } - - @Override - public void run() { - long started = System.currentTimeMillis(); - while (terminate == false) { - long current = System.currentTimeMillis(); - long interval; - if (current - started < initialPeriodMsec) { - interval = initialPeriodCheckIntervalMsec; - } else { - interval = checkIntervalMsec; - } - - synchronized (this) { - try { - this.wait(interval); - } catch (InterruptedException e) { - } - } - - - Status newStatus = getStatus(); - if (newStatus == null) { // unknown - continue; - } - - if (newStatus != Status.READY && newStatus != Status.PENDING) { - // we don't need more - continue; - } - } - } - - public void shutdown() { - terminate = true; - synchronized (this) { - this.notify(); - } - } - - - private Status getLastStatus() { - if (terminate == true) { - if (lastStatus != Status.FINISHED && - lastStatus != Status.ERROR && - lastStatus != Status.ABORT) { - return Status.FINISHED; - } else { - return (lastStatus == null) ? Status.FINISHED : lastStatus; - } - } else { - return (lastStatus == null) ? Status.FINISHED : lastStatus; - } - } - - public synchronized Job.Status getStatus() { - if (interpreterProcess.referenceCount() <= 0) { - return getLastStatus(); - } - - Client client; - try { - client = interpreterProcess.getClient(); - } catch (Exception e) { - logger.error("Can't get status information", e); - lastStatus = Status.ERROR; - return Status.ERROR; - } - - try { - String statusStr = client.getStatus(job.getId()); - if ("Unknown".equals(statusStr)) { - // not found this job in the remote schedulers. - // maybe not submitted, maybe already finished - Status status = getLastStatus(); - listener.afterStatusChange(job, null, status); - return status; - } - Status status = Status.valueOf(statusStr); - lastStatus = status; - listener.afterStatusChange(job, null, status); - return status; - } catch (TException e) { - logger.error("Can't get status information", e); - lastStatus = Status.ERROR; - return Status.ERROR; - } catch (Exception e) { - logger.error("Unknown status", e); - lastStatus = Status.ERROR; - return Status.ERROR; - } finally { - interpreterProcess.releaseClient(client); - } - } - } - - private class JobRunner implements Runnable, JobListener { - private Scheduler scheduler; - private Job job; - private boolean jobExecuted; - boolean jobSubmittedRemotely; - - public JobRunner(Scheduler scheduler, Job job) { - this.scheduler = scheduler; - this.job = job; - jobExecuted = false; - jobSubmittedRemotely = false; - } - - public boolean isJobSubmittedInRemote() { - return jobSubmittedRemotely; - } - - @Override - public void run() { - if (job.isAborted()) { - job.setStatus(Status.ABORT); - job.aborted = false; - - synchronized (queue) { - running.remove(job); - queue.notify(); - } - - return; - } - - JobStatusPoller jobStatusPoller = new JobStatusPoller(1500, 100, 500, - job, this); - jobStatusPoller.start(); - - if (listener != null) { - listener.jobStarted(scheduler, job); - } - job.run(); - jobExecuted = true; - jobSubmittedRemotely = true; - - jobStatusPoller.shutdown(); - try { - jobStatusPoller.join(); - } catch (InterruptedException e) { - logger.error("JobStatusPoller interrupted", e); - } - - job.setStatus(jobStatusPoller.getStatus()); - if (listener != null) { - listener.jobFinished(scheduler, job); - } - - // reset aborted flag to allow retry - job.aborted = false; - - synchronized (queue) { - running.remove(job); - queue.notify(); - } - } - - @Override - public void onProgressUpdate(Job job, int progress) { - } - - @Override - public void beforeStatusChange(Job job, Status before, Status after) { - } - - @Override - public void afterStatusChange(Job job, Status before, Status after) { - if (after == null) { // unknown. maybe before sumitted remotely, maybe already finished. - if (jobExecuted) { - jobSubmittedRemotely = true; - if (job.isAborted()) { - job.setStatus(Status.ABORT); - } else if (job.getException() != null) { - job.setStatus(Status.ERROR); - } else { - job.setStatus(Status.FINISHED); - } - } - return; - } - - - // Update remoteStatus - if (jobExecuted == false) { - if (after == Status.FINISHED || after == Status.ABORT - || after == Status.ERROR) { - // it can be status of last run. - // so not updating the remoteStatus - return; - } else if (after == Status.RUNNING) { - jobSubmittedRemotely = true; - } - } else { - jobSubmittedRemotely = true; - } - - // status polled by status poller - if (job.getStatus() != after) { - job.setStatus(after); - } - } - } - - @Override - public void stop() { - terminate = true; - synchronized (queue) { - queue.notify(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Scheduler.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Scheduler.java deleted file mode 100644 index e772c38..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Scheduler.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.nflabs.zeppelin.scheduler; - -import java.util.Collection; - -/** - * TODO(moon) : add description. - * - * @author Leemoonsoo - * - */ -public interface Scheduler extends Runnable { - public String getName(); - - public Collection<Job> getJobsWaiting(); - - public Collection<Job> getJobsRunning(); - - public void submit(Job job); - - public void stop(); -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java deleted file mode 100644 index 115e2b1..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java +++ /dev/null @@ -1,129 +0,0 @@ -package com.nflabs.zeppelin.scheduler; - -import java.util.Collection; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.nflabs.zeppelin.interpreter.remote.RemoteInterpreterProcess; - -/** - * TODO(moon) : add description. - * - * @author Leemoonsoo - * - */ -public class SchedulerFactory implements SchedulerListener { - private final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class); - ScheduledExecutorService executor; - Map<String, Scheduler> schedulers = new LinkedHashMap<String, Scheduler>(); - - private static SchedulerFactory singleton; - private static Long singletonLock = new Long(0); - - public static SchedulerFactory singleton() { - if (singleton == null) { - synchronized (singletonLock) { - if (singleton == null) { - try { - singleton = new SchedulerFactory(); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - } - return singleton; - } - - public SchedulerFactory() throws Exception { - executor = Executors.newScheduledThreadPool(100); - } - - public void destroy() { - executor.shutdown(); - } - - public Scheduler createOrGetFIFOScheduler(String name) { - synchronized (schedulers) { - if (schedulers.containsKey(name) == false) { - Scheduler s = new FIFOScheduler(name, executor, this); - schedulers.put(name, s); - executor.execute(s); - } - return schedulers.get(name); - } - } - - public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) { - synchronized (schedulers) { - if (schedulers.containsKey(name) == false) { - Scheduler s = new ParallelScheduler(name, executor, this, maxConcurrency); - schedulers.put(name, s); - executor.execute(s); - } - return schedulers.get(name); - } - } - - public Scheduler createOrGetRemoteScheduler( - String name, - RemoteInterpreterProcess interpreterProcess, - int maxConcurrency) { - - synchronized (schedulers) { - if (schedulers.containsKey(name) == false) { - Scheduler s = new RemoteScheduler( - name, - executor, - interpreterProcess, - this, - maxConcurrency); - schedulers.put(name, s); - executor.execute(s); - } - return schedulers.get(name); - } - } - - public Scheduler removeScheduler(String name) { - synchronized (schedulers) { - Scheduler s = schedulers.remove(name); - if (s != null) { - s.stop(); - } - } - return null; - } - - public Collection<Scheduler> listScheduler(String name) { - List<Scheduler> s = new LinkedList<Scheduler>(); - synchronized (schedulers) { - for (Scheduler ss : schedulers.values()) { - s.add(ss); - } - } - return s; - } - - @Override - public void jobStarted(Scheduler scheduler, Job job) { - logger.info("Job " + job.getJobName() + " started by scheduler " + scheduler.getName()); - - } - - @Override - public void jobFinished(Scheduler scheduler, Job job) { - logger.info("Job " + job.getJobName() + " finished by scheduler " + scheduler.getName()); - - } - - - -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerListener.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerListener.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerListener.java deleted file mode 100644 index d551679..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerListener.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.nflabs.zeppelin.scheduler; - -/** - * TODO(moon) : add description. - * - * @author Leemoonsoo - * - */ -public interface SchedulerListener { - public void jobStarted(Scheduler scheduler, Job job); - - public void jobFinished(Scheduler scheduler, Job job); -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/GUI.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/GUI.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/GUI.java new file mode 100644 index 0000000..8ae7630 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/GUI.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.display; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.zeppelin.display.Input.ParamOption; + +/** + * Settings of a form. + * + * @author Leemoonsoo + * + */ +public class GUI implements Serializable { + + Map<String, Object> params = new HashMap<String, Object>(); // form parameters from client + Map<String, Input> forms = new TreeMap<String, Input>(); // form configuration + + public GUI() { + + } + + public void setParams(Map<String, Object> values) { + this.params = values; + } + + public Map<String, Object> getParams() { + return params; + } + + public Map<String, Input> getForms() { + return forms; + } + + public void setForms(Map<String, Input> forms) { + this.forms = forms; + } + + public Object input(String id, Object defaultValue) { + // first find values from client and then use default + Object value = params.get(id); + if (value == null) { + value = defaultValue; + } + + forms.put(id, new Input(id, defaultValue)); + return value; + } + + public Object input(String id) { + return input(id, ""); + } + + public Object select(String id, Object defaultValue, ParamOption[] options) { + Object value = params.get(id); + if (value == null) { + value = defaultValue; + } + forms.put(id, new Input(id, defaultValue, options)); + return value; + } + + public void clear() { + this.forms = new TreeMap<String, Input>(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/Input.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/Input.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/Input.java new file mode 100644 index 0000000..2f7858c --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/Input.java @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.display; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Input type. + * + * @author Leemoonsoo + * + */ +public class Input implements Serializable { + /** + * Parameters option. + * + * @author Leemoonsoo + * + */ + public static class ParamOption { + Object value; + String displayName; + + public ParamOption(Object value, String displayName) { + super(); + this.value = value; + this.displayName = displayName; + } + + public Object getValue() { + return value; + } + + public void setValue(Object value) { + this.value = value; + } + + public String getDisplayName() { + return displayName; + } + + public void setDisplayName(String displayName) { + this.displayName = displayName; + } + + } + + String name; + String displayName; + String type; + Object defaultValue; + ParamOption[] options; + boolean hidden; + + public Input(String name, Object defaultValue) { + this.name = name; + this.displayName = name; + this.defaultValue = defaultValue; + } + + public Input(String name, Object defaultValue, ParamOption[] options) { + this.name = name; + this.displayName = name; + this.defaultValue = defaultValue; + this.options = options; + } + + + public Input(String name, String displayName, String type, Object defaultValue, + ParamOption[] options, boolean hidden) { + super(); + this.name = name; + this.displayName = displayName; + this.type = type; + this.defaultValue = defaultValue; + this.options = options; + this.hidden = hidden; + } + + @Override + public boolean equals(Object o) { + return name.equals(((Input) o).getName()); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getDisplayName() { + return displayName; + } + + public void setDisplayName(String displayName) { + this.displayName = displayName; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public Object getDefaultValue() { + return defaultValue; + } + + public void setDefaultValue(Object defaultValue) { + this.defaultValue = defaultValue; + } + + public ParamOption[] getOptions() { + return options; + } + + public void setOptions(ParamOption[] options) { + this.options = options; + } + + public boolean isHidden() { + return hidden; + } + + + private static String[] getNameAndDisplayName(String str) { + Pattern p = Pattern.compile("([^(]*)\\s*[(]([^)]*)[)]"); + Matcher m = p.matcher(str.trim()); + if (m == null || m.find() == false) { + return null; + } + String[] ret = new String[2]; + ret[0] = m.group(1); + ret[1] = m.group(2); + return ret; + } + + private static String[] getType(String str) { + Pattern p = Pattern.compile("([^:]*)\\s*:\\s*(.*)"); + Matcher m = p.matcher(str.trim()); + if (m == null || m.find() == false) { + return null; + } + String[] ret = new String[2]; + ret[0] = m.group(1).trim(); + ret[1] = m.group(2).trim(); + return ret; + } + + public static Map<String, Input> extractSimpleQueryParam(String script) { + Map<String, Input> params = new HashMap<String, Input>(); + if (script == null) { + return params; + } + String replaced = script; + + Pattern pattern = Pattern.compile("([_])?[$][{]([^=}]*([=][^}]*)?)[}]"); + + Matcher match = pattern.matcher(replaced); + while (match.find()) { + String hiddenPart = match.group(1); + boolean hidden = false; + if ("_".equals(hiddenPart)) { + hidden = true; + } + String m = match.group(2); + + String namePart; + String valuePart; + + int p = m.indexOf('='); + if (p > 0) { + namePart = m.substring(0, p); + valuePart = m.substring(p + 1); + } else { + namePart = m; + valuePart = null; + } + + + String varName; + String displayName = null; + String type = null; + String defaultValue = ""; + ParamOption[] paramOptions = null; + + // get var name type + String varNamePart; + String[] typeArray = getType(namePart); + if (typeArray != null) { + type = typeArray[0]; + varNamePart = typeArray[1]; + } else { + varNamePart = namePart; + } + + // get var name and displayname + String[] varNameArray = getNameAndDisplayName(varNamePart); + if (varNameArray != null) { + varName = varNameArray[0]; + displayName = varNameArray[1]; + } else { + varName = varNamePart.trim(); + } + + // get defaultValue + if (valuePart != null) { + // find default value + int optionP = valuePart.indexOf(","); + if (optionP > 0) { // option available + defaultValue = valuePart.substring(0, optionP); + String optionPart = valuePart.substring(optionP + 1); + String[] options = Input.splitPipe(optionPart); + + paramOptions = new ParamOption[options.length]; + + for (int i = 0; i < options.length; i++) { + + String[] optNameArray = getNameAndDisplayName(options[i]); + if (optNameArray != null) { + paramOptions[i] = new ParamOption(optNameArray[0], optNameArray[1]); + } else { + paramOptions[i] = new ParamOption(options[i], null); + } + } + + + } else { // no option + defaultValue = valuePart; + } + + } + + Input param = new Input(varName, displayName, type, defaultValue, paramOptions, hidden); + params.put(varName, param); + } + + params.remove("pql"); + return params; + } + + public static String getSimpleQuery(Map<String, Object> params, String script) { + String replaced = script; + + for (String key : params.keySet()) { + Object value = params.get(key); + replaced = + replaced.replaceAll("[_]?[$][{]([^:]*[:])?" + key + "([(][^)]*[)])?(=[^}]*)?[}]", + value.toString()); + } + + Pattern pattern = Pattern.compile("[$][{]([^=}]*[=][^}]*)[}]"); + while (true) { + Matcher match = pattern.matcher(replaced); + if (match != null && match.find()) { + String m = match.group(1); + int p = m.indexOf('='); + String replacement = m.substring(p + 1); + int optionP = replacement.indexOf(","); + if (optionP > 0) { + replacement = replacement.substring(0, optionP); + } + replaced = + replaced.replaceFirst("[_]?[$][{]" + + m.replaceAll("[(]", ".").replaceAll("[)]", ".").replaceAll("[|]", ".") + "[}]", + replacement); + } else { + break; + } + } + + replaced = replaced.replace("[_]?[$][{]([^=}]*)[}]", ""); + return replaced; + } + + + public static String[] split(String str) { + return str.split(";(?=([^\"']*\"[^\"']*\")*[^\"']*$)"); + + } + + /* + * public static String [] splitPipe(String str){ //return + * str.split("\\|(?=([^\"']*\"[^\"']*\")*[^\"']*$)"); return + * str.split("\\|(?=([^\"']*\"[^\"']*\")*[^\"']*$)"); } + */ + + + public static String[] splitPipe(String str) { + return split(str, '|'); + } + + public static String[] split(String str, char split) { + return split(str, new String[] {String.valueOf(split)}, false); + } + + public static String[] split(String str, String[] splitters, boolean includeSplitter) { + String escapeSeq = "\"',;${}"; + char escapeChar = '\\'; + + String[] blockStart = new String[] {"\"", "'", "${", "N_(", "N_<"}; + String[] blockEnd = new String[] {"\"", "'", "}", "N_)", "N_>"}; + + return split(str, escapeSeq, escapeChar, blockStart, blockEnd, splitters, includeSplitter); + + } + + public static String[] split(String str, String escapeSeq, char escapeChar, String[] blockStart, + String[] blockEnd, String[] splitters, boolean includeSplitter) { + + List<String> splits = new ArrayList<String>(); + + String curString = ""; + + boolean escape = false; // true when escape char is found + int lastEscapeOffset = -1; + int blockStartPos = -1; + List<Integer> blockStack = new LinkedList<Integer>(); + + for (int i = 0; i < str.length(); i++) { + char c = str.charAt(i); + + // escape char detected + if (c == escapeChar && escape == false) { + escape = true; + continue; + } + + // escaped char comes + if (escape == true) { + if (escapeSeq.indexOf(c) < 0) { + curString += escapeChar; + } + curString += c; + escape = false; + lastEscapeOffset = curString.length(); + continue; + } + + if (blockStack.size() > 0) { // inside of block + curString += c; + // check multichar block + boolean multicharBlockDetected = false; + for (int b = 0; b < blockStart.length; b++) { + if (blockStartPos >= 0 + && getBlockStr(blockStart[b]).compareTo(str.substring(blockStartPos, i)) == 0) { + blockStack.remove(0); + blockStack.add(0, b); + multicharBlockDetected = true; + break; + } + } + + if (multicharBlockDetected == true) { + continue; + } + + // check if current block is nestable + if (isNestedBlock(blockStart[blockStack.get(0)]) == true) { + // try to find nested block start + + if (curString.substring(lastEscapeOffset + 1).endsWith( + getBlockStr(blockStart[blockStack.get(0)])) == true) { + blockStack.add(0, blockStack.get(0)); // block is started + blockStartPos = i; + continue; + } + } + + // check if block is finishing + if (curString.substring(lastEscapeOffset + 1).endsWith( + getBlockStr(blockEnd[blockStack.get(0)]))) { + // the block closer is one of the splitters (and not nested block) + if (isNestedBlock(blockEnd[blockStack.get(0)]) == false) { + for (String splitter : splitters) { + if (splitter.compareTo(getBlockStr(blockEnd[blockStack.get(0)])) == 0) { + splits.add(curString); + if (includeSplitter == true) { + splits.add(splitter); + } + curString = ""; + lastEscapeOffset = -1; + + break; + } + } + } + blockStartPos = -1; + blockStack.remove(0); + continue; + } + + } else { // not in the block + boolean splitted = false; + for (String splitter : splitters) { + // forward check for splitter + int curentLenght = i + splitter.length(); + if (splitter.compareTo(str.substring(i, Math.min(curentLenght, str.length()))) == 0) { + splits.add(curString); + if (includeSplitter == true) { + splits.add(splitter); + } + curString = ""; + lastEscapeOffset = -1; + i += splitter.length() - 1; + splitted = true; + break; + } + } + if (splitted == true) { + continue; + } + + // add char to current string + curString += c; + + // check if block is started + for (int b = 0; b < blockStart.length; b++) { + if (curString.substring(lastEscapeOffset + 1) + .endsWith(getBlockStr(blockStart[b])) == true) { + blockStack.add(0, b); // block is started + blockStartPos = i; + break; + } + } + } + } + if (curString.length() > 0) { + splits.add(curString.trim()); + } + return splits.toArray(new String[] {}); + + } + + private static String getBlockStr(String blockDef) { + if (blockDef.startsWith("N_")) { + return blockDef.substring("N_".length()); + } else { + return blockDef; + } + } + + private static boolean isNestedBlock(String blockDef) { + if (blockDef.startsWith("N_")) { + return true; + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java new file mode 100644 index 0000000..d3d6c1c --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import java.net.URL; +import java.util.List; +import java.util.Properties; + +import org.apache.zeppelin.scheduler.Scheduler; + +/** + * Add to the classpath interpreters. + * + */ +public class ClassloaderInterpreter + extends Interpreter + implements WrappedInterpreter { + + private ClassLoader cl; + private Interpreter intp; + + public ClassloaderInterpreter(Interpreter intp, ClassLoader cl) { + super(new Properties()); + this.cl = cl; + this.intp = intp; + } + + @Override + public Interpreter getInnerInterpreter() { + return intp; + } + + public ClassLoader getClassloader() { + return cl; + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + return intp.interpret(st, context); + } catch (Exception e) { + e.printStackTrace(); + throw new InterpreterException(e); + } finally { + cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + + @Override + public void open() { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + intp.open(); + } catch (Exception e) { + throw new InterpreterException(e); + } finally { + cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + @Override + public void close() { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + intp.close(); + } catch (Exception e) { + throw new InterpreterException(e); + } finally { + cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + @Override + public void cancel(InterpreterContext context) { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + intp.cancel(context); + } catch (Exception e) { + throw new InterpreterException(e); + } finally { + cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + @Override + public FormType getFormType() { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + return intp.getFormType(); + } catch (Exception e) { + throw new InterpreterException(e); + } finally { + cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + @Override + public int getProgress(InterpreterContext context) { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + return intp.getProgress(context); + } catch (Exception e) { + throw new InterpreterException(e); + } finally { + cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + @Override + public Scheduler getScheduler() { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + return intp.getScheduler(); + } catch (Exception e) { + throw new InterpreterException(e); + } finally { + cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + @Override + public List<String> completion(String buf, int cursor) { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + return intp.completion(buf, cursor); + } catch (Exception e) { + throw new InterpreterException(e); + } finally { + cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + + @Override + public String getClassName() { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + return intp.getClassName(); + } catch (Exception e) { + throw new InterpreterException(e); + } finally { + cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + @Override + public void setInterpreterGroup(InterpreterGroup interpreterGroup) { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + intp.setInterpreterGroup(interpreterGroup); + } catch (Exception e) { + throw new InterpreterException(e); + } finally { + cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + @Override + public InterpreterGroup getInterpreterGroup() { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + return intp.getInterpreterGroup(); + } catch (Exception e) { + throw new InterpreterException(e); + } finally { + cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + @Override + public void setClassloaderUrls(URL [] urls) { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + intp.setClassloaderUrls(urls); + } catch (Exception e) { + throw new InterpreterException(e); + } finally { + cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + @Override + public URL [] getClassloaderUrls() { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + return intp.getClassloaderUrls(); + } catch (Exception e) { + throw new InterpreterException(e); + } finally { + cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + @Override + public void setProperty(Properties property) { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + intp.setProperty(property); + } catch (Exception e) { + throw new InterpreterException(e); + } finally { + cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + @Override + public Properties getProperty() { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + return intp.getProperty(); + } catch (Exception e) { + throw new InterpreterException(e); + } finally { + cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + @Override + public String getProperty(String key) { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + return intp.getProperty(key); + } catch (Exception e) { + throw new InterpreterException(e); + } finally { + cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(oldcl); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java new file mode 100644 index 0000000..58dcb64 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + + +import java.net.URL; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Interface for interpreters. + * If you want to implement new Zeppelin interpreter, extend this class + * + * Please see, + * http://zeppelin.incubator.apache.org/docs/development/writingzeppelininterpreter.html + * + * open(), close(), interpreter() is three the most important method you need to implement. + * cancel(), getProgress(), completion() is good to have + * getFormType(), getScheduler() determine Zeppelin's behavior + * + */ +public abstract class Interpreter { + + /** + * Opens interpreter. You may want to place your initialize routine here. + * open() is called only once + */ + public abstract void open(); + + /** + * Closes interpreter. You may want to free your resources up here. + * close() is called only once + */ + public abstract void close(); + + /** + * Run code and return result, in synchronous way. + * + * @param st statements to run + * @param context + * @return + */ + public abstract InterpreterResult interpret(String st, InterpreterContext context); + + /** + * Optionally implement the canceling routine to abort interpret() method + * + * @param context + */ + public abstract void cancel(InterpreterContext context); + + /** + * Dynamic form handling + * see http://zeppelin.incubator.apache.org/docs/dynamicform.html + * + * @return FormType.SIMPLE enables simple pattern replacement (eg. Hello ${name=world}), + * FormType.NATIVE handles form in API + */ + public abstract FormType getFormType(); + + /** + * get interpret() method running process in percentage. + * + * @param context + * @return number between 0-100 + */ + public abstract int getProgress(InterpreterContext context); + + /** + * Get completion list based on cursor position. + * By implementing this method, it enables auto-completion. + * + * @param buf statements + * @param cursor cursor position in statements + * @return list of possible completion. Return empty list if there're nothing to return. + */ + public abstract List<String> completion(String buf, int cursor); + + /** + * Interpreter can implements it's own scheduler by overriding this method. + * There're two default scheduler provided, FIFO, Parallel. + * If your interpret() can handle concurrent request, use Parallel or use FIFO. + * + * You can get default scheduler by using + * SchedulerFactory.singleton().createOrGetFIFOScheduler() + * SchedulerFactory.singleton().createOrGetParallelScheduler() + * + * + * @return return scheduler instance. + * This method can be called multiple times and have to return the same instance. + * Can not return null. + */ + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); + } + + /** + * Called when interpreter is no longer used. + */ + public void destroy() { + getScheduler().stop(); + } + + + + + + static Logger logger = LoggerFactory.getLogger(Interpreter.class); + private InterpreterGroup interpreterGroup; + private URL [] classloaderUrls; + protected Properties property; + + public Interpreter(Properties property) { + this.property = property; + } + + public void setProperty(Properties property) { + this.property = property; + } + + public Properties getProperty() { + Properties p = new Properties(); + p.putAll(property); + + Map<String, InterpreterProperty> defaultProperties = Interpreter + .findRegisteredInterpreterByClassName(getClassName()).getProperties(); + for (String k : defaultProperties.keySet()) { + if (!p.contains(k)) { + String value = defaultProperties.get(k).getDefaultValue(); + if (value != null) { + p.put(k, defaultProperties.get(k).getDefaultValue()); + } + } + } + + return property; + } + + public String getProperty(String key) { + if (property.containsKey(key)) { + return property.getProperty(key); + } + + Map<String, InterpreterProperty> defaultProperties = Interpreter + .findRegisteredInterpreterByClassName(getClassName()).getProperties(); + if (defaultProperties.containsKey(key)) { + return defaultProperties.get(key).getDefaultValue(); + } + + return null; + } + + + public String getClassName() { + return this.getClass().getName(); + } + + public void setInterpreterGroup(InterpreterGroup interpreterGroup) { + this.interpreterGroup = interpreterGroup; + } + + public InterpreterGroup getInterpreterGroup() { + return this.interpreterGroup; + } + + public URL[] getClassloaderUrls() { + return classloaderUrls; + } + + public void setClassloaderUrls(URL[] classloaderUrls) { + this.classloaderUrls = classloaderUrls; + } + + + /** + * Type of interpreter. + */ + public static enum FormType { + NATIVE, SIMPLE, NONE + } + + /** + * Represent registered interpreter class + */ + public static class RegisteredInterpreter { + private String name; + private String group; + private String className; + private Map<String, InterpreterProperty> properties; + private String path; + + public RegisteredInterpreter(String name, String group, String className, + Map<String, InterpreterProperty> properties) { + super(); + this.name = name; + this.group = group; + this.className = className; + this.properties = properties; + } + + public String getName() { + return name; + } + + public String getGroup() { + return group; + } + + public String getClassName() { + return className; + } + + public Map<String, InterpreterProperty> getProperties() { + return properties; + } + + public void setPath(String path) { + this.path = path; + } + + public String getPath() { + return path; + } + + } + + /** + * Type of Scheduling. + */ + public static enum SchedulingMode { + FIFO, PARALLEL + } + + public static Map<String, RegisteredInterpreter> registeredInterpreters = Collections + .synchronizedMap(new HashMap<String, RegisteredInterpreter>()); + + public static void register(String name, String className) { + register(name, name, className); + } + + public static void register(String name, String group, String className) { + register(name, group, className, new HashMap<String, InterpreterProperty>()); + } + + public static void register(String name, String group, String className, + Map<String, InterpreterProperty> properties) { + registeredInterpreters.put(name, new RegisteredInterpreter(name, group, className, properties)); + } + + public static RegisteredInterpreter findRegisteredInterpreterByClassName(String className) { + for (RegisteredInterpreter ri : registeredInterpreters.values()) { + if (ri.getClassName().equals(className)) { + return ri; + } + } + return null; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java new file mode 100644 index 0000000..2d70c8e --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import java.util.Map; + +import org.apache.zeppelin.display.GUI; + +/** + * Interpreter context + */ +public class InterpreterContext { + private final String paragraphTitle; + private final String paragraphId; + private final String paragraphText; + private final Map<String, Object> config; + private GUI gui; + + + public InterpreterContext(String paragraphId, + String paragraphTitle, + String paragraphText, + Map<String, Object> config, + GUI gui + ) { + this.paragraphId = paragraphId; + this.paragraphTitle = paragraphTitle; + this.paragraphText = paragraphText; + this.config = config; + this.gui = gui; + } + + public String getParagraphId() { + return paragraphId; + } + + public String getParagraphText() { + return paragraphText; + } + + public String getParagraphTitle() { + return paragraphTitle; + } + + public Map<String, Object> getConfig() { + return config; + } + + public GUI getGui() { + return gui; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterException.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterException.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterException.java new file mode 100644 index 0000000..30c1c0a --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +/** + * Runtime Exception for interpreters. + * + */ +public class InterpreterException extends RuntimeException { + + public InterpreterException(Throwable e) { + super(e); + } + + public InterpreterException(String m) { + super(m); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java new file mode 100644 index 0000000..834630a --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import java.util.LinkedList; +import java.util.Properties; +import java.util.Random; + +/** + * InterpreterGroup is list of interpreters in the same group. + * And unit of interpreter instantiate, restart, bind, unbind. + */ +public class InterpreterGroup extends LinkedList<Interpreter>{ + String id; + + private static String generateId() { + return "InterpreterGroup_" + System.currentTimeMillis() + "_" + + new Random().nextInt(); + } + + public String getId() { + synchronized (this) { + if (id == null) { + id = generateId(); + } + return id; + } + } + + + public Properties getProperty() { + Properties p = new Properties(); + for (Interpreter intp : this) { + p.putAll(intp.getProperty()); + } + return p; + } + + public void close() { + for (Interpreter intp : this) { + intp.close(); + } + } + + public void destroy() { + for (Interpreter intp : this) { + intp.destroy(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java new file mode 100644 index 0000000..cc13ace --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +/** + * Represent property of interpreter + */ +public class InterpreterProperty { + String defaultValue; + String description; + + public InterpreterProperty(String defaultValue, + String description) { + super(); + this.defaultValue = defaultValue; + this.description = description; + } + + public String getDefaultValue() { + return defaultValue; + } + + public void setDefaultValue(String defaultValue) { + this.defaultValue = defaultValue; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterPropertyBuilder.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterPropertyBuilder.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterPropertyBuilder.java new file mode 100644 index 0000000..f077b4e --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterPropertyBuilder.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import java.util.HashMap; +import java.util.Map; + +/** + * InterpreterPropertyBuilder + */ +public class InterpreterPropertyBuilder { + Map<String, InterpreterProperty> properties = new HashMap<String, InterpreterProperty>(); + + public InterpreterPropertyBuilder add(String name, String defaultValue, String description){ + properties.put(name, new InterpreterProperty(defaultValue, description)); + return this; + } + + public Map<String, InterpreterProperty> build(){ + return properties; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java new file mode 100644 index 0000000..0659a47 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import java.io.Serializable; + +/** + * Interpreter result template. + * + * @author Leemoonsoo + * + */ +public class InterpreterResult implements Serializable { + + /** + * Type of result after code execution. + * + * @author Leemoonsoo + * + */ + public static enum Code { + SUCCESS, + INCOMPLETE, + ERROR + } + + /** + * Type of Data. + * + * @author Leemoonsoo + * + */ + public static enum Type { + TEXT, + HTML, + TABLE, + IMG, + SVG, + NULL + } + + Code code; + Type type; + String msg; + + public InterpreterResult(Code code) { + this.code = code; + this.msg = null; + this.type = Type.TEXT; + } + + public InterpreterResult(Code code, String msg) { + this.code = code; + this.msg = getData(msg); + this.type = getType(msg); + } + + public InterpreterResult(Code code, Type type, String msg) { + this.code = code; + this.msg = msg; + this.type = type; + } + + /** + * Magic is like %html %text. + * + * @param msg + * @return + */ + private String getData(String msg) { + if (msg == null) { + return null; + } + + Type[] types = Type.values(); + for (Type t : types) { + String magic = "%" + t.name().toLowerCase(); + if (msg.startsWith(magic + " ") || msg.startsWith(magic + "\n")) { + int magicLength = magic.length() + 1; + if (msg.length() > magicLength) { + return msg.substring(magicLength); + } else { + return ""; + } + } + } + + return msg; + } + + + private Type getType(String msg) { + if (msg == null) { + return Type.TEXT; + } + Type[] types = Type.values(); + for (Type t : types) { + String magic = "%" + t.name().toLowerCase(); + if (msg.startsWith(magic + " ") || msg.startsWith(magic + "\n")) { + return t; + } + } + return Type.TEXT; + } + + public Code code() { + return code; + } + + public String message() { + return msg; + } + + public Type type() { + return type; + } + + public InterpreterResult type(Type type) { + this.type = type; + return this; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterUtils.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterUtils.java new file mode 100644 index 0000000..c3d3b9e --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterUtils.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter; + +import java.lang.reflect.InvocationTargetException; + +/** + * Interpreter utility functions + */ +public class InterpreterUtils { + + public static String getMostRelevantMessage(Exception ex) { + if (ex instanceof InvocationTargetException) { + Throwable cause = ((InvocationTargetException) ex).getCause(); + if (cause != null) { + return cause.getMessage(); + } + } + return ex.getMessage(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java new file mode 100644 index 0000000..599a24a --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import java.net.URL; +import java.util.List; +import java.util.Properties; + +import org.apache.zeppelin.scheduler.Scheduler; + +/** + * Interpreter wrapper for lazy initialization + */ +public class LazyOpenInterpreter + extends Interpreter + implements WrappedInterpreter { + private Interpreter intp; + boolean opened = false; + + public LazyOpenInterpreter(Interpreter intp) { + super(new Properties()); + this.intp = intp; + } + + @Override + public Interpreter getInnerInterpreter() { + return intp; + } + + @Override + public void setProperty(Properties property) { + intp.setProperty(property); + } + + @Override + public Properties getProperty() { + return intp.getProperty(); + } + + @Override + public String getProperty(String key) { + return intp.getProperty(key); + } + + @Override + public void open() { + if (opened == true) { + return; + } + + synchronized (intp) { + if (opened == false) { + intp.open(); + opened = true; + } + } + } + + @Override + public void close() { + synchronized (intp) { + if (opened == true) { + intp.close(); + opened = false; + } + } + } + + public boolean isOpen() { + synchronized (intp) { + return opened; + } + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + open(); + return intp.interpret(st, context); + } + + @Override + public void cancel(InterpreterContext context) { + open(); + intp.cancel(context); + } + + @Override + public FormType getFormType() { + return intp.getFormType(); + } + + @Override + public int getProgress(InterpreterContext context) { + open(); + return intp.getProgress(context); + } + + @Override + public Scheduler getScheduler() { + return intp.getScheduler(); + } + + @Override + public List<String> completion(String buf, int cursor) { + open(); + return intp.completion(buf, cursor); + } + + @Override + public String getClassName() { + return intp.getClassName(); + } + + @Override + public InterpreterGroup getInterpreterGroup() { + return intp.getInterpreterGroup(); + } + + @Override + public void setInterpreterGroup(InterpreterGroup interpreterGroup) { + intp.setInterpreterGroup(interpreterGroup); + } + + @Override + public URL [] getClassloaderUrls() { + return intp.getClassloaderUrls(); + } + + @Override + public void setClassloaderUrls(URL [] urls) { + intp.setClassloaderUrls(urls); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/WrappedInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/WrappedInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/WrappedInterpreter.java new file mode 100644 index 0000000..a12a9aa --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/WrappedInterpreter.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +/** + * WrappedInterpreter + */ +public interface WrappedInterpreter { + public Interpreter getInnerInterpreter(); +}
