http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java new file mode 100644 index 0000000..e7f950a --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scheduler; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; + +import org.apache.zeppelin.scheduler.Job.Status; + +/** + * TODO(moon) : add description. + * + * @author Leemoonsoo + * + */ +public class FIFOScheduler implements Scheduler { + List<Job> queue = new LinkedList<Job>(); + private ExecutorService executor; + private SchedulerListener listener; + boolean terminate = false; + Job runningJob = null; + private String name; + + public FIFOScheduler(String name, ExecutorService executor, SchedulerListener listener) { + this.name = name; + this.executor = executor; + this.listener = listener; + } + + @Override + public String getName() { + return name; + } + + @Override + public Collection<Job> getJobsWaiting() { + List<Job> ret = new LinkedList<Job>(); + synchronized (queue) { + for (Job job : queue) { + ret.add(job); + } + } + return ret; + } + + @Override + public Collection<Job> getJobsRunning() { + List<Job> ret = new LinkedList<Job>(); + Job job = runningJob; + + if (job != null) { + ret.add(job); + } + + return ret; + } + + + + @Override + public void submit(Job job) { + job.setStatus(Status.PENDING); + synchronized (queue) { + queue.add(job); + queue.notify(); + } + } + + @Override + public void run() { + + synchronized (queue) { + while (terminate == false) { + if (runningJob != null || queue.isEmpty() == true) { + try { + queue.wait(500); + } catch (InterruptedException e) { + } + continue; + } + + runningJob = queue.remove(0); + + final Scheduler scheduler = this; + this.executor.execute(new Runnable() { + @Override + public void run() { + if (runningJob.isAborted()) { + runningJob.setStatus(Status.ABORT); + runningJob.aborted = false; + synchronized (queue) { + queue.notify(); + } + return; + } + + runningJob.setStatus(Status.RUNNING); + if (listener != null) { + listener.jobStarted(scheduler, runningJob); + } + runningJob.run(); + if (runningJob.isAborted()) { + runningJob.setStatus(Status.ABORT); + } else { + if (runningJob.getException() != null) { + runningJob.setStatus(Status.ERROR); + } else { + runningJob.setStatus(Status.FINISHED); + } + } + if (listener != null) { + listener.jobFinished(scheduler, runningJob); + } + // reset aborted flag to allow retry + runningJob.aborted = false; + runningJob = null; + synchronized (queue) { + queue.notify(); + } + } + }); + } + } + } + + @Override + public void stop() { + terminate = true; + synchronized (queue) { + queue.notify(); + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java new file mode 100644 index 0000000..9837ad2 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scheduler; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Map; + +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Skeletal implementation of the Job concept. + * - designed for inheritance + * - should be run on a separate thread + * - maintains internal state: it's status + * - supports listeners who are updated on status change + * + * Job class is serialized/deserialized and used server<->client communication + * and saving/loading jobs from disk. + * Changing/adding/deleting non transitive field name need consideration of that. + * + * @author Leemoonsoo + */ +public abstract class Job { + /** + * Job status. + * + * READY - Job is not running, ready to run. + * PENDING - Job is submitted to scheduler. but not running yet + * RUNNING - Job is running. + * FINISHED - Job finished run. with success + * ERROR - Job finished run. with error + * ABORT - Job finished by abort + * + */ + public static enum Status { + READY, + PENDING, + RUNNING, + FINISHED, + ERROR, + ABORT; + boolean isReady() { + return this == READY; + } + + boolean isRunning() { + return this == RUNNING; + } + + boolean isPending() { + return this == PENDING; + } + } + + private String jobName; + String id; + Object result; + Date dateCreated; + Date dateStarted; + Date dateFinished; + Status status; + + transient boolean aborted = false; + + String errorMessage; + private transient Throwable exception; + private transient JobListener listener; + private long progressUpdateIntervalMs; + + public Job(String jobName, JobListener listener, long progressUpdateIntervalMs) { + this.jobName = jobName; + this.listener = listener; + this.progressUpdateIntervalMs = progressUpdateIntervalMs; + + dateCreated = new Date(); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd-HHmmss"); + id = dateFormat.format(dateCreated) + "_" + super.hashCode(); + + setStatus(Status.READY); + } + + public Job(String jobName, JobListener listener) { + this(jobName, listener, JobProgressPoller.DEFAULT_INTERVAL_MSEC); + } + + public Job(String jobId, String jobName, JobListener listener, long progressUpdateIntervalMs) { + this.jobName = jobName; + this.listener = listener; + this.progressUpdateIntervalMs = progressUpdateIntervalMs; + + id = jobId; + + setStatus(Status.READY); + } + + public String getId() { + return id; + } + + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public boolean equals(Object o) { + return ((Job) o).hashCode() == hashCode(); + } + + public Status getStatus() { + return status; + } + + public void setStatus(Status status) { + if (this.status == status) { + return; + } + Status before = this.status; + Status after = status; + if (listener != null) { + listener.beforeStatusChange(this, before, after); + } + this.status = status; + if (listener != null) { + listener.afterStatusChange(this, before, after); + } + } + + public void setListener(JobListener listener) { + this.listener = listener; + } + + public JobListener getListener() { + return listener; + } + + public boolean isTerminated() { + return !this.status.isReady() && !this.status.isRunning() && !this.status.isPending(); + } + + public boolean isRunning() { + return this.status.isRunning(); + } + + public void run() { + JobProgressPoller progressUpdator = null; + try { + progressUpdator = new JobProgressPoller(this, progressUpdateIntervalMs); + progressUpdator.start(); + dateStarted = new Date(); + result = jobRun(); + this.exception = null; + errorMessage = null; + dateFinished = new Date(); + progressUpdator.terminate(); + } catch (NullPointerException e) { + logger().error("Job failed", e); + progressUpdator.terminate(); + this.exception = e; + result = e.getMessage(); + errorMessage = getStack(e); + dateFinished = new Date(); + } catch (Throwable e) { + logger().error("Job failed", e); + progressUpdator.terminate(); + this.exception = e; + result = e.getMessage(); + errorMessage = getStack(e); + dateFinished = new Date(); + } finally { + //aborted = false; + } + } + + public String getStack(Throwable e) { + StackTraceElement[] stacks = e.getStackTrace(); + if (stacks == null) { + return ""; + } + String ss = ""; + for (StackTraceElement s : stacks) { + ss += s.toString() + "\n"; + } + + return ss; + } + + public Throwable getException() { + return exception; + } + + protected void setException(Throwable t) { + exception = t; + errorMessage = getStack(t); + } + + public Object getReturn() { + return result; + } + + public String getJobName() { + return jobName; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public abstract int progress(); + + public abstract Map<String, Object> info(); + + protected abstract Object jobRun() throws Throwable; + + protected abstract boolean jobAbort(); + + public void abort() { + aborted = jobAbort(); + } + + public boolean isAborted() { + return aborted; + } + + public Date getDateCreated() { + return dateCreated; + } + + public Date getDateStarted() { + return dateStarted; + } + + public Date getDateFinished() { + return dateFinished; + } + + private Logger logger() { + return LoggerFactory.getLogger(Job.class); + } + + protected void setResult(Object result) { + this.result = result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java new file mode 100644 index 0000000..1ed551f --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scheduler; + +/** + * TODO(moon) : add description. + * + * @author Leemoonsoo + * + */ +public interface JobListener { + public void onProgressUpdate(Job job, int progress); + + public void beforeStatusChange(Job job, Job.Status before, Job.Status after); + + public void afterStatusChange(Job job, Job.Status before, Job.Status after); +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java new file mode 100644 index 0000000..9de1325 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scheduler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TODO(moon) : add description. + * + * @author Leemoonsoo + * + */ +public class JobProgressPoller extends Thread { + public static final long DEFAULT_INTERVAL_MSEC = 500; + Logger logger = LoggerFactory.getLogger(JobProgressPoller.class); + private Job job; + private long intervalMs; + boolean terminate = false; + + public JobProgressPoller(Job job, long intervalMs) { + this.job = job; + this.intervalMs = intervalMs; + } + + @Override + public void run() { + if (intervalMs < 0) { + return; + } else if (intervalMs == 0) { + intervalMs = DEFAULT_INTERVAL_MSEC; + } + + while (terminate == false) { + JobListener listener = job.getListener(); + if (listener != null) { + try { + if (job.isRunning()) { + listener.onProgressUpdate(job, job.progress()); + } + } catch (Exception e) { + logger.error("Can not get or update progress", e); + } + } + try { + Thread.sleep(intervalMs); + } catch (InterruptedException e) { + } + } + } + + public void terminate() { + terminate = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java new file mode 100644 index 0000000..c8e8e04 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scheduler; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; + +import org.apache.zeppelin.scheduler.Job.Status; + +/** + * TODO(moon) : add description. + * + * @author Leemoonsoo + * + */ +public class ParallelScheduler implements Scheduler { + List<Job> queue = new LinkedList<Job>(); + List<Job> running = new LinkedList<Job>(); + private ExecutorService executor; + private SchedulerListener listener; + boolean terminate = false; + private String name; + private int maxConcurrency; + + public ParallelScheduler(String name, ExecutorService executor, SchedulerListener listener, + int maxConcurrency) { + this.name = name; + this.executor = executor; + this.listener = listener; + this.maxConcurrency = maxConcurrency; + } + + @Override + public String getName() { + return name; + } + + @Override + public Collection<Job> getJobsWaiting() { + List<Job> ret = new LinkedList<Job>(); + synchronized (queue) { + for (Job job : queue) { + ret.add(job); + } + } + return ret; + } + + @Override + public Collection<Job> getJobsRunning() { + List<Job> ret = new LinkedList<Job>(); + synchronized (queue) { + for (Job job : running) { + ret.add(job); + } + } + return ret; + } + + + + @Override + public void submit(Job job) { + job.setStatus(Status.PENDING); + synchronized (queue) { + queue.add(job); + queue.notify(); + } + } + + @Override + public void run() { + + synchronized (queue) { + while (terminate == false) { + if (running.size() >= maxConcurrency || queue.isEmpty() == true) { + try { + queue.wait(500); + } catch (InterruptedException e) { + } + continue; + } + + Job job = queue.remove(0); + running.add(job); + Scheduler scheduler = this; + + executor.execute(new JobRunner(scheduler, job)); + } + + + } + } + + public void setMaxConcurrency(int maxConcurrency) { + this.maxConcurrency = maxConcurrency; + synchronized (queue) { + queue.notify(); + } + } + + private class JobRunner implements Runnable { + private Scheduler scheduler; + private Job job; + + public JobRunner(Scheduler scheduler, Job job) { + this.scheduler = scheduler; + this.job = job; + } + + @Override + public void run() { + if (job.isAborted()) { + job.setStatus(Status.ABORT); + job.aborted = false; + + synchronized (queue) { + running.remove(job); + queue.notify(); + } + + return; + } + + job.setStatus(Status.RUNNING); + if (listener != null) { + listener.jobStarted(scheduler, job); + } + job.run(); + if (job.isAborted()) { + job.setStatus(Status.ABORT); + } else { + if (job.getException() != null) { + job.setStatus(Status.ERROR); + } else { + job.setStatus(Status.FINISHED); + } + } + + if (listener != null) { + listener.jobFinished(scheduler, job); + } + + // reset aborted flag to allow retry + job.aborted = false; + synchronized (queue) { + running.remove(job); + queue.notify(); + } + } + } + + + @Override + public void stop() { + terminate = true; + synchronized (queue) { + queue.notify(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java new file mode 100644 index 0000000..15e4a3c --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scheduler; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; + +import org.apache.thrift.TException; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; +import org.apache.zeppelin.scheduler.Job.Status; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class RemoteScheduler implements Scheduler { + Logger logger = LoggerFactory.getLogger(RemoteScheduler.class); + + List<Job> queue = new LinkedList<Job>(); + List<Job> running = new LinkedList<Job>(); + private ExecutorService executor; + private SchedulerListener listener; + boolean terminate = false; + private String name; + private int maxConcurrency; + private RemoteInterpreterProcess interpreterProcess; + + public RemoteScheduler(String name, ExecutorService executor, + RemoteInterpreterProcess interpreterProcess, SchedulerListener listener, + int maxConcurrency) { + this.name = name; + this.executor = executor; + this.listener = listener; + this.interpreterProcess = interpreterProcess; + this.maxConcurrency = maxConcurrency; + } + + @Override + public void run() { + while (terminate == false) { + Job job = null; + + synchronized (queue) { + if (running.size() >= maxConcurrency || queue.isEmpty() == true) { + try { + queue.wait(500); + } catch (InterruptedException e) { + } + continue; + } + + job = queue.remove(0); + running.add(job); + } + + // run + Scheduler scheduler = this; + JobRunner jobRunner = new JobRunner(scheduler, job); + executor.execute(jobRunner); + + // wait until it is submitted to the remote + while (!jobRunner.isJobSubmittedInRemote()) { + synchronized (queue) { + try { + queue.wait(500); + } catch (InterruptedException e) { + } + } + } + } + } + + @Override + public String getName() { + return name; + } + + @Override + public Collection<Job> getJobsWaiting() { + List<Job> ret = new LinkedList<Job>(); + synchronized (queue) { + for (Job job : queue) { + ret.add(job); + } + } + return ret; + } + + @Override + public Collection<Job> getJobsRunning() { + List<Job> ret = new LinkedList<Job>(); + synchronized (queue) { + for (Job job : running) { + ret.add(job); + } + } + return ret; + } + + @Override + public void submit(Job job) { + job.setStatus(Status.PENDING); + + synchronized (queue) { + queue.add(job); + queue.notify(); + } + } + + public void setMaxConcurrency(int maxConcurrency) { + this.maxConcurrency = maxConcurrency; + synchronized (queue) { + queue.notify(); + } + } + + /** + * Role of the class is get status info from remote process from PENDING to + * RUNNING status. + */ + private class JobStatusPoller extends Thread { + private long initialPeriodMsec; + private long initialPeriodCheckIntervalMsec; + private long checkIntervalMsec; + private boolean terminate; + private JobListener listener; + private Job job; + Status lastStatus; + + public JobStatusPoller(long initialPeriodMsec, + long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job, + JobListener listener) { + this.initialPeriodMsec = initialPeriodMsec; + this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec; + this.checkIntervalMsec = checkIntervalMsec; + this.job = job; + this.listener = listener; + this.terminate = false; + } + + @Override + public void run() { + long started = System.currentTimeMillis(); + while (terminate == false) { + long current = System.currentTimeMillis(); + long interval; + if (current - started < initialPeriodMsec) { + interval = initialPeriodCheckIntervalMsec; + } else { + interval = checkIntervalMsec; + } + + synchronized (this) { + try { + this.wait(interval); + } catch (InterruptedException e) { + } + } + + + Status newStatus = getStatus(); + if (newStatus == null) { // unknown + continue; + } + + if (newStatus != Status.READY && newStatus != Status.PENDING) { + // we don't need more + continue; + } + } + } + + public void shutdown() { + terminate = true; + synchronized (this) { + this.notify(); + } + } + + + private Status getLastStatus() { + if (terminate == true) { + if (lastStatus != Status.FINISHED && + lastStatus != Status.ERROR && + lastStatus != Status.ABORT) { + return Status.FINISHED; + } else { + return (lastStatus == null) ? Status.FINISHED : lastStatus; + } + } else { + return (lastStatus == null) ? Status.FINISHED : lastStatus; + } + } + + public synchronized Job.Status getStatus() { + if (interpreterProcess.referenceCount() <= 0) { + return getLastStatus(); + } + + Client client; + try { + client = interpreterProcess.getClient(); + } catch (Exception e) { + logger.error("Can't get status information", e); + lastStatus = Status.ERROR; + return Status.ERROR; + } + + try { + String statusStr = client.getStatus(job.getId()); + if ("Unknown".equals(statusStr)) { + // not found this job in the remote schedulers. + // maybe not submitted, maybe already finished + Status status = getLastStatus(); + listener.afterStatusChange(job, null, status); + return status; + } + Status status = Status.valueOf(statusStr); + lastStatus = status; + listener.afterStatusChange(job, null, status); + return status; + } catch (TException e) { + logger.error("Can't get status information", e); + lastStatus = Status.ERROR; + return Status.ERROR; + } catch (Exception e) { + logger.error("Unknown status", e); + lastStatus = Status.ERROR; + return Status.ERROR; + } finally { + interpreterProcess.releaseClient(client); + } + } + } + + private class JobRunner implements Runnable, JobListener { + private Scheduler scheduler; + private Job job; + private boolean jobExecuted; + boolean jobSubmittedRemotely; + + public JobRunner(Scheduler scheduler, Job job) { + this.scheduler = scheduler; + this.job = job; + jobExecuted = false; + jobSubmittedRemotely = false; + } + + public boolean isJobSubmittedInRemote() { + return jobSubmittedRemotely; + } + + @Override + public void run() { + if (job.isAborted()) { + job.setStatus(Status.ABORT); + job.aborted = false; + + synchronized (queue) { + running.remove(job); + queue.notify(); + } + + return; + } + + JobStatusPoller jobStatusPoller = new JobStatusPoller(1500, 100, 500, + job, this); + jobStatusPoller.start(); + + if (listener != null) { + listener.jobStarted(scheduler, job); + } + job.run(); + jobExecuted = true; + jobSubmittedRemotely = true; + + jobStatusPoller.shutdown(); + try { + jobStatusPoller.join(); + } catch (InterruptedException e) { + logger.error("JobStatusPoller interrupted", e); + } + + job.setStatus(jobStatusPoller.getStatus()); + if (listener != null) { + listener.jobFinished(scheduler, job); + } + + // reset aborted flag to allow retry + job.aborted = false; + + synchronized (queue) { + running.remove(job); + queue.notify(); + } + } + + @Override + public void onProgressUpdate(Job job, int progress) { + } + + @Override + public void beforeStatusChange(Job job, Status before, Status after) { + } + + @Override + public void afterStatusChange(Job job, Status before, Status after) { + if (after == null) { // unknown. maybe before sumitted remotely, maybe already finished. + if (jobExecuted) { + jobSubmittedRemotely = true; + if (job.isAborted()) { + job.setStatus(Status.ABORT); + } else if (job.getException() != null) { + job.setStatus(Status.ERROR); + } else { + job.setStatus(Status.FINISHED); + } + } + return; + } + + + // Update remoteStatus + if (jobExecuted == false) { + if (after == Status.FINISHED || after == Status.ABORT + || after == Status.ERROR) { + // it can be status of last run. + // so not updating the remoteStatus + return; + } else if (after == Status.RUNNING) { + jobSubmittedRemotely = true; + } + } else { + jobSubmittedRemotely = true; + } + + // status polled by status poller + if (job.getStatus() != after) { + job.setStatus(after); + } + } + } + + @Override + public void stop() { + terminate = true; + synchronized (queue) { + queue.notify(); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java new file mode 100644 index 0000000..a886c22 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scheduler; + +import java.util.Collection; + +/** + * TODO(moon) : add description. + * + * @author Leemoonsoo + * + */ +public interface Scheduler extends Runnable { + public String getName(); + + public Collection<Job> getJobsWaiting(); + + public Collection<Job> getJobsRunning(); + + public void submit(Job job); + + public void stop(); +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java new file mode 100644 index 0000000..2556a81 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scheduler; + +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TODO(moon) : add description. + * + * @author Leemoonsoo + * + */ +public class SchedulerFactory implements SchedulerListener { + private final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class); + ScheduledExecutorService executor; + Map<String, Scheduler> schedulers = new LinkedHashMap<String, Scheduler>(); + + private static SchedulerFactory singleton; + private static Long singletonLock = new Long(0); + + public static SchedulerFactory singleton() { + if (singleton == null) { + synchronized (singletonLock) { + if (singleton == null) { + try { + singleton = new SchedulerFactory(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + return singleton; + } + + public SchedulerFactory() throws Exception { + executor = Executors.newScheduledThreadPool(100); + } + + public void destroy() { + executor.shutdown(); + } + + public Scheduler createOrGetFIFOScheduler(String name) { + synchronized (schedulers) { + if (schedulers.containsKey(name) == false) { + Scheduler s = new FIFOScheduler(name, executor, this); + schedulers.put(name, s); + executor.execute(s); + } + return schedulers.get(name); + } + } + + public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) { + synchronized (schedulers) { + if (schedulers.containsKey(name) == false) { + Scheduler s = new ParallelScheduler(name, executor, this, maxConcurrency); + schedulers.put(name, s); + executor.execute(s); + } + return schedulers.get(name); + } + } + + public Scheduler createOrGetRemoteScheduler( + String name, + RemoteInterpreterProcess interpreterProcess, + int maxConcurrency) { + + synchronized (schedulers) { + if (schedulers.containsKey(name) == false) { + Scheduler s = new RemoteScheduler( + name, + executor, + interpreterProcess, + this, + maxConcurrency); + schedulers.put(name, s); + executor.execute(s); + } + return schedulers.get(name); + } + } + + public Scheduler removeScheduler(String name) { + synchronized (schedulers) { + Scheduler s = schedulers.remove(name); + if (s != null) { + s.stop(); + } + } + return null; + } + + public Collection<Scheduler> listScheduler(String name) { + List<Scheduler> s = new LinkedList<Scheduler>(); + synchronized (schedulers) { + for (Scheduler ss : schedulers.values()) { + s.add(ss); + } + } + return s; + } + + @Override + public void jobStarted(Scheduler scheduler, Job job) { + logger.info("Job " + job.getJobName() + " started by scheduler " + scheduler.getName()); + + } + + @Override + public void jobFinished(Scheduler scheduler, Job job) { + logger.info("Job " + job.getJobName() + " finished by scheduler " + scheduler.getName()); + + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerListener.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerListener.java new file mode 100644 index 0000000..6fdd176 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerListener.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scheduler; + +/** + * TODO(moon) : add description. + * + * @author Leemoonsoo + * + */ +public interface SchedulerListener { + public void jobStarted(Scheduler scheduler, Job job); + + public void jobFinished(Scheduler scheduler, Job job); +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index bbb54b1..051730e 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -1,4 +1,22 @@ -namespace java com.nflabs.zeppelin.interpreter.thrift +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace java org.apache.zeppelin.interpreter.thrift struct RemoteInterpreterContext { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/display/InputTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/display/InputTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/display/InputTest.java deleted file mode 100644 index 091473b..0000000 --- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/display/InputTest.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.nflabs.zeppelin.display; - -import static org.junit.Assert.*; - -import java.io.IOException; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class InputTest { - - @Before - public void setUp() throws Exception { - } - - @After - public void tearDown() throws Exception { - } - - @Test - public void testDefaultParamReplace() throws IOException{ - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java deleted file mode 100644 index 181b1b0..0000000 --- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.nflabs.zeppelin.interpreter.remote; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -import java.util.HashMap; - -import org.junit.Test; - -import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; - -public class RemoteInterpreterProcessTest { - - @Test - public void testStartStop() { - RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>()); - assertFalse(rip.isRunning()); - assertEquals(0, rip.referenceCount()); - assertEquals(1, rip.reference()); - assertEquals(2, rip.reference()); - assertEquals(true, rip.isRunning()); - assertEquals(1, rip.dereference()); - assertEquals(true, rip.isRunning()); - assertEquals(0, rip.dereference()); - assertEquals(false, rip.isRunning()); - } - - @Test - public void testClientFactory() throws Exception { - RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>()); - rip.reference(); - assertEquals(0, rip.getNumActiveClient()); - assertEquals(0, rip.getNumIdleClient()); - - Client client = rip.getClient(); - assertEquals(1, rip.getNumActiveClient()); - assertEquals(0, rip.getNumIdleClient()); - - rip.releaseClient(client); - assertEquals(0, rip.getNumActiveClient()); - assertEquals(1, rip.getNumIdleClient()); - - rip.dereference(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java deleted file mode 100644 index 809c76e..0000000 --- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.nflabs.zeppelin.interpreter.remote; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; - -import org.apache.thrift.TException; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class RemoteInterpreterServerTest { - @Before - public void setUp() throws Exception { - } - - @After - public void tearDown() throws Exception { - } - - @Test - public void testStartStop() throws InterruptedException, IOException, TException { - RemoteInterpreterServer server = new RemoteInterpreterServer( - RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces()); - assertEquals(false, server.isRunning()); - - server.start(); - long startTime = System.currentTimeMillis(); - boolean running = false; - - while (System.currentTimeMillis() - startTime < 10 * 1000) { - if (server.isRunning()) { - running = true; - break; - } else { - Thread.sleep(200); - } - } - - assertEquals(true, running); - assertEquals(true, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", server.getPort())); - - server.shutdown(); - - while (System.currentTimeMillis() - startTime < 10 * 1000) { - if (server.isRunning()) { - Thread.sleep(200); - } else { - running = false; - break; - } - } - assertEquals(false, running); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java deleted file mode 100644 index dcee6aa..0000000 --- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ /dev/null @@ -1,428 +0,0 @@ -package com.nflabs.zeppelin.interpreter.remote; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.thrift.transport.TTransportException; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.nflabs.zeppelin.display.GUI; -import com.nflabs.zeppelin.interpreter.InterpreterContext; -import com.nflabs.zeppelin.interpreter.InterpreterGroup; -import com.nflabs.zeppelin.interpreter.InterpreterResult; -import com.nflabs.zeppelin.interpreter.remote.mock.MockInterpreterA; -import com.nflabs.zeppelin.interpreter.remote.mock.MockInterpreterB; -import com.nflabs.zeppelin.scheduler.Job; -import com.nflabs.zeppelin.scheduler.Job.Status; -import com.nflabs.zeppelin.scheduler.Scheduler; - -public class RemoteInterpreterTest { - - - private InterpreterGroup intpGroup; - private HashMap<String, String> env; - - @Before - public void setUp() throws Exception { - intpGroup = new InterpreterGroup(); - env = new HashMap<String, String>(); - env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); - } - - @After - public void tearDown() throws Exception { - intpGroup.clone(); - intpGroup.destroy(); - } - - @Test - public void testRemoteInterperterCall() throws TTransportException, IOException { - Properties p = new Properties(); - - RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env - ); - - intpGroup.add(intpA); - intpA.setInterpreterGroup(intpGroup); - - RemoteInterpreter intpB = new RemoteInterpreter( - p, - MockInterpreterB.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env - ); - - intpGroup.add(intpB); - intpB.setInterpreterGroup(intpGroup); - - - RemoteInterpreterProcess process = intpA.getInterpreterProcess(); - process.equals(intpB.getInterpreterProcess()); - - assertFalse(process.isRunning()); - assertEquals(0, process.getNumIdleClient()); - assertEquals(0, process.referenceCount()); - - intpA.open(); - assertTrue(process.isRunning()); - assertEquals(1, process.getNumIdleClient()); - assertEquals(1, process.referenceCount()); - - intpA.interpret("1", - new InterpreterContext( - "id", - "title", - "text", - new HashMap<String, Object>(), - new GUI())); - - intpB.open(); - assertEquals(2, process.referenceCount()); - - intpA.close(); - assertEquals(1, process.referenceCount()); - intpB.close(); - assertEquals(0, process.referenceCount()); - - assertFalse(process.isRunning()); - - } - - @Test - public void testRemoteSchedulerSharing() throws TTransportException, IOException { - Properties p = new Properties(); - - RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env - ); - - intpGroup.add(intpA); - intpA.setInterpreterGroup(intpGroup); - - RemoteInterpreter intpB = new RemoteInterpreter( - p, - MockInterpreterB.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env - ); - - intpGroup.add(intpB); - intpB.setInterpreterGroup(intpGroup); - - intpA.open(); - intpB.open(); - - long start = System.currentTimeMillis(); - InterpreterResult ret = intpA.interpret("500", - new InterpreterContext( - "id", - "title", - "text", - new HashMap<String, Object>(), - new GUI())); - assertEquals("500", ret.message()); - - ret = intpB.interpret("500", - new InterpreterContext( - "id", - "title", - "text", - new HashMap<String, Object>(), - new GUI())); - assertEquals("1000", ret.message()); - long end = System.currentTimeMillis(); - assertTrue(end - start >= 1000); - - - intpA.close(); - intpB.close(); - - RemoteInterpreterProcess process = intpA.getInterpreterProcess(); - assertFalse(process.isRunning()); - } - - @Test - public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException { - Properties p = new Properties(); - - final RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env - ); - - intpGroup.add(intpA); - intpA.setInterpreterGroup(intpGroup); - - final RemoteInterpreter intpB = new RemoteInterpreter( - p, - MockInterpreterB.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env - ); - - intpGroup.add(intpB); - intpB.setInterpreterGroup(intpGroup); - - intpA.open(); - intpB.open(); - - long start = System.currentTimeMillis(); - Job jobA = new Job("jobA", null) { - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - return intpA.interpret("500", - new InterpreterContext( - "jobA", - "title", - "text", - new HashMap<String, Object>(), - new GUI())); - } - - @Override - protected boolean jobAbort() { - return false; - } - - }; - intpA.getScheduler().submit(jobA); - - Job jobB = new Job("jobB", null) { - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - return intpB.interpret("500", - new InterpreterContext( - "jobB", - "title", - "text", - new HashMap<String, Object>(), - new GUI())); - } - - @Override - protected boolean jobAbort() { - return false; - } - - }; - intpB.getScheduler().submit(jobB); - - // wait until both job finished - while (jobA.getStatus() != Status.FINISHED || - jobB.getStatus() != Status.FINISHED) { - Thread.sleep(100); - } - - long end = System.currentTimeMillis(); - assertTrue(end - start >= 1000); - - assertEquals("1000", ((InterpreterResult) jobB.getReturn()).message()); - - intpA.close(); - intpB.close(); - - RemoteInterpreterProcess process = intpA.getInterpreterProcess(); - assertFalse(process.isRunning()); - } - - @Test - public void testRunOrderPreserved() throws InterruptedException { - Properties p = new Properties(); - - final RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env - ); - - intpGroup.add(intpA); - intpA.setInterpreterGroup(intpGroup); - - intpA.open(); - - int concurrency = 3; - final List<String> results = new LinkedList<String>(); - - Scheduler scheduler = intpA.getScheduler(); - for (int i = 0; i < concurrency; i++) { - final String jobId = Integer.toString(i); - scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) { - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - InterpreterResult ret = intpA.interpret(getJobName(), new InterpreterContext( - jobId, - "title", - "text", - new HashMap<String, Object>(), - new GUI())); - - synchronized (results) { - results.add(ret.message()); - results.notify(); - } - return null; - } - - @Override - protected boolean jobAbort() { - return false; - } - - }); - } - - // wait for job finished - synchronized (results) { - while (results.size() != concurrency) { - results.wait(300); - } - } - - int i = 0; - for (String result : results) { - assertEquals(Integer.toString(i++), result); - } - assertEquals(concurrency, i); - - intpA.close(); - } - - - @Test - public void testRunParallel() throws InterruptedException { - Properties p = new Properties(); - p.put("parallel", "true"); - - final RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env - ); - - intpGroup.add(intpA); - intpA.setInterpreterGroup(intpGroup); - - intpA.open(); - - int concurrency = 4; - final int timeToSleep = 1000; - final List<String> results = new LinkedList<String>(); - long start = System.currentTimeMillis(); - - Scheduler scheduler = intpA.getScheduler(); - for (int i = 0; i < concurrency; i++) { - final String jobId = Integer.toString(i); - scheduler.submit(new Job(jobId, Integer.toString(i), null, 300) { - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - String stmt = Integer.toString(timeToSleep); - InterpreterResult ret = intpA.interpret(stmt, new InterpreterContext( - jobId, - "title", - "text", - new HashMap<String, Object>(), - new GUI())); - - synchronized (results) { - results.add(ret.message()); - results.notify(); - } - return stmt; - } - - @Override - protected boolean jobAbort() { - return false; - } - - }); - } - - // wait for job finished - synchronized (results) { - while (results.size() != concurrency) { - results.wait(300); - } - } - - long end = System.currentTimeMillis(); - - assertTrue(end - start < timeToSleep * concurrency); - - intpA.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java deleted file mode 100644 index 3035cf2..0000000 --- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.nflabs.zeppelin.interpreter.remote; - -import static org.junit.Assert.assertTrue; - -import java.io.IOException; - -import org.junit.Test; - -public class RemoteInterpreterUtilsTest { - - @Test - public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException { - assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java deleted file mode 100644 index 1df3979..0000000 --- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java +++ /dev/null @@ -1,84 +0,0 @@ -package com.nflabs.zeppelin.interpreter.remote.mock; - -import java.util.List; -import java.util.Properties; - -import com.nflabs.zeppelin.interpreter.Interpreter; -import com.nflabs.zeppelin.interpreter.InterpreterContext; -import com.nflabs.zeppelin.interpreter.InterpreterException; -import com.nflabs.zeppelin.interpreter.InterpreterPropertyBuilder; -import com.nflabs.zeppelin.interpreter.InterpreterResult; -import com.nflabs.zeppelin.interpreter.InterpreterResult.Code; -import com.nflabs.zeppelin.scheduler.Scheduler; -import com.nflabs.zeppelin.scheduler.SchedulerFactory; - -public class MockInterpreterA extends Interpreter { - static { - Interpreter.register( - "interpreterA", - "group1", - MockInterpreterA.class.getName(), - new InterpreterPropertyBuilder() - .add("p1", "v1", "property1").build()); - - } - - private String lastSt; - - public MockInterpreterA(Properties property) { - super(property); - } - - @Override - public void open() { - //new RuntimeException().printStackTrace(); - } - - @Override - public void close() { - } - - public String getLastStatement() { - return lastSt; - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - try { - Thread.sleep(Long.parseLong(st)); - this.lastSt = st; - } catch (NumberFormatException | InterruptedException e) { - throw new InterpreterException(e); - } - return new InterpreterResult(Code.SUCCESS, st); - } - - @Override - public void cancel(InterpreterContext context) { - - } - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } - - @Override - public List<String> completion(String buf, int cursor) { - return null; - } - - @Override - public Scheduler getScheduler() { - if (getProperty("parallel") != null && getProperty("parallel").equals("true")) { - return SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + this.hashCode(), 10); - } else { - return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java deleted file mode 100644 index 39f2ab8..0000000 --- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java +++ /dev/null @@ -1,101 +0,0 @@ -package com.nflabs.zeppelin.interpreter.remote.mock; - -import java.util.List; -import java.util.Properties; - -import com.nflabs.zeppelin.interpreter.Interpreter; -import com.nflabs.zeppelin.interpreter.InterpreterContext; -import com.nflabs.zeppelin.interpreter.InterpreterException; -import com.nflabs.zeppelin.interpreter.InterpreterGroup; -import com.nflabs.zeppelin.interpreter.InterpreterPropertyBuilder; -import com.nflabs.zeppelin.interpreter.InterpreterResult; -import com.nflabs.zeppelin.interpreter.InterpreterResult.Code; -import com.nflabs.zeppelin.interpreter.WrappedInterpreter; -import com.nflabs.zeppelin.scheduler.Scheduler; - -public class MockInterpreterB extends Interpreter { - static { - Interpreter.register( - "interpreterB", - "group1", - MockInterpreterA.class.getName(), - new InterpreterPropertyBuilder() - .add("p1", "v1", "property1").build()); - - } - public MockInterpreterB(Properties property) { - super(property); - } - - @Override - public void open() { - //new RuntimeException().printStackTrace(); - } - - @Override - public void close() { - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - MockInterpreterA intpA = getInterpreterA(); - String intpASt = intpA.getLastStatement(); - long timeToSleep = Long.parseLong(st); - if (intpASt != null) { - timeToSleep += Long.parseLong(intpASt); - } - try { - Thread.sleep(timeToSleep); - } catch (NumberFormatException | InterruptedException e) { - throw new InterpreterException(e); - } - return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep)); - } - - @Override - public void cancel(InterpreterContext context) { - - } - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } - - @Override - public List<String> completion(String buf, int cursor) { - return null; - } - - public MockInterpreterA getInterpreterA() { - InterpreterGroup interpreterGroup = getInterpreterGroup(); - for (Interpreter intp : interpreterGroup) { - if (intp.getClassName().equals(MockInterpreterA.class.getName())) { - Interpreter p = intp; - while (p instanceof WrappedInterpreter) { - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - return (MockInterpreterA) p; - } - } - return null; - } - - @Override - public Scheduler getScheduler() { - InterpreterGroup interpreterGroup = getInterpreterGroup(); - for (Interpreter intp : interpreterGroup) { - if (intp.getClassName().equals(MockInterpreterA.class.getName())) { - return intp.getScheduler(); - } - } - - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java deleted file mode 100644 index 37a29d1..0000000 --- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.nflabs.zeppelin.scheduler; - -import junit.framework.TestCase; - -import com.nflabs.zeppelin.scheduler.Job.Status; - -public class FIFOSchedulerTest extends TestCase { - - private SchedulerFactory schedulerSvc; - - @Override - public void setUp() throws Exception{ - schedulerSvc = new SchedulerFactory(); - } - - @Override - public void tearDown(){ - - } - - public void testRun() throws InterruptedException{ - Scheduler s = schedulerSvc.createOrGetFIFOScheduler("test"); - assertEquals(0, s.getJobsRunning().size()); - assertEquals(0, s.getJobsWaiting().size()); - - Job job1 = new SleepingJob("job1", null, 500); - Job job2 = new SleepingJob("job2", null, 500); - - s.submit(job1); - s.submit(job2); - Thread.sleep(200); - - assertEquals(Status.RUNNING, job1.getStatus()); - assertEquals(Status.PENDING, job2.getStatus()); - assertEquals(1, s.getJobsRunning().size()); - assertEquals(1, s.getJobsWaiting().size()); - - - Thread.sleep(500); - assertEquals(Status.FINISHED, job1.getStatus()); - assertEquals(Status.RUNNING, job2.getStatus()); - assertTrue((500 < (Long)job1.getReturn())); - assertEquals(1, s.getJobsRunning().size()); - assertEquals(0, s.getJobsWaiting().size()); - - } - - public void testAbort() throws InterruptedException{ - Scheduler s = schedulerSvc.createOrGetFIFOScheduler("test"); - assertEquals(0, s.getJobsRunning().size()); - assertEquals(0, s.getJobsWaiting().size()); - - Job job1 = new SleepingJob("job1", null, 500); - Job job2 = new SleepingJob("job2", null, 500); - - s.submit(job1); - s.submit(job2); - - Thread.sleep(200); - - job1.abort(); - job2.abort(); - - Thread.sleep(200); - - assertEquals(Status.ABORT, job1.getStatus()); - assertEquals(Status.ABORT, job2.getStatus()); - - assertTrue((500 > (Long)job1.getReturn())); - assertEquals(null, job2.getReturn()); - - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/ParallelSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/ParallelSchedulerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/ParallelSchedulerTest.java deleted file mode 100644 index f88de4c..0000000 --- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/ParallelSchedulerTest.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.nflabs.zeppelin.scheduler; - - -import com.nflabs.zeppelin.scheduler.Job.Status; - -import junit.framework.TestCase; -public class ParallelSchedulerTest extends TestCase { - - private SchedulerFactory schedulerSvc; - - public void setUp() throws Exception{ - schedulerSvc = new SchedulerFactory(); - } - - public void tearDown(){ - - } - - public void testRun() throws InterruptedException{ - Scheduler s = schedulerSvc.createOrGetParallelScheduler("test", 2); - assertEquals(0, s.getJobsRunning().size()); - assertEquals(0, s.getJobsWaiting().size()); - - Job job1 = new SleepingJob("job1", null, 500); - Job job2 = new SleepingJob("job2", null, 500); - Job job3 = new SleepingJob("job3", null, 500); - - s.submit(job1); - s.submit(job2); - s.submit(job3); - Thread.sleep(200); - - assertEquals(Status.RUNNING, job1.getStatus()); - assertEquals(Status.RUNNING, job2.getStatus()); - assertEquals(Status.PENDING, job3.getStatus()); - assertEquals(2, s.getJobsRunning().size()); - assertEquals(1, s.getJobsWaiting().size()); - - Thread.sleep(500); - - assertEquals(Status.FINISHED, job1.getStatus()); - assertEquals(Status.FINISHED, job2.getStatus()); - assertEquals(Status.RUNNING, job3.getStatus()); - assertEquals(1, s.getJobsRunning().size()); - assertEquals(0, s.getJobsWaiting().size()); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java deleted file mode 100644 index 35aa1d3..0000000 --- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java +++ /dev/null @@ -1,105 +0,0 @@ -package com.nflabs.zeppelin.scheduler; - -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.nflabs.zeppelin.display.GUI; -import com.nflabs.zeppelin.interpreter.InterpreterContext; -import com.nflabs.zeppelin.interpreter.InterpreterGroup; -import com.nflabs.zeppelin.interpreter.remote.RemoteInterpreter; -import com.nflabs.zeppelin.interpreter.remote.mock.MockInterpreterA; - -public class RemoteSchedulerTest { - - private SchedulerFactory schedulerSvc; - - @Before - public void setUp() throws Exception{ - schedulerSvc = new SchedulerFactory(); - } - - @After - public void tearDown(){ - - } - - @Test - public void test() throws Exception { - Properties p = new Properties(); - InterpreterGroup intpGroup = new InterpreterGroup(); - Map<String, String> env = new HashMap<String, String>(); - env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); - - final RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env - ); - - intpGroup.add(intpA); - intpA.setInterpreterGroup(intpGroup); - - intpA.open(); - - Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", - intpA.getInterpreterProcess(), - 10); - - Job job = new Job("jobId", "jobName", null, 200) { - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - intpA.interpret("1000", new InterpreterContext( - "jobId", - "title", - "text", - new HashMap<String, Object>(), - new GUI())); - return "1000"; - } - - @Override - protected boolean jobAbort() { - return false; - } - }; - scheduler.submit(job); - - while (job.isRunning() == false) { - Thread.sleep(100); - } - - Thread.sleep(500); - assertEquals(0, scheduler.getJobsWaiting().size()); - assertEquals(1, scheduler.getJobsRunning().size()); - - Thread.sleep(500); - - assertEquals(0, scheduler.getJobsWaiting().size()); - assertEquals(0, scheduler.getJobsRunning().size()); - - intpA.close(); - schedulerSvc.removeScheduler("test"); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/SleepingJob.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/SleepingJob.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/SleepingJob.java deleted file mode 100644 index 42d0316..0000000 --- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/SleepingJob.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.nflabs.zeppelin.scheduler; - -import java.util.HashMap; -import java.util.Map; - -public class SleepingJob extends Job{ - - private int time; - boolean abort = false; - private long start; - private int count; - - - public SleepingJob(String jobName, JobListener listener, int time){ - super(jobName, listener); - this.time = time; - count = 0; - } - public Object jobRun() { - start = System.currentTimeMillis(); - while(abort==false){ - count++; - try { - Thread.sleep(10); - } catch (InterruptedException e) { - } - if(System.currentTimeMillis() - start>time) break; - } - return System.currentTimeMillis()-start; - } - - public boolean jobAbort() { - abort = true; - return true; - } - - public int progress() { - long p = (System.currentTimeMillis() - start)*100 / time; - if(p<0) p = 0; - if(p>100) p = 100; - return (int) p; - } - - public Map<String, Object> info() { - Map<String, Object> i = new HashMap<String, Object>(); - i.put("LoopCount", Integer.toString(count)); - return i; - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/InputTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/InputTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/InputTest.java new file mode 100644 index 0000000..626ae99 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/InputTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.display; + +import java.io.IOException; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class InputTest { + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testDefaultParamReplace() throws IOException{ + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java new file mode 100644 index 0000000..02dc224 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.util.HashMap; + +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; +import org.junit.Test; + +public class RemoteInterpreterProcessTest { + + @Test + public void testStartStop() { + RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>()); + assertFalse(rip.isRunning()); + assertEquals(0, rip.referenceCount()); + assertEquals(1, rip.reference()); + assertEquals(2, rip.reference()); + assertEquals(true, rip.isRunning()); + assertEquals(1, rip.dereference()); + assertEquals(true, rip.isRunning()); + assertEquals(0, rip.dereference()); + assertEquals(false, rip.isRunning()); + } + + @Test + public void testClientFactory() throws Exception { + RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>()); + rip.reference(); + assertEquals(0, rip.getNumActiveClient()); + assertEquals(0, rip.getNumIdleClient()); + + Client client = rip.getClient(); + assertEquals(1, rip.getNumActiveClient()); + assertEquals(0, rip.getNumIdleClient()); + + rip.releaseClient(client); + assertEquals(0, rip.getNumActiveClient()); + assertEquals(1, rip.getNumIdleClient()); + + rip.dereference(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java new file mode 100644 index 0000000..af6b4bd --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; + +import org.apache.thrift.TException; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class RemoteInterpreterServerTest { + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testStartStop() throws InterruptedException, IOException, TException { + RemoteInterpreterServer server = new RemoteInterpreterServer( + RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces()); + assertEquals(false, server.isRunning()); + + server.start(); + long startTime = System.currentTimeMillis(); + boolean running = false; + + while (System.currentTimeMillis() - startTime < 10 * 1000) { + if (server.isRunning()) { + running = true; + break; + } else { + Thread.sleep(200); + } + } + + assertEquals(true, running); + assertEquals(true, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", server.getPort())); + + server.shutdown(); + + while (System.currentTimeMillis() - startTime < 10 * 1000) { + if (server.isRunning()) { + Thread.sleep(200); + } else { + running = false; + break; + } + } + assertEquals(false, running); + } + + +}
