http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java deleted file mode 100644 index bb90dd8..0000000 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ /dev/null @@ -1,371 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote; - -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -import org.apache.thrift.TException; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.display.AngularObject; -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.display.Input; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; -import org.apache.zeppelin.scheduler.Job; -import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.scheduler.SchedulerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -/** - * Proxy for Interpreter instance that runs on separate process - */ -public class RemoteInterpreter extends Interpreter { - private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreter.class); - private static final Gson gson = new Gson(); - - - private String className; - private String sessionId; - private String userName; - private FormType formType; - - private RemoteInterpreterProcess interpreterProcess; - private volatile boolean isOpened = false; - private volatile boolean isCreated = false; - - /** - * Remote interpreter and manage interpreter process - */ - public RemoteInterpreter(Properties properties, - String sessionId, - String className, - String userName) { - super(properties); - this.sessionId = sessionId; - this.className = className; - this.userName = userName; - } - - public boolean isOpened() { - return isOpened; - } - - @Override - public String getClassName() { - return className; - } - - public String getSessionId() { - return this.sessionId; - } - - public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() { - if (this.interpreterProcess != null) { - return this.interpreterProcess; - } - InterpreterGroup intpGroup = getInterpreterGroup(); - this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess(); - synchronized (interpreterProcess) { - if (!interpreterProcess.isRunning()) { - interpreterProcess.start(userName, false); - interpreterProcess.getRemoteInterpreterEventPoller() - .setInterpreterProcess(interpreterProcess); - interpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(intpGroup); - interpreterProcess.getRemoteInterpreterEventPoller().start(); - } - } - return interpreterProcess; - } - - @Override - public void open() { - synchronized (this) { - if (!isOpened) { - // create all the interpreters of the same session first, then Open the internal interpreter - // of this RemoteInterpreter. - // The why we we create all the interpreter of the session is because some interpreter - // depends on other interpreter. e.g. PySparkInterpreter depends on SparkInterpreter. - // also see method Interpreter.getInterpreterInTheSameSessionByClassName - for (Interpreter interpreter : getInterpreterGroup().getOrCreateSession( - userName, sessionId)) { - ((RemoteInterpreter) interpreter).internal_create(); - } - - interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - LOGGER.info("Open RemoteInterpreter {}", getClassName()); - client.open(sessionId, className); - // Push angular object loaded from JSON file to remote interpreter - synchronized (getInterpreterGroup()) { - if (!getInterpreterGroup().isAngularRegistryPushed()) { - pushAngularObjectRegistryToRemote(client); - getInterpreterGroup().setAngularRegistryPushed(true); - } - } - return null; - } - }); - isOpened = true; - } - } - } - - private void internal_create() { - synchronized (this) { - if (!isCreated) { - RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); - interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - LOGGER.info("Create RemoteInterpreter {}", getClassName()); - client.createInterpreter(getInterpreterGroup().getId(), sessionId, - className, (Map) property, userName); - return null; - } - }); - isCreated = true; - } - } - } - - - @Override - public void close() { - if (isOpened) { - RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); - interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - client.close(sessionId, className); - return null; - } - }); - } else { - LOGGER.warn("close is called when RemoterInterpreter is not opened for " + className); - } - } - - @Override - public InterpreterResult interpret(final String st, final InterpreterContext context) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("st:\n{}", st); - } - - final FormType form = getFormType(); - RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); - InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess - .getInterpreterContextRunnerPool(); - List<InterpreterContextRunner> runners = context.getRunners(); - if (runners != null && runners.size() != 0) { - // assume all runners in this InterpreterContext have the same note id - String noteId = runners.get(0).getNoteId(); - - interpreterContextRunnerPool.clear(noteId); - interpreterContextRunnerPool.addAll(noteId, runners); - } - return interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() { - @Override - public InterpreterResult call(Client client) throws Exception { - - RemoteInterpreterResult remoteResult = client.interpret( - sessionId, className, st, convert(context)); - Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson( - remoteResult.getConfig(), new TypeToken<Map<String, Object>>() { - }.getType()); - context.getConfig().clear(); - context.getConfig().putAll(remoteConfig); - GUI currentGUI = context.getGui(); - if (form == FormType.NATIVE) { - GUI remoteGui = GUI.fromJson(remoteResult.getGui()); - currentGUI.clear(); - currentGUI.setParams(remoteGui.getParams()); - currentGUI.setForms(remoteGui.getForms()); - } else if (form == FormType.SIMPLE) { - final Map<String, Input> currentForms = currentGUI.getForms(); - final Map<String, Object> currentParams = currentGUI.getParams(); - final GUI remoteGUI = GUI.fromJson(remoteResult.getGui()); - final Map<String, Input> remoteForms = remoteGUI.getForms(); - final Map<String, Object> remoteParams = remoteGUI.getParams(); - currentForms.putAll(remoteForms); - currentParams.putAll(remoteParams); - } - - InterpreterResult result = convert(remoteResult); - return result; - } - } - ); - - } - - @Override - public void cancel(final InterpreterContext context) { - if (!isOpened) { - LOGGER.warn("Cancel is called when RemoterInterpreter is not opened for " + className); - return; - } - RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); - interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - client.cancel(sessionId, className, convert(context)); - return null; - } - }); - } - - @Override - public FormType getFormType() { - if (formType != null) { - return formType; - } - - // it is possible to call getFormType before it is opened - synchronized (this) { - if (!isOpened) { - open(); - } - } - RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); - FormType type = interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<FormType>() { - @Override - public FormType call(Client client) throws Exception { - formType = FormType.valueOf(client.getFormType(sessionId, className)); - return formType; - } - }); - return type; - } - - @Override - public int getProgress(final InterpreterContext context) { - if (!isOpened) { - LOGGER.warn("getProgress is called when RemoterInterpreter is not opened for " + className); - return 0; - } - RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); - return interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<Integer>() { - @Override - public Integer call(Client client) throws Exception { - return client.getProgress(sessionId, className, convert(context)); - } - }); - } - - - @Override - public List<InterpreterCompletion> completion(final String buf, final int cursor, - final InterpreterContext interpreterContext) { - if (!isOpened) { - LOGGER.warn("completion is called when RemoterInterpreter is not opened for " + className); - return new ArrayList<>(); - } - RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); - return interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() { - @Override - public List<InterpreterCompletion> call(Client client) throws Exception { - return client.completion(sessionId, className, buf, cursor, - convert(interpreterContext)); - } - }); - } - - public String getStatus(final String jobId) { - if (!isOpened) { - LOGGER.warn("getStatus is called when RemoteInterpreter is not opened for " + className); - return Job.Status.UNKNOWN.name(); - } - RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); - return interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<String>() { - @Override - public String call(Client client) throws Exception { - return client.getStatus(sessionId, jobId); - } - }); - } - - //TODO(zjffdu) Share the Scheduler in the same session or in the same InterpreterGroup ? - @Override - public Scheduler getScheduler() { - int maxConcurrency = Integer.parseInt( - property.getProperty("zeppelin.interpreter.max.poolsize", - ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() + "")); - return SchedulerFactory.singleton().createOrGetRemoteScheduler( - RemoteInterpreter.class.getName() + "-" + sessionId, - sessionId, this, maxConcurrency); - } - - private RemoteInterpreterContext convert(InterpreterContext ic) { - return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(), - ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()), - gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners())); - } - - private InterpreterResult convert(RemoteInterpreterResult result) { - InterpreterResult r = new InterpreterResult( - InterpreterResult.Code.valueOf(result.getCode())); - - for (RemoteInterpreterResultMessage m : result.getMsg()) { - r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData()); - } - - return r; - } - - /** - * Push local angular object registry to - * remote interpreter. This method should be - * call ONLY once when the first Interpreter is created - */ - private void pushAngularObjectRegistryToRemote(Client client) throws TException { - final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup() - .getAngularObjectRegistry(); - if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) { - final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry - .getRegistry(); - LOGGER.info("Push local angular object registry from ZeppelinServer to" + - " remote interpreter group {}", this.getInterpreterGroup().getId()); - final java.lang.reflect.Type registryType = new TypeToken<Map<String, - Map<String, AngularObject>>>() { - }.getType(); - client.angularRegistryPush(gson.toJson(registry, registryType)); - } - } - - @Override - public String toString() { - return "RemoteInterpreter_" + className + "_" + sessionId; - } -}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index f3bce2f..26c9d79 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -29,7 +29,6 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner; import org.apache.zeppelin.resource.Resource; @@ -39,7 +38,6 @@ import org.apache.zeppelin.resource.ResourceSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.ByteBuffer; @@ -86,6 +84,7 @@ public class RemoteInterpreterEventPoller extends Thread { @Override public void run() { + Client client = null; AppendOutputRunner runner = new AppendOutputRunner(listener); ScheduledFuture<?> appendFuture = appendService.scheduleWithFixedDelay( runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); @@ -101,14 +100,26 @@ public class RemoteInterpreterEventPoller extends Thread { continue; } - RemoteInterpreterEvent event = interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<RemoteInterpreterEvent>() { - @Override - public RemoteInterpreterEvent call(Client client) throws Exception { - return client.getEvent(); - } - } - ); + try { + client = interpreterProcess.getClient(); + } catch (Exception e1) { + logger.error("Can't get RemoteInterpreterEvent", e1); + waitQuietly(); + continue; + } + + RemoteInterpreterEvent event = null; + boolean broken = false; + try { + event = client.getEvent(); + } catch (TException e) { + broken = true; + logger.error("Can't get RemoteInterpreterEvent", e); + waitQuietly(); + continue; + } finally { + interpreterProcess.releaseClient(client, broken); + } AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry(); @@ -275,7 +286,10 @@ public class RemoteInterpreterEventPoller extends Thread { boolean broken = false; final Gson gson = new Gson(); final String eventOwnerKey = reqResourceBody.getOwnerKey(); + Client interpreterServerMain = null; try { + interpreterServerMain = interpreterProcess.getClient(); + final Client eventClient = interpreterServerMain; if (resourceType == RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS) { final List<ZeppelinServerResourceParagraphRunner> remoteRunners = new LinkedList<>(); @@ -294,6 +308,7 @@ public class RemoteInterpreterEventPoller extends Thread { @Override public void onFinished(Object resultObject) { + boolean clientBroken = false; if (resultObject != null && resultObject instanceof List) { List<InterpreterContextRunner> runnerList = (List<InterpreterContextRunner>) resultObject; @@ -309,15 +324,15 @@ public class RemoteInterpreterEventPoller extends Thread { resResource.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS); resResource.setData(remoteRunners); - interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - client.onReceivedZeppelinResource(resResource.toJson()); - return null; - } - } - ); + try { + eventClient.onReceivedZeppelinResource(resResource.toJson()); + } catch (Exception e) { + clientBroken = true; + logger.error("Can't get RemoteInterpreterEvent", e); + waitQuietly(); + } finally { + interpreterProcess.releaseClient(eventClient, clientBroken); + } } } @@ -331,32 +346,39 @@ public class RemoteInterpreterEventPoller extends Thread { reqRunnerContext.getNoteId(), reqRunnerContext.getParagraphId(), callBackEvent); } } catch (Exception e) { + broken = true; logger.error("Can't get RemoteInterpreterEvent", e); waitQuietly(); + } finally { + interpreterProcess.releaseClient(interpreterServerMain, broken); } } - private void sendResourcePoolResponseGetAll(final ResourceSet resourceSet) { - interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - List<String> resourceList = new LinkedList<>(); - for (Resource r : resourceSet) { - resourceList.add(r.toJson()); - } - client.resourcePoolResponseGetAll(resourceList); - return null; - } - } - ); + private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) { + Client client = null; + boolean broken = false; + try { + client = interpreterProcess.getClient(); + List<String> resourceList = new LinkedList<>(); + Gson gson = new Gson(); + for (Resource r : resourceSet) { + resourceList.add(gson.toJson(r)); + } + client.resourcePoolResponseGetAll(resourceList); + } catch (Exception e) { + logger.error(e.getMessage(), e); + broken = true; + } finally { + if (client != null) { + interpreterProcess.releaseClient(client, broken); + } + } } private ResourceSet getAllResourcePoolExcept() { ResourceSet resourceSet = new ResourceSet(); - for (InterpreterGroup intpGroup : interpreterGroup.getInterpreterSetting() - .getInterpreterSettingManager().getAllInterpreterGroup()) { + for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) { if (intpGroup.getId().equals(interpreterGroup.getId())) { continue; } @@ -368,94 +390,115 @@ public class RemoteInterpreterEventPoller extends Thread { resourceSet.addAll(localPool.getAll()); } } else if (interpreterProcess.isRunning()) { - List<String> resourceList = remoteInterpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<List<String>>() { - @Override - public List<String> call(Client client) throws Exception { - return client.resourcePoolGetAll(); - } - } - ); - for (String res : resourceList) { - resourceSet.add(Resource.fromJson(res)); + Client client = null; + boolean broken = false; + try { + client = remoteInterpreterProcess.getClient(); + List<String> resourceList = client.resourcePoolGetAll(); + Gson gson = new Gson(); + for (String res : resourceList) { + resourceSet.add(Resource.fromJson(res)); + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + broken = true; + } finally { + if (client != null) { + intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken); + } } } } return resourceSet; } - private void sendResourceResponseGet(final ResourceId resourceId, final Object o) { - interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - String rid = resourceId.toJson(); - ByteBuffer obj; - if (o == null) { - obj = ByteBuffer.allocate(0); - } else { - obj = Resource.serializeObject(o); - } - client.resourceResponseGet(rid, obj); - return null; - } - } - ); + private void sendResourceResponseGet(ResourceId resourceId, Object o) { + Client client = null; + boolean broken = false; + try { + client = interpreterProcess.getClient(); + Gson gson = new Gson(); + String rid = gson.toJson(resourceId); + ByteBuffer obj; + if (o == null) { + obj = ByteBuffer.allocate(0); + } else { + obj = Resource.serializeObject(o); + } + client.resourceResponseGet(rid, obj); + } catch (Exception e) { + logger.error(e.getMessage(), e); + broken = true; + } finally { + if (client != null) { + interpreterProcess.releaseClient(client, broken); + } + } } - private Object getResource(final ResourceId resourceId) { - InterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting() - .getInterpreterSettingManager() - .getInterpreterGroupById(resourceId.getResourcePoolId()); + private Object getResource(ResourceId resourceId) { + InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId( + resourceId.getResourcePoolId()); if (intpGroup == null) { return null; } RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess(); - ByteBuffer buffer = remoteInterpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() { - @Override - public ByteBuffer call(Client client) throws Exception { - return client.resourceGet( - resourceId.getNoteId(), - resourceId.getParagraphId(), - resourceId.getName()); - } + if (remoteInterpreterProcess == null) { + ResourcePool localPool = intpGroup.getResourcePool(); + if (localPool != null) { + return localPool.get(resourceId.getName()); + } + } else if (interpreterProcess.isRunning()) { + Client client = null; + boolean broken = false; + try { + client = remoteInterpreterProcess.getClient(); + ByteBuffer res = client.resourceGet( + resourceId.getNoteId(), + resourceId.getParagraphId(), + resourceId.getName()); + Object o = Resource.deserializeObject(res); + return o; + } catch (Exception e) { + logger.error(e.getMessage(), e); + broken = true; + } finally { + if (client != null) { + intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken); } - ); + } + } + return null; + } + public void sendInvokeMethodResult(InvokeResourceMethodEventMessage message, Object o) { + Client client = null; + boolean broken = false; try { - Object o = Resource.deserializeObject(buffer); - return o; + client = interpreterProcess.getClient(); + Gson gson = new Gson(); + String invokeMessage = gson.toJson(message); + ByteBuffer obj; + if (o == null) { + obj = ByteBuffer.allocate(0); + } else { + obj = Resource.serializeObject(o); + } + client.resourceResponseInvokeMethod(invokeMessage, obj); } catch (Exception e) { logger.error(e.getMessage(), e); + broken = true; + } finally { + if (client != null) { + interpreterProcess.releaseClient(client, broken); + } } - return null; } - public void sendInvokeMethodResult(final InvokeResourceMethodEventMessage message, - final Object o) { - interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - String invokeMessage = message.toJson(); - ByteBuffer obj; - if (o == null) { - obj = ByteBuffer.allocate(0); - } else { - obj = Resource.serializeObject(o); - } - client.resourceResponseInvokeMethod(invokeMessage, obj); - return null; - } - } - ); - } - - private Object invokeResourceMethod(final InvokeResourceMethodEventMessage message) { - final ResourceId resourceId = message.resourceId; - InterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting() - .getInterpreterSettingManager().getInterpreterGroupById(resourceId.getResourcePoolId()); + private Object invokeResourceMethod(InvokeResourceMethodEventMessage message) { + ResourceId resourceId = message.resourceId; + InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId( + resourceId.getResourcePoolId()); if (intpGroup == null) { return null; } @@ -486,25 +529,25 @@ public class RemoteInterpreterEventPoller extends Thread { return null; } } else if (interpreterProcess.isRunning()) { - ByteBuffer res = interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() { - @Override - public ByteBuffer call(Client client) throws Exception { - return client.resourceInvokeMethod( - resourceId.getNoteId(), - resourceId.getParagraphId(), - resourceId.getName(), - message.toJson()); - } - } - ); - + Client client = null; + boolean broken = false; try { - return Resource.deserializeObject(res); + client = remoteInterpreterProcess.getClient(); + ByteBuffer res = client.resourceInvokeMethod( + resourceId.getNoteId(), + resourceId.getParagraphId(), + resourceId.getName(), + gson.toJson(message)); + Object o = Resource.deserializeObject(res); + return o; } catch (Exception e) { logger.error(e.getMessage(), e); + broken = true; + } finally { + if (client != null) { + intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken); + } } - return null; } return null; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java deleted file mode 100644 index 19356fb..0000000 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote; - -import org.apache.commons.exec.*; -import org.apache.commons.exec.environment.EnvironmentUtils; -import org.apache.zeppelin.helium.ApplicationEventListener; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Map; - -/** - * This class manages start / stop of remote interpreter process - */ -public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess - implements ExecuteResultHandler { - private static final Logger logger = LoggerFactory.getLogger( - RemoteInterpreterManagedProcess.class); - private final String interpreterRunner; - - private DefaultExecutor executor; - private ExecuteWatchdog watchdog; - boolean running = false; - private int port = -1; - private final String interpreterDir; - private final String localRepoDir; - private final String interpreterGroupName; - - private Map<String, String> env; - - public RemoteInterpreterManagedProcess( - String intpRunner, - String intpDir, - String localRepoDir, - Map<String, String> env, - int connectTimeout, - RemoteInterpreterProcessListener listener, - ApplicationEventListener appListener, - String interpreterGroupName) { - super(new RemoteInterpreterEventPoller(listener, appListener), - connectTimeout); - this.interpreterRunner = intpRunner; - this.env = env; - this.interpreterDir = intpDir; - this.localRepoDir = localRepoDir; - this.interpreterGroupName = interpreterGroupName; - } - - RemoteInterpreterManagedProcess(String intpRunner, - String intpDir, - String localRepoDir, - Map<String, String> env, - RemoteInterpreterEventPoller remoteInterpreterEventPoller, - int connectTimeout, - String interpreterGroupName) { - super(remoteInterpreterEventPoller, - connectTimeout); - this.interpreterRunner = intpRunner; - this.env = env; - this.interpreterDir = intpDir; - this.localRepoDir = localRepoDir; - this.interpreterGroupName = interpreterGroupName; - } - - @Override - public String getHost() { - return "localhost"; - } - - @Override - public int getPort() { - return port; - } - - @Override - public void start(String userName, Boolean isUserImpersonate) { - // start server process - try { - port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); - logger.info("Choose port {} for RemoteInterpreterProcess", port); - } catch (IOException e1) { - throw new InterpreterException(e1); - } - - CommandLine cmdLine = CommandLine.parse(interpreterRunner); - cmdLine.addArgument("-d", false); - cmdLine.addArgument(interpreterDir, false); - cmdLine.addArgument("-p", false); - cmdLine.addArgument(Integer.toString(port), false); - if (isUserImpersonate && !userName.equals("anonymous")) { - cmdLine.addArgument("-u", false); - cmdLine.addArgument(userName, false); - } - cmdLine.addArgument("-l", false); - cmdLine.addArgument(localRepoDir, false); - cmdLine.addArgument("-g", false); - cmdLine.addArgument(interpreterGroupName, false); - - executor = new DefaultExecutor(); - - ByteArrayOutputStream cmdOut = new ByteArrayOutputStream(); - ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger); - processOutput.setOutputStream(cmdOut); - - executor.setStreamHandler(new PumpStreamHandler(processOutput)); - watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT); - executor.setWatchdog(watchdog); - - try { - Map procEnv = EnvironmentUtils.getProcEnvironment(); - procEnv.putAll(env); - - logger.info("Run interpreter process {}", cmdLine); - executor.execute(cmdLine, procEnv, this); - running = true; - } catch (IOException e) { - running = false; - throw new InterpreterException(e); - } - - - long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < getConnectTimeout()) { - if (!running) { - try { - cmdOut.flush(); - } catch (IOException e) { - // nothing to do - } - throw new InterpreterException(new String(cmdOut.toByteArray())); - } - - try { - if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) { - break; - } else { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - logger.error("Exception in RemoteInterpreterProcess while synchronized reference " + - "Thread.sleep", e); - } - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug("Remote interpreter not yet accessible at localhost:" + port); - } - } - } - processOutput.setOutputStream(null); - } - - public void stop() { - if (isRunning()) { - logger.info("kill interpreter process"); - try { - callRemoteFunction(new RemoteFunction<Void>() { - @Override - public Void call(RemoteInterpreterService.Client client) throws Exception { - client.shutdown(); - return null; - } - }); - } catch (Exception e) { - logger.warn("ignore the exception when shutting down"); - } - watchdog.destroyProcess(); - } - - executor = null; - watchdog = null; - running = false; - logger.info("Remote process terminated"); - } - - @Override - public void onProcessComplete(int exitValue) { - logger.info("Interpreter process exited {}", exitValue); - running = false; - - } - - @Override - public void onProcessFailed(ExecuteException e) { - logger.info("Interpreter process failed {}", e); - running = false; - } - - public boolean isRunning() { - return running; - } - - private static class ProcessLogOutputStream extends LogOutputStream { - - private Logger logger; - OutputStream out; - - public ProcessLogOutputStream(Logger logger) { - this.logger = logger; - } - - @Override - protected void processLine(String s, int i) { - this.logger.debug(s); - } - - @Override - public void write(byte [] b) throws IOException { - super.write(b); - - if (out != null) { - synchronized (this) { - if (out != null) { - out.write(b); - } - } - } - } - - @Override - public void write(byte [] b, int offset, int len) throws IOException { - super.write(b, offset, len); - - if (out != null) { - synchronized (this) { - if (out != null) { - out.write(b, offset, len); - } - } - } - } - - public void setOutputStream(OutputStream out) { - synchronized (this) { - this.out = out; - } - } - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index a78088c..1d48a1e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -20,13 +20,10 @@ import com.google.gson.Gson; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.thrift.TException; import org.apache.zeppelin.helium.ApplicationEventListener; -import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -35,6 +32,9 @@ import java.util.concurrent.atomic.AtomicInteger; public abstract class RemoteInterpreterProcess { private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class); + // number of sessions that are attached to this process + private final AtomicInteger referenceCount; + private GenericObjectPool<Client> clientPool; private final RemoteInterpreterEventPoller remoteInterpreterEventPoller; private final InterpreterContextRunnerPool interpreterContextRunnerPool; @@ -46,20 +46,16 @@ public abstract class RemoteInterpreterProcess { ApplicationEventListener appListener) { this(new RemoteInterpreterEventPoller(listener, appListener), connectTimeout); - this.remoteInterpreterEventPoller.setInterpreterProcess(this); } RemoteInterpreterProcess(RemoteInterpreterEventPoller remoteInterpreterEventPoller, int connectTimeout) { this.interpreterContextRunnerPool = new InterpreterContextRunnerPool(); + referenceCount = new AtomicInteger(0); this.remoteInterpreterEventPoller = remoteInterpreterEventPoller; this.connectTimeout = connectTimeout; } - public RemoteInterpreterEventPoller getRemoteInterpreterEventPoller() { - return remoteInterpreterEventPoller; - } - public abstract String getHost(); public abstract int getPort(); public abstract void start(String userName, Boolean isUserImpersonate); @@ -70,18 +66,37 @@ public abstract class RemoteInterpreterProcess { return connectTimeout; } - public synchronized Client getClient() throws Exception { + public int reference(InterpreterGroup interpreterGroup, String userName, + Boolean isUserImpersonate) { + synchronized (referenceCount) { + if (!isRunning()) { + start(userName, isUserImpersonate); + } + + if (clientPool == null) { + clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort())); + clientPool.setTestOnBorrow(true); + + remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup); + remoteInterpreterEventPoller.setInterpreterProcess(this); + remoteInterpreterEventPoller.start(); + } + return referenceCount.incrementAndGet(); + } + } + + public Client getClient() throws Exception { if (clientPool == null || clientPool.isClosed()) { - clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort())); + return null; } return clientPool.borrowObject(); } - private void releaseClient(Client client) { + public void releaseClient(Client client) { releaseClient(client, false); } - private void releaseClient(Client client, boolean broken) { + public void releaseClient(Client client, boolean broken) { if (broken) { releaseBrokenClient(client); } else { @@ -93,7 +108,7 @@ public abstract class RemoteInterpreterProcess { } } - private void releaseBrokenClient(Client client) { + public void releaseBrokenClient(Client client) { try { clientPool.invalidateObject(client); } catch (Exception e) { @@ -101,6 +116,90 @@ public abstract class RemoteInterpreterProcess { } } + public int dereference() { + synchronized (referenceCount) { + int r = referenceCount.decrementAndGet(); + if (r == 0) { + logger.info("shutdown interpreter process"); + remoteInterpreterEventPoller.shutdown(); + + // first try shutdown + Client client = null; + try { + client = getClient(); + client.shutdown(); + } catch (Exception e) { + // safely ignore exception while client.shutdown() may terminates remote process + logger.info("Exception in RemoteInterpreterProcess while synchronized dereference, can " + + "safely ignore exception while client.shutdown() may terminates remote process"); + logger.debug(e.getMessage(), e); + } finally { + if (client != null) { + // no longer used + releaseBrokenClient(client); + } + } + + clientPool.clear(); + clientPool.close(); + + // wait for some time (connectTimeout) and force kill + // remote process server.serve() loop is not always finishing gracefully + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < connectTimeout) { + if (this.isRunning()) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + logger.error("Exception in RemoteInterpreterProcess while synchronized dereference " + + "Thread.sleep", e); + } + } else { + break; + } + } + } + return r; + } + } + + public int referenceCount() { + synchronized (referenceCount) { + return referenceCount.get(); + } + } + + public int getNumActiveClient() { + if (clientPool == null) { + return 0; + } else { + return clientPool.getNumActive(); + } + } + + public int getNumIdleClient() { + if (clientPool == null) { + return 0; + } else { + return clientPool.getNumIdle(); + } + } + + public void setMaxPoolSize(int size) { + if (clientPool != null) { + //Size + 2 for progress poller , cancel operation + clientPool.setMaxTotal(size + 2); + } + } + + public int getMaxPoolSize() { + if (clientPool != null) { + return clientPool.getMaxTotal(); + } else { + return 0; + } + } + /** * Called when angular object is updated in client side to propagate * change to the remote process @@ -140,33 +239,4 @@ public abstract class RemoteInterpreterProcess { public InterpreterContextRunnerPool getInterpreterContextRunnerPool() { return interpreterContextRunnerPool; } - - public <T> T callRemoteFunction(RemoteFunction<T> func) { - Client client = null; - boolean broken = false; - try { - client = getClient(); - if (client != null) { - return func.call(client); - } - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } catch (Exception e1) { - throw new InterpreterException(e1); - } finally { - if (client != null) { - releaseClient(client, broken); - } - } - return null; - } - - /** - * - * @param <T> - */ - public interface RemoteFunction<T> { - T call(Client client) throws Exception; - } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java deleted file mode 100644 index bb176be..0000000 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.interpreter.remote; - -import org.apache.zeppelin.helium.ApplicationEventListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class connects to existing process - */ -public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { - private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class); - private final String host; - private final int port; - - public RemoteInterpreterRunningProcess( - int connectTimeout, - RemoteInterpreterProcessListener listener, - ApplicationEventListener appListener, - String host, - int port - ) { - super(connectTimeout, listener, appListener); - this.host = host; - this.port = port; - } - - @Override - public String getHost() { - return host; - } - - @Override - public int getPort() { - return port; - } - - @Override - public void start(String userName, Boolean isUserImpersonate) { - // assume process is externally managed. nothing to do - } - - @Override - public void stop() { - // assume process is externally managed. nothing to do - } - - @Override - public boolean isRunning() { - return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort()); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 3d8123e..3853468 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -106,7 +106,6 @@ public class RemoteInterpreterServer @Override public void shutdown() throws TException { - logger.info("Shutting down..."); eventClient.waitForEventQueueBecomesEmpty(DEFAULT_SHUTDOWN_TIMEOUT); if (interpreterGroup != null) { interpreterGroup.close(); @@ -160,7 +159,7 @@ public class RemoteInterpreterServer } @Override - public void createInterpreter(String interpreterGroupId, String sessionId, String + public void createInterpreter(String interpreterGroupId, String sessionKey, String className, Map<String, String> properties, String userName) throws TException { if (interpreterGroup == null) { interpreterGroup = new InterpreterGroup(interpreterGroupId); @@ -191,11 +190,20 @@ public class RemoteInterpreterServer replClass.getConstructor(new Class[] {Properties.class}); Interpreter repl = constructor.newInstance(p); repl.setClassloaderUrls(new URL[]{}); + + synchronized (interpreterGroup) { + List<Interpreter> interpreters = interpreterGroup.get(sessionKey); + if (interpreters == null) { + interpreters = new LinkedList<>(); + interpreterGroup.put(sessionKey, interpreters); + } + + interpreters.add(new LazyOpenInterpreter(repl)); + } + logger.info("Instantiate interpreter {}", className); repl.setInterpreterGroup(interpreterGroup); repl.setUserName(userName); - - interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(repl), sessionId); } catch (ClassNotFoundException | NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { @@ -229,13 +237,13 @@ public class RemoteInterpreterServer } } - protected Interpreter getInterpreter(String sessionId, String className) throws TException { + protected Interpreter getInterpreter(String sessionKey, String className) throws TException { if (interpreterGroup == null) { throw new TException( new InterpreterException("Interpreter instance " + className + " not created")); } synchronized (interpreterGroup) { - List<Interpreter> interpreters = interpreterGroup.get(sessionId); + List<Interpreter> interpreters = interpreterGroup.get(sessionKey); if (interpreters == null) { throw new TException( new InterpreterException("Interpreter " + className + " not initialized")); @@ -251,20 +259,19 @@ public class RemoteInterpreterServer } @Override - public void open(String sessionId, String className) throws TException { - logger.info(String.format("Open Interpreter %s for session %s ", className, sessionId)); - Interpreter intp = getInterpreter(sessionId, className); + public void open(String noteId, String className) throws TException { + Interpreter intp = getInterpreter(noteId, className); intp.open(); } @Override - public void close(String sessionId, String className) throws TException { + public void close(String sessionKey, String className) throws TException { // unload all applications for (String appId : runningApplications.keySet()) { RunningApplication appInfo = runningApplications.get(appId); // see NoteInterpreterLoader.SHARED_SESSION - if (appInfo.noteId.equals(sessionId) || sessionId.equals("shared_session")) { + if (appInfo.noteId.equals(sessionKey) || sessionKey.equals("shared_session")) { try { logger.info("Unload App {} ", appInfo.pkg.getName()); appInfo.app.unload(); @@ -279,7 +286,7 @@ public class RemoteInterpreterServer // close interpreters List<Interpreter> interpreters; synchronized (interpreterGroup) { - interpreters = interpreterGroup.get(sessionId); + interpreters = interpreterGroup.get(sessionKey); } if (interpreters != null) { Iterator<Interpreter> it = interpreters.iterator(); @@ -315,6 +322,7 @@ public class RemoteInterpreterServer intp, st, context); + scheduler.submit(job); while (!job.isTerminated()) { @@ -558,34 +566,30 @@ public class RemoteInterpreterServer } @Override - public int getProgress(String sessionId, String className, + public int getProgress(String noteId, String className, RemoteInterpreterContext interpreterContext) throws TException { Integer manuallyProvidedProgress = progressMap.get(interpreterContext.getParagraphId()); if (manuallyProvidedProgress != null) { return manuallyProvidedProgress; } else { - Interpreter intp = getInterpreter(sessionId, className); - if (intp == null) { - throw new TException("No interpreter {} existed for session {}".format( - className, sessionId)); - } + Interpreter intp = getInterpreter(noteId, className); return intp.getProgress(convert(interpreterContext, null)); } } @Override - public String getFormType(String sessionId, String className) throws TException { - Interpreter intp = getInterpreter(sessionId, className); + public String getFormType(String noteId, String className) throws TException { + Interpreter intp = getInterpreter(noteId, className); return intp.getFormType().toString(); } @Override - public List<InterpreterCompletion> completion(String sessionId, + public List<InterpreterCompletion> completion(String noteId, String className, String buf, int cursor, RemoteInterpreterContext remoteInterpreterContext) throws TException { - Interpreter intp = getInterpreter(sessionId, className); + Interpreter intp = getInterpreter(noteId, className); List completion = intp.completion(buf, cursor, convert(remoteInterpreterContext, null)); return completion; } @@ -762,16 +766,16 @@ public class RemoteInterpreterServer } @Override - public String getStatus(String sessionId, String jobId) + public String getStatus(String sessionKey, String jobId) throws TException { if (interpreterGroup == null) { - return Status.UNKNOWN.name(); + return "Unknown"; } synchronized (interpreterGroup) { - List<Interpreter> interpreters = interpreterGroup.get(sessionId); + List<Interpreter> interpreters = interpreterGroup.get(sessionKey); if (interpreters == null) { - return Status.UNKNOWN.name(); + return "Unknown"; } for (Interpreter intp : interpreters) { @@ -788,7 +792,7 @@ public class RemoteInterpreterServer } } } - return Status.UNKNOWN.name(); + return "Unknown"; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java new file mode 100644 index 0000000..b26995a --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.resource; + +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; +import org.slf4j.Logger; + +import java.util.List; + +/** + * Utilities for ResourcePool + */ +public class ResourcePoolUtils { + static Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolUtils.class); + + public static ResourceSet getAllResources() { + return getAllResourcesExcept(null); + } + + public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) { + ResourceSet resourceSet = new ResourceSet(); + for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) { + if (interpreterGroupExcludsion != null && + intpGroup.getId().equals(interpreterGroupExcludsion)) { + continue; + } + + RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess(); + if (remoteInterpreterProcess == null) { + ResourcePool localPool = intpGroup.getResourcePool(); + if (localPool != null) { + resourceSet.addAll(localPool.getAll()); + } + } else if (remoteInterpreterProcess.isRunning()) { + RemoteInterpreterService.Client client = null; + boolean broken = false; + try { + client = remoteInterpreterProcess.getClient(); + if (client == null) { + // remote interpreter may not started yet or terminated. + continue; + } + List<String> resourceList = client.resourcePoolGetAll(); + for (String res : resourceList) { + resourceSet.add(Resource.fromJson(res)); + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + broken = true; + } finally { + if (client != null) { + intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken); + } + } + } + } + return resourceSet; + } + + public static void removeResourcesBelongsToNote(String noteId) { + removeResourcesBelongsToParagraph(noteId, null); + } + + public static void removeResourcesBelongsToParagraph(String noteId, String paragraphId) { + for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) { + ResourceSet resourceSet = new ResourceSet(); + RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess(); + if (remoteInterpreterProcess == null) { + ResourcePool localPool = intpGroup.getResourcePool(); + if (localPool != null) { + resourceSet.addAll(localPool.getAll()); + } + if (noteId != null) { + resourceSet = resourceSet.filterByNoteId(noteId); + } + if (paragraphId != null) { + resourceSet = resourceSet.filterByParagraphId(paragraphId); + } + + for (Resource r : resourceSet) { + localPool.remove( + r.getResourceId().getNoteId(), + r.getResourceId().getParagraphId(), + r.getResourceId().getName()); + } + } else if (remoteInterpreterProcess.isRunning()) { + RemoteInterpreterService.Client client = null; + boolean broken = false; + try { + client = remoteInterpreterProcess.getClient(); + List<String> resourceList = client.resourcePoolGetAll(); + for (String res : resourceList) { + resourceSet.add(Resource.fromJson(res)); + } + + if (noteId != null) { + resourceSet = resourceSet.filterByNoteId(noteId); + } + if (paragraphId != null) { + resourceSet = resourceSet.filterByParagraphId(paragraphId); + } + + for (Resource r : resourceSet) { + client.resourceRemove( + r.getResourceId().getNoteId(), + r.getResourceId().getParagraphId(), + r.getResourceId().getName()); + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + broken = true; + } finally { + if (client != null) { + intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken); + } + } + } + } + } +} + http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index 191902a..d0025d8 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -41,7 +41,6 @@ public abstract class Job { /** * Job status. * - * UNKNOWN - Job is not found in remote * READY - Job is not running, ready to run. * PENDING - Job is submitted to scheduler. but not running yet * RUNNING - Job is running. @@ -49,8 +48,8 @@ public abstract class Job { * ERROR - Job finished run. with error * ABORT - Job finished by abort */ - public enum Status { - UNKNOWN, READY, PENDING, RUNNING, FINISHED, ERROR, ABORT; + public static enum Status { + READY, PENDING, RUNNING, FINISHED, ERROR, ABORT; public boolean isReady() { return this == READY; @@ -71,14 +70,14 @@ public abstract class Job { Date dateCreated; Date dateStarted; Date dateFinished; - volatile Status status; + Status status; static Logger LOGGER = LoggerFactory.getLogger(Job.class); transient boolean aborted = false; - private volatile String errorMessage; - private transient volatile Throwable exception; + private String errorMessage; + private transient Throwable exception; private transient JobListener listener; private long progressUpdateIntervalMs; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index e41540b..f9ddc4e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -17,9 +17,11 @@ package org.apache.zeppelin.scheduler; +import org.apache.thrift.TException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.apache.zeppelin.scheduler.Job.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +34,6 @@ import java.util.concurrent.ExecutorService; /** * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on RemoteInterpreter - * */ public class RemoteScheduler implements Scheduler { Logger logger = LoggerFactory.getLogger(RemoteScheduler.class); @@ -44,17 +45,17 @@ public class RemoteScheduler implements Scheduler { boolean terminate = false; private String name; private int maxConcurrency; - private final String sessionId; - private RemoteInterpreter remoteInterpreter; + private final String noteId; + private RemoteInterpreterProcess interpreterProcess; - public RemoteScheduler(String name, ExecutorService executor, String sessionId, - RemoteInterpreter remoteInterpreter, SchedulerListener listener, + public RemoteScheduler(String name, ExecutorService executor, String noteId, + RemoteInterpreterProcess interpreterProcess, SchedulerListener listener, int maxConcurrency) { this.name = name; this.executor = executor; this.listener = listener; - this.sessionId = sessionId; - this.remoteInterpreter = remoteInterpreter; + this.noteId = noteId; + this.interpreterProcess = interpreterProcess; this.maxConcurrency = maxConcurrency; } @@ -166,15 +167,14 @@ public class RemoteScheduler implements Scheduler { private long initialPeriodMsec; private long initialPeriodCheckIntervalMsec; private long checkIntervalMsec; - private volatile boolean terminate; + private boolean terminate; private JobListener listener; private Job job; - volatile Status lastStatus; + Status lastStatus; public JobStatusPoller(long initialPeriodMsec, long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job, JobListener listener) { - setName("JobStatusPoller-" + job.getId()); this.initialPeriodMsec = initialPeriodMsec; this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec; this.checkIntervalMsec = checkIntervalMsec; @@ -209,7 +209,7 @@ public class RemoteScheduler implements Scheduler { } Status newStatus = getStatus(); - if (newStatus == Status.UNKNOWN) { // unknown + if (newStatus == null) { // unknown continue; } @@ -231,9 +231,7 @@ public class RemoteScheduler implements Scheduler { private Status getLastStatus() { if (terminate == true) { - if (job.getErrorMessage() != null) { - return Status.ERROR; - } else if (lastStatus != Status.FINISHED && + if (lastStatus != Status.FINISHED && lastStatus != Status.ERROR && lastStatus != Status.ABORT) { return Status.FINISHED; @@ -241,35 +239,58 @@ public class RemoteScheduler implements Scheduler { return (lastStatus == null) ? Status.FINISHED : lastStatus; } } else { - return (lastStatus == null) ? Status.UNKNOWN : lastStatus; + return (lastStatus == null) ? Status.FINISHED : lastStatus; } } public synchronized Job.Status getStatus() { - if (!remoteInterpreter.isOpened()) { + if (interpreterProcess.referenceCount() <= 0) { return getLastStatus(); } - Status status = Status.valueOf(remoteInterpreter.getStatus(job.getId())); - if (status == Status.UNKNOWN) { - // not found this job in the remote schedulers. - // maybe not submitted, maybe already finished - //Status status = getLastStatus(); - listener.afterStatusChange(job, null, null); - return job.getStatus(); + + Client client; + try { + client = interpreterProcess.getClient(); + } catch (Exception e) { + logger.error("Can't get status information", e); + lastStatus = Status.ERROR; + return Status.ERROR; + } + + boolean broken = false; + try { + String statusStr = client.getStatus(noteId, job.getId()); + if ("Unknown".equals(statusStr)) { + // not found this job in the remote schedulers. + // maybe not submitted, maybe already finished + //Status status = getLastStatus(); + listener.afterStatusChange(job, null, null); + return job.getStatus(); + } + Status status = Status.valueOf(statusStr); + lastStatus = status; + listener.afterStatusChange(job, null, status); + return status; + } catch (TException e) { + broken = true; + logger.error("Can't get status information", e); + lastStatus = Status.ERROR; + return Status.ERROR; + } catch (Exception e) { + logger.error("Unknown status", e); + lastStatus = Status.ERROR; + return Status.ERROR; + } finally { + interpreterProcess.releaseClient(client, broken); } - lastStatus = status; - listener.afterStatusChange(job, null, status); - return status; } } - //TODO(zjffdu) need to refactor the schdule module which is too complicated private class JobRunner implements Runnable, JobListener { - private final Logger logger = LoggerFactory.getLogger(JobRunner.class); private Scheduler scheduler; private Job job; - private volatile boolean jobExecuted; - volatile boolean jobSubmittedRemotely; + private boolean jobExecuted; + boolean jobSubmittedRemotely; public JobRunner(Scheduler scheduler, Job job) { this.scheduler = scheduler; @@ -317,22 +338,20 @@ public class RemoteScheduler implements Scheduler { } // set job status based on result. + Status lastStatus = jobStatusPoller.getStatus(); Object jobResult = job.getReturn(); - if (job.isAborted()) { - job.setStatus(Status.ABORT); - } else if (job.getException() != null) { -// logger.info("Job ABORT, " + job.getId()); - job.setStatus(Status.ERROR); - } else if (jobResult != null && jobResult instanceof InterpreterResult - && ((InterpreterResult) jobResult).code() == Code.ERROR) { -// logger.info("Job Error, " + job.getId()); - job.setStatus(Status.ERROR); - } else { -// logger.info("Job Finished, " + job.getId()); - job.setStatus(Status.FINISHED); + if (jobResult != null && jobResult instanceof InterpreterResult) { + if (((InterpreterResult) jobResult).code() == Code.ERROR) { + lastStatus = Status.ERROR; + } + } + if (job.getException() != null) { + lastStatus = Status.ERROR; } synchronized (queue) { + job.setStatus(lastStatus); + if (listener != null) { listener.jobFinished(scheduler, job); } @@ -355,6 +374,25 @@ public class RemoteScheduler implements Scheduler { @Override public void afterStatusChange(Job job, Status before, Status after) { + if (after == null) { // unknown. maybe before sumitted remotely, maybe already finished. + if (jobExecuted) { + jobSubmittedRemotely = true; + Object jobResult = job.getReturn(); + if (job.isAborted()) { + job.setStatus(Status.ABORT); + } else if (job.getException() != null) { + job.setStatus(Status.ERROR); + } else if (jobResult != null && jobResult instanceof InterpreterResult + && ((InterpreterResult) jobResult).code() == Code.ERROR) { + job.setStatus(Status.ERROR); + } else { + job.setStatus(Status.FINISHED); + } + } + return; + } + + // Update remoteStatus if (jobExecuted == false) { if (after == Status.FINISHED || after == Status.ABORT @@ -364,18 +402,14 @@ public class RemoteScheduler implements Scheduler { return; } else if (after == Status.RUNNING) { jobSubmittedRemotely = true; - job.setStatus(Status.RUNNING); -// logger.info("Job RUNNING, " + job.getId()); } } else { jobSubmittedRemotely = true; } - // only set status when it is RUNNING - // We would set other status based on the interpret result - if (after == Status.RUNNING) { -// logger.info("Job RUNNING, " + job.getId()); - job.setStatus(Status.RUNNING); + // status polled by status poller + if (job.getStatus() != after) { + job.setStatus(after); } } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java index 5871ca5..af52dec 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java @@ -24,18 +24,17 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Factory class for creating schedulers - * + * TODO(moon) : add description. */ public class SchedulerFactory implements SchedulerListener { private static final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class); - private ExecutorService executor; - private Map<String, Scheduler> schedulers = new LinkedHashMap<>(); + ExecutorService executor; + Map<String, Scheduler> schedulers = new LinkedHashMap<>(); private static SchedulerFactory singleton; private static Long singletonLock = new Long(0); @@ -55,17 +54,17 @@ public class SchedulerFactory implements SchedulerListener { return singleton; } - SchedulerFactory() throws Exception { - executor = ExecutorFactory.singleton().createOrGet("SchedulerFactory", 100); + public SchedulerFactory() throws Exception { + executor = ExecutorFactory.singleton().createOrGet("schedulerFactory", 100); } public void destroy() { - ExecutorFactory.singleton().shutdown("SchedulerFactory"); + ExecutorFactory.singleton().shutdown("schedulerFactory"); } public Scheduler createOrGetFIFOScheduler(String name) { synchronized (schedulers) { - if (!schedulers.containsKey(name)) { + if (schedulers.containsKey(name) == false) { Scheduler s = new FIFOScheduler(name, executor, this); schedulers.put(name, s); executor.execute(s); @@ -76,7 +75,7 @@ public class SchedulerFactory implements SchedulerListener { public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) { synchronized (schedulers) { - if (!schedulers.containsKey(name)) { + if (schedulers.containsKey(name) == false) { Scheduler s = new ParallelScheduler(name, executor, this, maxConcurrency); schedulers.put(name, s); executor.execute(s); @@ -87,17 +86,17 @@ public class SchedulerFactory implements SchedulerListener { public Scheduler createOrGetRemoteScheduler( String name, - String sessionId, - RemoteInterpreter remoteInterpreter, + String noteId, + RemoteInterpreterProcess interpreterProcess, int maxConcurrency) { synchronized (schedulers) { - if (!schedulers.containsKey(name)) { + if (schedulers.containsKey(name) == false) { Scheduler s = new RemoteScheduler( name, executor, - sessionId, - remoteInterpreter, + noteId, + interpreterProcess, this, maxConcurrency); schedulers.put(name, s); @@ -107,24 +106,38 @@ public class SchedulerFactory implements SchedulerListener { } } - public void removeScheduler(String name) { + public Scheduler removeScheduler(String name) { synchronized (schedulers) { Scheduler s = schedulers.remove(name); if (s != null) { s.stop(); } } + return null; + } + + public Collection<Scheduler> listScheduler(String name) { + List<Scheduler> s = new LinkedList<>(); + synchronized (schedulers) { + for (Scheduler ss : schedulers.values()) { + s.add(ss); + } + } + return s; } @Override public void jobStarted(Scheduler scheduler, Job job) { - logger.info("Job " + job.getId() + " started by scheduler " + scheduler.getName()); + logger.info("Job " + job.getJobName() + " started by scheduler " + scheduler.getName()); } @Override public void jobFinished(Scheduler scheduler, Job job) { - logger.info("Job " + job.getId() + " finished by scheduler " + scheduler.getName()); + logger.info("Job " + job.getJobName() + " finished by scheduler " + scheduler.getName()); } + + + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java index 1926528..8673476 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.tabledata; import org.apache.zeppelin.resource.Resource; +import org.apache.zeppelin.resource.ResourcePoolUtils; import java.util.Iterator; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java deleted file mode 100644 index 14c03a1..0000000 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.util; - -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -/** - * Generate Tiny ID. - */ -public class IdHashes { - private static final char[] DICTIONARY = new char[] {'1', '2', '3', '4', '5', '6', '7', '8', '9', - 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', - 'W', 'X', 'Y', 'Z'}; - - /** - * encodes the given string into the base of the dictionary provided in the constructor. - * - * @param value the number to encode. - * @return the encoded string. - */ - private static String encode(Long value) { - - List<Character> result = new ArrayList<>(); - BigInteger base = new BigInteger("" + DICTIONARY.length); - int exponent = 1; - BigInteger remaining = new BigInteger(value.toString()); - while (true) { - BigInteger a = base.pow(exponent); // 16^1 = 16 - BigInteger b = remaining.mod(a); // 119 % 16 = 7 | 112 % 256 = 112 - BigInteger c = base.pow(exponent - 1); - BigInteger d = b.divide(c); - - // if d > dictionary.length, we have a problem. but BigInteger doesnt have - // a greater than method :-( hope for the best. theoretically, d is always - // an index of the dictionary! - result.add(DICTIONARY[d.intValue()]); - remaining = remaining.subtract(b); // 119 - 7 = 112 | 112 - 112 = 0 - - // finished? - if (remaining.equals(BigInteger.ZERO)) { - break; - } - - exponent++; - } - - // need to reverse it, since the start of the list contains the least significant values - StringBuffer sb = new StringBuffer(); - for (int i = result.size() - 1; i >= 0; i--) { - sb.append(result.get(i)); - } - return sb.toString(); - } - - public static String generateId() { - return encode(System.currentTimeMillis() + new Random().nextInt()); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java deleted file mode 100644 index 6153f49..0000000 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.util; - -import org.apache.commons.lang.StringUtils; - -import java.io.IOException; -import java.util.Properties; - -/** - * TODO(moon) : add description. - */ -public class Util { - private static final String PROJECT_PROPERTIES_VERSION_KEY = "version"; - private static final String GIT_PROPERTIES_COMMIT_ID_KEY = "git.commit.id.abbrev"; - private static final String GIT_PROPERTIES_COMMIT_TS_KEY = "git.commit.time"; - - private static Properties projectProperties; - private static Properties gitProperties; - - static { - projectProperties = new Properties(); - gitProperties = new Properties(); - try { - projectProperties.load(Util.class.getResourceAsStream("/project.properties")); - gitProperties.load(Util.class.getResourceAsStream("/git.properties")); - } catch (IOException e) { - //Fail to read project.properties - } - } - - /** - * Get Zeppelin version - * - * @return Current Zeppelin version - */ - public static String getVersion() { - return StringUtils.defaultIfEmpty(projectProperties.getProperty(PROJECT_PROPERTIES_VERSION_KEY), - StringUtils.EMPTY); - } - - /** - * Get Zeppelin Git latest commit id - * - * @return Latest Zeppelin commit id - */ - public static String getGitCommitId() { - return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_ID_KEY), - StringUtils.EMPTY); - } - - /** - * Get Zeppelin Git latest commit timestamp - * - * @return Latest Zeppelin commit timestamp - */ - public static String getGitTimestamp() { - return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_TS_KEY), - StringUtils.EMPTY); - } -}