http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java index a535c96..5b7223c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java @@ -19,15 +19,6 @@ package org.apache.zeppelin.interpreter; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import org.apache.thrift.TException; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; @@ -44,10 +35,10 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.apache.zeppelin.interpreter.thrift.AppOutputAppendEvent; import org.apache.zeppelin.interpreter.thrift.AppOutputUpdateEvent; import org.apache.zeppelin.interpreter.thrift.AppStatusUpdateEvent; +import org.apache.zeppelin.interpreter.thrift.RegisterInfo; import org.apache.zeppelin.interpreter.thrift.OutputAppendEvent; import org.apache.zeppelin.interpreter.thrift.OutputUpdateAllEvent; import org.apache.zeppelin.interpreter.thrift.OutputUpdateEvent; -import org.apache.zeppelin.interpreter.thrift.RegisterInfo; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventService; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; @@ -60,6 +51,16 @@ import org.apache.zeppelin.resource.ResourceSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + public class RemoteInterpreterEventServer implements RemoteInterpreterEventService.Iface { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterEventServer.class); @@ -78,8 +79,8 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi private final ApplicationEventListener appListener; private final Gson gson = new Gson(); - public RemoteInterpreterEventServer( - ZeppelinConfiguration zConf, InterpreterSettingManager interpreterSettingManager) { + public RemoteInterpreterEventServer(ZeppelinConfiguration zConf, + InterpreterSettingManager interpreterSettingManager) { this.portRange = zConf.getZeppelinServerRPCPortRange(); this.interpreterSettingManager = interpreterSettingManager; this.listener = interpreterSettingManager.getRemoteInterpreterProcessListener(); @@ -99,33 +100,32 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi LOGGER.info("InterpreterEventServer will start. Port: {}", port); RemoteInterpreterEventService.Processor processor = new RemoteInterpreterEventService.Processor(this); - this.thriftServer = - new TThreadPoolServer(new TThreadPoolServer.Args(tSocket).processor(processor)); + this.thriftServer = new TThreadPoolServer( + new TThreadPoolServer.Args(tSocket).processor(processor)); this.thriftServer.serve(); } public void start() throws IOException { - Thread startingThread = - new Thread() { - @Override - public void run() { - TServerSocket tSocket = null; - try { - tSocket = RemoteInterpreterUtils.createTServerSocket(portRange); - port = tSocket.getServerSocket().getLocalPort(); - host = RemoteInterpreterUtils.findAvailableHostAddress(); - } catch (IOException e1) { - throw new RuntimeException(e1); - } + Thread startingThread = new Thread() { + @Override + public void run() { + TServerSocket tSocket = null; + try { + tSocket = RemoteInterpreterUtils.createTServerSocket(portRange); + port = tSocket.getServerSocket().getLocalPort(); + host = RemoteInterpreterUtils.findAvailableHostAddress(); + } catch (IOException e1) { + throw new RuntimeException(e1); + } - LOGGER.info("InterpreterEventServer will start. Port: {}", port); - RemoteInterpreterEventService.Processor processor = - new RemoteInterpreterEventService.Processor(RemoteInterpreterEventServer.this); - thriftServer = - new TThreadPoolServer(new TThreadPoolServer.Args(tSocket).processor(processor)); - thriftServer.serve(); - } - }; + LOGGER.info("InterpreterEventServer will start. Port: {}", port); + RemoteInterpreterEventService.Processor processor = + new RemoteInterpreterEventService.Processor(RemoteInterpreterEventServer.this); + thriftServer = new TThreadPoolServer( + new TThreadPoolServer.Args(tSocket).processor(processor)); + thriftServer.serve(); + } + }; startingThread.start(); long start = System.currentTimeMillis(); while ((System.currentTimeMillis() - start) < 30 * 1000) { @@ -145,9 +145,8 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi LOGGER.info("InterpreterEventServer is started"); runner = new AppendOutputRunner(listener); - appendFuture = - appendService.scheduleWithFixedDelay( - runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); + appendFuture = appendService.scheduleWithFixedDelay( + runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); } public void stop() { @@ -159,6 +158,7 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi } } + public int getPort() { return port; } @@ -178,9 +178,8 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi RemoteInterpreterProcess interpreterProcess = ((ManagedInterpreterGroup) interpreterGroup).getInterpreterProcess(); if (interpreterProcess == null) { - LOGGER.warn( - "Interpreter process does not existed yet for InterpreterGroup: " - + registerInfo.getInterpreterGroupId()); + LOGGER.warn("Interpreter process does not existed yet for InterpreterGroup: " + + registerInfo.getInterpreterGroupId()); } ((RemoteInterpreterManagedProcess) interpreterProcess) .processStarted(registerInfo.port, registerInfo.host); @@ -192,32 +191,19 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi runner.appendBuffer( event.getNoteId(), event.getParagraphId(), event.getIndex(), event.getData()); } else { - appListener.onOutputAppend( - event.getNoteId(), - event.getParagraphId(), - event.getIndex(), - event.getAppId(), - event.getData()); + appListener.onOutputAppend(event.getNoteId(), event.getParagraphId(), event.getIndex(), + event.getAppId(), event.getData()); } } @Override public void updateOutput(OutputUpdateEvent event) throws TException { if (event.getAppId() == null) { - listener.onOutputUpdated( - event.getNoteId(), - event.getParagraphId(), - event.getIndex(), - InterpreterResult.Type.valueOf(event.getType()), - event.getData()); + listener.onOutputUpdated(event.getNoteId(), event.getParagraphId(), event.getIndex(), + InterpreterResult.Type.valueOf(event.getType()), event.getData()); } else { - appListener.onOutputUpdated( - event.getNoteId(), - event.getParagraphId(), - event.getIndex(), - event.getAppId(), - InterpreterResult.Type.valueOf(event.getType()), - event.getData()); + appListener.onOutputUpdated(event.getNoteId(), event.getParagraphId(), event.getIndex(), + event.getAppId(), InterpreterResult.Type.valueOf(event.getType()), event.getData()); } } @@ -226,30 +212,21 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi listener.onOutputClear(event.getNoteId(), event.getParagraphId()); for (int i = 0; i < event.getMsg().size(); i++) { RemoteInterpreterResultMessage msg = event.getMsg().get(i); - listener.onOutputUpdated( - event.getNoteId(), - event.getParagraphId(), - i, - InterpreterResult.Type.valueOf(msg.getType()), - msg.getData()); + listener.onOutputUpdated(event.getNoteId(), event.getParagraphId(), i, + InterpreterResult.Type.valueOf(msg.getType()), msg.getData()); } } @Override public void appendAppOutput(AppOutputAppendEvent event) throws TException { - appListener.onOutputAppend( - event.noteId, event.paragraphId, event.index, event.appId, event.data); + appListener.onOutputAppend(event.noteId, event.paragraphId, event.index, event.appId, + event.data); } @Override public void updateAppOutput(AppOutputUpdateEvent event) throws TException { - appListener.onOutputUpdated( - event.noteId, - event.paragraphId, - event.index, - event.appId, - InterpreterResult.Type.valueOf(event.type), - event.data); + appListener.onOutputUpdated(event.noteId, event.paragraphId, event.index, event.appId, + InterpreterResult.Type.valueOf(event.type), event.data); } @Override @@ -260,14 +237,11 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi @Override public void runParagraphs(RunParagraphsEvent event) throws TException { try { - listener.runParagraphs( - event.getNoteId(), - event.getParagraphIndices(), - event.getParagraphIds(), - event.getCurParagraphId()); + listener.runParagraphs(event.getNoteId(), event.getParagraphIndices(), + event.getParagraphIds(), event.getCurParagraphId()); if (InterpreterContext.get() != null) { - LOGGER.info( - "complete runParagraphs." + InterpreterContext.get().getParagraphId() + " " + event); + LOGGER.info("complete runParagraphs." + InterpreterContext.get().getParagraphId() + " " + + event); } else { LOGGER.info("complete runParagraphs." + event); } @@ -284,13 +258,8 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi if (interpreterGroup == null) { throw new TException("Invalid InterpreterGroupId: " + intpGroupId); } - interpreterGroup - .getAngularObjectRegistry() - .add( - angularObject.getName(), - angularObject.get(), - angularObject.getNoteId(), - angularObject.getParagraphId()); + interpreterGroup.getAngularObjectRegistry().add(angularObject.getName(), + angularObject.get(), angularObject.getNoteId(), angularObject.getParagraphId()); } @Override @@ -301,22 +270,22 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi if (interpreterGroup == null) { throw new TException("Invalid InterpreterGroupId: " + intpGroupId); } - AngularObject localAngularObject = - interpreterGroup - .getAngularObjectRegistry() - .get( - angularObject.getName(), angularObject.getNoteId(), angularObject.getParagraphId()); + AngularObject localAngularObject = interpreterGroup.getAngularObjectRegistry().get( + angularObject.getName(), angularObject.getNoteId(), angularObject.getParagraphId()); if (localAngularObject instanceof RemoteAngularObject) { // to avoid ping-pong loop - ((RemoteAngularObject) localAngularObject).set(angularObject.get(), true, false); + ((RemoteAngularObject) localAngularObject).set( + angularObject.get(), true, false); } else { localAngularObject.set(angularObject.get()); } } @Override - public void removeAngularObject( - String intpGroupId, String noteId, String paragraphId, String name) throws TException { + public void removeAngularObject(String intpGroupId, + String noteId, + String paragraphId, + String name) throws TException { InterpreterGroup interpreterGroup = interpreterSettingManager.getInterpreterGroupById(intpGroupId); if (interpreterGroup == null) { @@ -333,11 +302,13 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi throw new TException("Invalid InterpreterGroupId: " + intpGroupId); } - Map<String, String> paraInfos = - gson.fromJson(json, new TypeToken<Map<String, String>>() {}.getType()); + Map<String, String> paraInfos = gson.fromJson(json, + new TypeToken<Map<String, String>>() { + }.getType()); String noteId = paraInfos.get("noteId"); String paraId = paraInfos.get("paraId"); - String settingId = RemoteInterpreterUtils.getInterpreterSettingId(interpreterGroup.getId()); + String settingId = RemoteInterpreterUtils. + getInterpreterSettingId(interpreterGroup.getId()); if (noteId != null && paraId != null && settingId != null) { listener.onParaInfosReceived(noteId, paraId, settingId, paraInfos); } @@ -388,8 +359,8 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi return obj; } - private Object invokeResourceMethod( - String intpGroupId, final InvokeResourceMethodEventMessage message) { + private Object invokeResourceMethod(String intpGroupId, + final InvokeResourceMethodEventMessage message) { final ResourceId resourceId = message.resourceId; ManagedInterpreterGroup intpGroup = interpreterSettingManager.getInterpreterGroupById(resourceId.getResourcePoolId()); @@ -422,26 +393,21 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi LOGGER.error("no resource pool"); return null; } - } else if (interpreterSettingManager - .getInterpreterGroupById(intpGroupId) - .getInterpreterProcess() - .isRunning()) { - ByteBuffer res = - interpreterSettingManager - .getInterpreterGroupById(intpGroupId) - .getInterpreterProcess() - .callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() { - @Override - public ByteBuffer call(RemoteInterpreterService.Client client) - throws Exception { - return client.resourceInvokeMethod( - resourceId.getNoteId(), - resourceId.getParagraphId(), - resourceId.getName(), - message.toJson()); - } - }); + } else if (interpreterSettingManager.getInterpreterGroupById(intpGroupId) + .getInterpreterProcess().isRunning()) { + ByteBuffer res = interpreterSettingManager.getInterpreterGroupById(intpGroupId) + .getInterpreterProcess().callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() { + @Override + public ByteBuffer call(RemoteInterpreterService.Client client) throws Exception { + return client.resourceInvokeMethod( + resourceId.getNoteId(), + resourceId.getParagraphId(), + resourceId.getName(), + message.toJson()); + } + } + ); try { return Resource.deserializeObject(res); @@ -454,21 +420,23 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi } private Object getResource(final ResourceId resourceId) { - ManagedInterpreterGroup intpGroup = - interpreterSettingManager.getInterpreterGroupById(resourceId.getResourcePoolId()); + ManagedInterpreterGroup intpGroup = interpreterSettingManager + .getInterpreterGroupById(resourceId.getResourcePoolId()); if (intpGroup == null) { return null; } RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess(); - ByteBuffer buffer = - remoteInterpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() { - @Override - public ByteBuffer call(RemoteInterpreterService.Client client) throws Exception { - return client.resourceGet( - resourceId.getNoteId(), resourceId.getParagraphId(), resourceId.getName()); - } - }); + ByteBuffer buffer = remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() { + @Override + public ByteBuffer call(RemoteInterpreterService.Client client) throws Exception { + return client.resourceGet( + resourceId.getNoteId(), + resourceId.getParagraphId(), + resourceId.getName()); + } + } + ); try { Object o = Resource.deserializeObject(buffer); @@ -493,15 +461,14 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi resourceSet.addAll(localPool.getAll()); } } else if (remoteInterpreterProcess.isRunning()) { - List<String> resourceList = - remoteInterpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<List<String>>() { - @Override - public List<String> call(RemoteInterpreterService.Client client) - throws Exception { - return client.resourcePoolGetAll(); - } - }); + List<String> resourceList = remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<List<String>>() { + @Override + public List<String> call(RemoteInterpreterService.Client client) throws Exception { + return client.resourcePoolGetAll(); + } + } + ); for (String res : resourceList) { resourceSet.add(RemoteResource.fromJson(res)); }
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java index 4dd0eb2..e303ee6 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java @@ -17,24 +17,24 @@ package org.apache.zeppelin.interpreter; -import java.io.IOException; -import java.io.StringReader; -import java.util.List; -import java.util.Properties; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.StringReader; +import java.util.List; +import java.util.Properties; + public class SessionConfInterpreter extends ConfInterpreter { private static Logger LOGGER = LoggerFactory.getLogger(SessionConfInterpreter.class); - public SessionConfInterpreter( - Properties properties, - String sessionId, - String interpreterGroupId, - InterpreterSetting interpreterSetting) { + public SessionConfInterpreter(Properties properties, + String sessionId, + String interpreterGroupId, + InterpreterSetting interpreterSetting) { super(properties, sessionId, interpreterGroupId, interpreterSetting); } @@ -56,8 +56,7 @@ public class SessionConfInterpreter extends ConfInterpreter { if (intp instanceof RemoteInterpreter) { RemoteInterpreter remoteInterpreter = (RemoteInterpreter) intp; if (remoteInterpreter.isOpened()) { - return new InterpreterResult( - InterpreterResult.Code.ERROR, + return new InterpreterResult(InterpreterResult.Code.ERROR, "Can not change interpreter session properties after this session is started"); } remoteInterpreter.setProperties(finalProperties); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java index a336bcd..9bef4d9 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java @@ -12,7 +12,10 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Utility class for downloading spark. This is used for spark integration test. */ +/** + * Utility class for downloading spark. This is used for spark integration test. + * + */ public class SparkDownloadUtils { private static Logger LOGGER = LoggerFactory.getLogger(SparkDownloadUtils.class); @@ -26,6 +29,7 @@ public class SparkDownloadUtils { } } + public static String downloadSpark(String version) { File targetSparkHomeFolder = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6"); if (targetSparkHomeFolder.exists()) { @@ -36,19 +40,11 @@ public class SparkDownloadUtils { boolean downloaded = false; for (int i = 0; i < 3; i++) { try { - String preferredMirror = - IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true")); + String preferredMirror = IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true")); File downloadFile = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6.tgz"); - String downloadURL = - preferredMirror - + "/spark/spark-" - + version - + "/spark-" - + version - + "-bin-hadoop2.6.tgz"; + String downloadURL = preferredMirror + "/spark/spark-" + version + "/spark-" + version + "-bin-hadoop2.6.tgz"; runShellCommand(new String[] {"wget", downloadURL, "-P", downloadFolder}); - runShellCommand( - new String[] {"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder}); + runShellCommand(new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder}); downloaded = true; break; } catch (Exception e) { @@ -86,20 +82,11 @@ public class SparkDownloadUtils { // Try mirrors a few times until one succeeds for (int i = 0; i < 3; i++) { try { - String preferredMirror = - IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true")); - File downloadFile = - new File(downloadFolder + "/flink-" + version + "-bin-hadoop27-scala_2.11.tgz"); - String downloadURL = - preferredMirror - + "/flink/flink-" - + version - + "/flink-" - + version - + "-bin-hadoop27-scala_2.11.tgz"; + String preferredMirror = IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true")); + File downloadFile = new File(downloadFolder + "/flink-" + version + "-bin-hadoop27-scala_2.11.tgz"); + String downloadURL = preferredMirror + "/flink/flink-" + version + "/flink-" + version + "-bin-hadoop27-scala_2.11.tgz"; runShellCommand(new String[] {"wget", downloadURL, "-P", downloadFolder}); - runShellCommand( - new String[] {"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder}); + runShellCommand(new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder}); downloaded = true; break; } catch (Exception e) { @@ -109,8 +96,7 @@ public class SparkDownloadUtils { // fallback to use apache archive if (!downloaded) { - File downloadFile = - new File(downloadFolder + "/flink-" + version + "-bin-hadoop27-scala_2.11.tgz"); + File downloadFile = new File(downloadFolder + "/flink-" + version + "-bin-hadoop27-scala_2.11.tgz"); String downloadURL = "https://archive.apache.org/dist/flink/flink-" + version @@ -155,7 +141,7 @@ public class SparkDownloadUtils { BufferedReader br = new BufferedReader(isr); String line = null; long startTime = System.currentTimeMillis(); - while ((line = br.readLine()) != null) { + while ( (line = br.readLine()) != null) { // logging per 5 seconds if ((System.currentTimeMillis() - startTime) > 5000) { LOGGER.info(line); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java index c5d330b..0817595 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java @@ -16,6 +16,12 @@ */ package org.apache.zeppelin.interpreter.install; +import org.apache.commons.io.FileUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.dep.DependencyResolver; +import org.apache.zeppelin.util.Util; +import org.sonatype.aether.RepositoryException; + import java.io.File; import java.io.IOException; import java.net.URL; @@ -24,13 +30,10 @@ import java.util.List; import java.util.Locale; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.commons.io.FileUtils; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.dep.DependencyResolver; -import org.apache.zeppelin.util.Util; -import org.sonatype.aether.RepositoryException; -/** Commandline utility to install interpreter from maven repository */ +/** + * Commandline utility to install interpreter from maven repository + */ public class InstallInterpreter { private final File interpreterListFile; private final File interpreterBaseDir; @@ -41,6 +44,7 @@ public class InstallInterpreter { private String proxyPassword; /** + * * @param interpreterListFile * @param interpreterBaseDir interpreter directory for installing binaries * @throws IOException @@ -54,7 +58,10 @@ public class InstallInterpreter { readAvailableInterpreters(); } - /** Information for available informations */ + + /** + * Information for available informations + */ private static class AvailableInterpreterInfo { public final String name; public final String artifact; @@ -114,7 +121,7 @@ public class InstallInterpreter { } } - public void install(String[] names) { + public void install(String [] names) { for (String name : names) { install(name); } @@ -132,7 +139,7 @@ public class InstallInterpreter { throw new RuntimeException("Can't find interpreter '" + name + "'"); } - public void install(String[] names, String[] artifacts) { + public void install(String [] names, String [] artifacts) { if (names.length != artifacts.length) { throw new RuntimeException("Length of given names and artifacts are different"); } @@ -150,18 +157,19 @@ public class InstallInterpreter { File installDir = new File(interpreterBaseDir, name); if (installDir.exists()) { - System.err.println( - "Directory " + installDir.getAbsolutePath() + " already exists" + "\n\nSkipped"); + System.err.println("Directory " + installDir.getAbsolutePath() + + " already exists" + + "\n\nSkipped"); return; } - System.out.println( - "Install " + name + "(" + artifact + ") to " + installDir.getAbsolutePath() + " ... "); + System.out.println("Install " + name + "(" + artifact + ") to " + + installDir.getAbsolutePath() + " ... "); try { depResolver.load(artifact, installDir); - System.out.println( - "Interpreter " + name + " installed under " + installDir.getAbsolutePath() + "."); + System.out.println("Interpreter " + name + " installed under " + + installDir.getAbsolutePath() + "."); startTip(); } catch (RepositoryException e) { e.printStackTrace(); @@ -180,32 +188,29 @@ public class InstallInterpreter { System.out.println("Options"); System.out.println(" -l, --list List available interpreters"); System.out.println(" -a, --all Install all available interpreters"); - System.out.println( - " -n, --name [NAMES] Install interpreters (comma separated " - + "list)" - + "e.g. md,shell,jdbc,python,angular"); - System.out.println( - " -t, --artifact [ARTIFACTS] (Optional with -n) custom artifact names" - + ". " - + "(comma separated list correspond to --name) " - + "e.g. customGroup:customArtifact:customVersion"); + System.out.println(" -n, --name [NAMES] Install interpreters (comma separated " + + "list)" + + "e.g. md,shell,jdbc,python,angular"); + System.out.println(" -t, --artifact [ARTIFACTS] (Optional with -n) custom artifact names" + + ". " + + "(comma separated list correspond to --name) " + + "e.g. customGroup:customArtifact:customVersion"); System.out.println(" --proxy-url [url] (Optional) proxy url. http(s)://host:port"); System.out.println(" --proxy-user [user] (Optional) proxy user"); System.out.println(" --proxy-password [password] (Optional) proxy password"); } - public static void main(String[] args) throws IOException { + public static void main(String [] args) throws IOException { if (args.length == 0) { usage(); return; } ZeppelinConfiguration conf = ZeppelinConfiguration.create(); - InstallInterpreter installer = - new InstallInterpreter( - new File(conf.getInterpreterListPath()), - new File(conf.getInterpreterDir()), - conf.getInterpreterLocalRepoPath()); + InstallInterpreter installer = new InstallInterpreter( + new File(conf.getInterpreterListPath()), + new File(conf.getInterpreterDir()), + conf.getInterpreterLocalRepoPath()); String names = null; String artifacts = null; @@ -276,9 +281,8 @@ public class InstallInterpreter { } private static void startTip() { - System.out.println( - "\n1. Restart Zeppelin" - + "\n2. Create interpreter setting in 'Interpreter' menu on Zeppelin GUI" - + "\n3. Then you can bind the interpreter on your note"); + System.out.println("\n1. Restart Zeppelin" + + "\n2. Create interpreter setting in 'Interpreter' menu on Zeppelin GUI" + + "\n3. Then you can bind the interpreter on your note"); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java index 3baa05c..5a62d22 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java @@ -15,20 +15,29 @@ * limitations under the License. */ + package org.apache.zeppelin.interpreter.lifecycle; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.LifecycleManager; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; -/** Do nothing for the lifecycle of interpreter. User need to explicitly start/stop interpreter. */ +/** + * Do nothing for the lifecycle of interpreter. User need to explicitly start/stop interpreter. + */ public class NullLifecycleManager implements LifecycleManager { - public NullLifecycleManager(ZeppelinConfiguration zConf) {} + public NullLifecycleManager(ZeppelinConfiguration zConf) { + + } @Override - public void onInterpreterProcessStarted(ManagedInterpreterGroup interpreterGroup) {} + public void onInterpreterProcessStarted(ManagedInterpreterGroup interpreterGroup) { + + } @Override - public void onInterpreterUse(ManagedInterpreterGroup interpreterGroup, String sessionId) {} + public void onInterpreterUse(ManagedInterpreterGroup interpreterGroup, String sessionId) { + + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java index b580c49..90f3f55 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java @@ -1,22 +1,24 @@ package org.apache.zeppelin.interpreter.lifecycle; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ConcurrentHashMap; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.LifecycleManager; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; + + /** * This lifecycle manager would close interpreter after it is timeout. By default, it is timeout * after no using in 1 hour. * - * <p>For now, this class only manage the lifecycle of interpreter group (will close interpreter - * process after timeout). Managing the lifecycle of interpreter session could be done in future if - * necessary. + * For now, this class only manage the lifecycle of interpreter group (will close interpreter + * process after timeout). Managing the lifecycle of interpreter session could be done in future + * if necessary. */ public class TimeoutLifecycleManager implements LifecycleManager { @@ -31,38 +33,28 @@ public class TimeoutLifecycleManager implements LifecycleManager { private Timer checkTimer; public TimeoutLifecycleManager(ZeppelinConfiguration zConf) { - this.checkInterval = - zConf.getLong( - ZeppelinConfiguration.ConfVars - .ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL); - this.timeoutThreshold = - zConf.getLong( - ZeppelinConfiguration.ConfVars - .ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD); + this.checkInterval = zConf.getLong(ZeppelinConfiguration.ConfVars + .ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL); + this.timeoutThreshold = zConf.getLong( + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD); this.checkTimer = new Timer(true); - this.checkTimer.scheduleAtFixedRate( - new TimerTask() { - @Override - public void run() { - long now = System.currentTimeMillis(); - for (Map.Entry<ManagedInterpreterGroup, Long> entry : interpreterGroups.entrySet()) { - ManagedInterpreterGroup interpreterGroup = entry.getKey(); - Long lastTimeUsing = entry.getValue(); - if ((now - lastTimeUsing) > timeoutThreshold) { - LOGGER.info("InterpreterGroup {} is timeout.", interpreterGroup.getId()); - interpreterGroup.close(); - interpreterGroups.remove(entry.getKey()); - } - } + this.checkTimer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + long now = System.currentTimeMillis(); + for (Map.Entry<ManagedInterpreterGroup, Long> entry : interpreterGroups.entrySet()) { + ManagedInterpreterGroup interpreterGroup = entry.getKey(); + Long lastTimeUsing = entry.getValue(); + if ((now - lastTimeUsing) > timeoutThreshold ) { + LOGGER.info("InterpreterGroup {} is timeout.", interpreterGroup.getId()); + interpreterGroup.close(); + interpreterGroups.remove(entry.getKey()); } - }, - checkInterval, - checkInterval); - LOGGER.info( - "TimeoutLifecycleManager is started with checkinterval: " - + checkInterval - + ", timeoutThreshold: " - + timeoutThreshold); + } + } + }, checkInterval, checkInterval); + LOGGER.info("TimeoutLifecycleManager is started with checkinterval: " + checkInterval + + ", timeoutThreshold: " + timeoutThreshold); } @Override http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java index e854702..4dffff1 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java @@ -17,11 +17,6 @@ package org.apache.zeppelin.interpreter.recovery; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.zeppelin.conf.ZeppelinConfiguration; @@ -35,10 +30,18 @@ import org.apache.zeppelin.notebook.FileSystemStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + /** * Hadoop compatible FileSystem based RecoveryStorage implementation. * - * <p>Save InterpreterProcess in the format of: InterpreterGroupId host:port + * Save InterpreterProcess in the format of: + * InterpreterGroupId host:port */ public class FileSystemRecoveryStorage extends RecoveryStorage { @@ -48,15 +51,15 @@ public class FileSystemRecoveryStorage extends RecoveryStorage { private FileSystemStorage fs; private Path recoveryDir; - public FileSystemRecoveryStorage( - ZeppelinConfiguration zConf, InterpreterSettingManager interpreterSettingManager) + public FileSystemRecoveryStorage(ZeppelinConfiguration zConf, + InterpreterSettingManager interpreterSettingManager) throws IOException { super(zConf); this.interpreterSettingManager = interpreterSettingManager; this.zConf = zConf; this.fs = new FileSystemStorage(zConf, zConf.getRecoveryDir()); - LOGGER.info( - "Creating FileSystem: " + this.fs.getFs().getClass().getName() + " for Zeppelin Recovery."); + LOGGER.info("Creating FileSystem: " + this.fs.getFs().getClass().getName() + + " for Zeppelin Recovery."); this.recoveryDir = this.fs.makeQualified(new Path(zConf.getRecoveryDir())); LOGGER.info("Using folder {} to store recovery data", recoveryDir); this.fs.tryMkDir(recoveryDir); @@ -79,12 +82,8 @@ public class FileSystemRecoveryStorage extends RecoveryStorage { for (ManagedInterpreterGroup interpreterGroup : interpreterSetting.getAllInterpreterGroups()) { RemoteInterpreterProcess interpreterProcess = interpreterGroup.getInterpreterProcess(); if (interpreterProcess != null) { - recoveryContent.add( - interpreterGroup.getId() - + "\t" - + interpreterProcess.getHost() - + ":" - + interpreterProcess.getPort()); + recoveryContent.add(interpreterGroup.getId() + "\t" + interpreterProcess.getHost() + ":" + + interpreterProcess.getPort()); } } LOGGER.debug("Updating recovery data for interpreterSetting: " + interpreterSettingName); @@ -100,8 +99,8 @@ public class FileSystemRecoveryStorage extends RecoveryStorage { for (Path path : paths) { String fileName = path.getName(); - String interpreterSettingName = - fileName.substring(0, fileName.length() - ".recovery".length()); + String interpreterSettingName = fileName.substring(0, + fileName.length() - ".recovery".length()); String recoveryContent = fs.readFile(path); if (!StringUtils.isBlank(recoveryContent)) { for (String line : recoveryContent.split(System.lineSeparator())) { @@ -110,12 +109,8 @@ public class FileSystemRecoveryStorage extends RecoveryStorage { String[] hostPort = tokens[1].split(":"); int connectTimeout = zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); - RemoteInterpreterRunningProcess client = - new RemoteInterpreterRunningProcess( - interpreterSettingName, - connectTimeout, - hostPort[0], - Integer.parseInt(hostPort[1])); + RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess( + interpreterSettingName, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1])); // interpreterSettingManager may be null when this class is used when it is used // stop-interpreter.sh clients.put(groupId, client); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java index c0bcdca..3a7d12c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java @@ -17,26 +17,35 @@ package org.apache.zeppelin.interpreter.recovery; -import java.io.IOException; -import java.util.Map; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterSettingManager; import org.apache.zeppelin.interpreter.launcher.InterpreterClient; -/** RecoveryStorage that do nothing, used when recovery is not enabled. */ +import java.io.IOException; +import java.util.Map; + + +/** + * RecoveryStorage that do nothing, used when recovery is not enabled. + * + */ public class NullRecoveryStorage extends RecoveryStorage { - public NullRecoveryStorage( - ZeppelinConfiguration zConf, InterpreterSettingManager interpreterSettingManager) + public NullRecoveryStorage(ZeppelinConfiguration zConf, + InterpreterSettingManager interpreterSettingManager) throws IOException { super(zConf); } @Override - public void onInterpreterClientStart(InterpreterClient client) throws IOException {} + public void onInterpreterClientStart(InterpreterClient client) throws IOException { + + } @Override - public void onInterpreterClientStop(InterpreterClient client) throws IOException {} + public void onInterpreterClientStop(InterpreterClient client) throws IOException { + + } @Override public Map<String, InterpreterClient> restore() throws IOException { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java index 7808bf2..d74b162 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java @@ -1,7 +1,5 @@ package org.apache.zeppelin.interpreter.recovery; -import java.io.IOException; -import java.util.Map; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterSettingManager; import org.apache.zeppelin.interpreter.launcher.InterpreterClient; @@ -9,10 +7,14 @@ import org.apache.zeppelin.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.Map; + + /** - * Utility class for stopping interpreter in the case that you want to stop all the interpreter - * process even when you enable recovery, or you want to kill interpreter process to avoid orphan - * process. + * Utility class for stopping interpreter in the case that you want to stop all the + * interpreter process even when you enable recovery, or you want to kill interpreter process + * to avoid orphan process. */ public class StopInterpreter { @@ -22,11 +24,9 @@ public class StopInterpreter { ZeppelinConfiguration zConf = ZeppelinConfiguration.create(); RecoveryStorage recoveryStorage = null; - recoveryStorage = - ReflectionUtils.createClazzInstance( - zConf.getRecoveryStorageClass(), - new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class}, - new Object[] {zConf, null}); + recoveryStorage = ReflectionUtils.createClazzInstance(zConf.getRecoveryStorageClass(), + new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class}, + new Object[] {zConf, null}); LOGGER.info("Using RecoveryStorage: " + recoveryStorage.getClass().getName()); Map<String, InterpreterClient> restoredClients = recoveryStorage.restore(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java index e6f08da..b139404 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java @@ -17,7 +17,10 @@ package org.apache.zeppelin.interpreter.remote; -/** This element stores the buffered append-data of paragraph's output. */ +/** + * This element stores the buffered + * append-data of paragraph's output. + */ public class AppendOutputBuffer { private String noteId; @@ -47,4 +50,5 @@ public class AppendOutputBuffer { public String getData() { return data; } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java index 54a75ac..2a88dc2 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java @@ -17,22 +17,26 @@ package org.apache.zeppelin.interpreter.remote; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * This thread sends paragraph's append-data periodically, rather than continously, with a period of - * BUFFER_TIME_MS. It handles append-data for all paragraphs across all notebooks. + * This thread sends paragraph's append-data + * periodically, rather than continously, with + * a period of BUFFER_TIME_MS. It handles append-data + * for all paragraphs across all notebooks. */ public class AppendOutputRunner implements Runnable { - private static final Logger logger = LoggerFactory.getLogger(AppendOutputRunner.class); + private static final Logger logger = + LoggerFactory.getLogger(AppendOutputRunner.class); public static final Long BUFFER_TIME_MS = new Long(100); private static final Long SAFE_PROCESSING_TIME = new Long(10); private static final Long SAFE_PROCESSING_STRING_SIZE = new Long(100000); @@ -66,16 +70,14 @@ public class AppendOutputRunner implements Runnable { Long processingStartTime = System.currentTimeMillis(); queue.drainTo(list); - for (AppendOutputBuffer buffer : list) { + for (AppendOutputBuffer buffer: list) { String noteId = buffer.getNoteId(); String paragraphId = buffer.getParagraphId(); int index = buffer.getIndex(); String stringBufferKey = noteId + ":" + paragraphId + ":" + index; - StringBuilder builder = - stringBufferMap.containsKey(stringBufferKey) - ? stringBufferMap.get(stringBufferKey) - : new StringBuilder(); + StringBuilder builder = stringBufferMap.containsKey(stringBufferKey) ? + stringBufferMap.get(stringBufferKey) : new StringBuilder(); builder.append(buffer.getData()); stringBufferMap.put(stringBufferKey, builder); @@ -83,12 +85,11 @@ public class AppendOutputRunner implements Runnable { Long processingTime = System.currentTimeMillis() - processingStartTime; if (processingTime > SAFE_PROCESSING_TIME) { - logger.warn( - "Processing time for buffered append-output is high: " - + processingTime - + " milliseconds."); + logger.warn("Processing time for buffered append-output is high: " + + processingTime + " milliseconds."); } else { - logger.debug("Processing time for append-output took " + processingTime + " milliseconds"); + logger.debug("Processing time for append-output took " + + processingTime + " milliseconds"); } Long sizeProcessed = new Long(0); @@ -100,14 +101,16 @@ public class AppendOutputRunner implements Runnable { } if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) { - logger.warn( - "Processing size for buffered append-output is high: " + sizeProcessed + " characters."); + logger.warn("Processing size for buffered append-output is high: " + + sizeProcessed + " characters."); } else { - logger.debug("Processing size for append-output is " + sizeProcessed + " characters"); + logger.debug("Processing size for append-output is " + + sizeProcessed + " characters"); } } public void appendBuffer(String noteId, String paragraphId, int index, String outputToAppend) { queue.offer(new AppendOutputBuffer(noteId, paragraphId, index, outputToAppend)); } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java index 346bb26..b2cb78f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java @@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter.remote; import java.util.HashMap; import java.util.Map; + import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; @@ -30,8 +31,10 @@ import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; -/** */ -public class ClientFactory extends BasePooledObjectFactory<Client> { +/** + * + */ +public class ClientFactory extends BasePooledObjectFactory<Client>{ private String host; private int port; Map<Client, TSocket> clientSocketMap = new HashMap<>(); @@ -50,7 +53,7 @@ public class ClientFactory extends BasePooledObjectFactory<Client> { throw new InterpreterException(e); } - TProtocol protocol = new TBinaryProtocol(transport); + TProtocol protocol = new TBinaryProtocol(transport); Client client = new RemoteInterpreterService.Client(protocol); synchronized (clientSocketMap) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java index 896ec47..62c8efd 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java @@ -19,27 +19,26 @@ package org.apache.zeppelin.interpreter.remote; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectListener; +import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; -/** Proxy for AngularObject that exists in remote interpreter process */ +/** + * Proxy for AngularObject that exists in remote interpreter process + */ public class RemoteAngularObject extends AngularObject { private transient ManagedInterpreterGroup interpreterGroup; - RemoteAngularObject( - String name, - Object o, - String noteId, - String paragraphId, - ManagedInterpreterGroup interpreterGroup, - AngularObjectListener listener) { + RemoteAngularObject(String name, Object o, String noteId, String paragraphId, + ManagedInterpreterGroup interpreterGroup, + AngularObjectListener listener) { super(name, o, noteId, paragraphId, listener); this.interpreterGroup = interpreterGroup; } @Override public void set(Object o, boolean emit) { - set(o, emit, true); + set(o, emit, true); } public void set(Object o, boolean emitWeb, boolean emitRemoteProcess) { @@ -47,9 +46,9 @@ public class RemoteAngularObject extends AngularObject { if (emitRemoteProcess) { // send updated value to remote interpreter - interpreterGroup - .getRemoteInterpreterProcess() - .updateRemoteAngularObject(getName(), getNoteId(), getParagraphId(), o); + interpreterGroup.getRemoteInterpreterProcess(). + updateRemoteAngularObject( + getName(), getNoteId(), getParagraphId(), o); } } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java index db7a330..7458ce5 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java @@ -18,24 +18,28 @@ package org.apache.zeppelin.interpreter.remote; import com.google.gson.Gson; -import java.util.List; +import org.apache.thrift.TException; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Proxy for AngularObjectRegistry that exists in remote interpreter process */ +import java.util.List; + +/** + * Proxy for AngularObjectRegistry that exists in remote interpreter process + */ public class RemoteAngularObjectRegistry extends AngularObjectRegistry { Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class); private ManagedInterpreterGroup interpreterGroup; - public RemoteAngularObjectRegistry( - String interpreterId, - AngularObjectRegistryListener listener, - ManagedInterpreterGroup interpreterGroup) { + public RemoteAngularObjectRegistry(String interpreterId, + AngularObjectRegistryListener listener, + ManagedInterpreterGroup interpreterGroup) { super(interpreterId, listener); this.interpreterGroup = interpreterGroup; } @@ -45,16 +49,17 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry { } /** - * When ZeppelinServer side code want to add angularObject to the registry, this method should be - * used instead of add() - * + * When ZeppelinServer side code want to add angularObject to the registry, + * this method should be used instead of add() * @param name * @param o * @param noteId * @return */ - public AngularObject addAndNotifyRemoteProcess( - final String name, final Object o, final String noteId, final String paragraphId) { + public AngularObject addAndNotifyRemoteProcess(final String name, + final Object o, + final String noteId, + final String paragraphId) { RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); if (!remoteInterpreterProcess.isRunning()) { @@ -69,38 +74,41 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry { client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o)); return null; } - }); + } + ); return super.add(name, o, noteId, paragraphId, true); + } /** - * When ZeppelinServer side code want to remove angularObject from the registry, this method - * should be used instead of remove() - * + * When ZeppelinServer side code want to remove angularObject from the registry, + * this method should be used instead of remove() * @param name * @param noteId * @param paragraphId * @return */ - public AngularObject removeAndNotifyRemoteProcess( - final String name, final String noteId, final String paragraphId) { + public AngularObject removeAndNotifyRemoteProcess(final String name, + final String noteId, + final String paragraphId) { RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) { return super.remove(name, noteId, paragraphId); } remoteInterpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - client.angularObjectRemove(name, noteId, paragraphId); - return null; - } - }); + new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + client.angularObjectRemove(name, noteId, paragraphId); + return null; + } + } + ); return super.remove(name, noteId, paragraphId); } - + public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) { List<AngularObject> all = getAll(noteId, paragraphId); for (AngularObject ao : all) { @@ -109,9 +117,9 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry { } @Override - protected AngularObject createNewAngularObject( - String name, Object o, String noteId, String paragraphId) { - return new RemoteAngularObject( - name, o, noteId, paragraphId, interpreterGroup, getAngularObjectListener()); + protected AngularObject createNewAngularObject(String name, Object o, String noteId, String + paragraphId) { + return new RemoteAngularObject(name, o, noteId, paragraphId, interpreterGroup, + getAngularObjectListener()); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 795b02f..6f9f81f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -20,10 +20,6 @@ package org.apache.zeppelin.interpreter.remote; import com.google.common.annotations.VisibleForTesting; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Properties; import org.apache.thrift.TException; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.display.AngularObject; @@ -49,11 +45,20 @@ import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Proxy for Interpreter instance that runs on separate process */ +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Proxy for Interpreter instance that runs on separate process + */ public class RemoteInterpreter extends Interpreter { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreter.class); private static final Gson gson = new Gson(); + private String className; private String sessionId; private FormType formType; @@ -64,13 +69,14 @@ public class RemoteInterpreter extends Interpreter { private LifecycleManager lifecycleManager; - /** Remote interpreter and manage interpreter process */ - public RemoteInterpreter( - Properties properties, - String sessionId, - String className, - String userName, - LifecycleManager lifecycleManager) { + /** + * Remote interpreter and manage interpreter process + */ + public RemoteInterpreter(Properties properties, + String sessionId, + String className, + String userName, + LifecycleManager lifecycleManager) { super(properties); this.sessionId = sessionId; this.className = className; @@ -118,8 +124,8 @@ public class RemoteInterpreter extends Interpreter { // The why we we create all the interpreter of the session is because some interpreter // depends on other interpreter. e.g. PySparkInterpreter depends on SparkInterpreter. // also see method Interpreter.getInterpreterInTheSameSessionByClassName - for (Interpreter interpreter : - getInterpreterGroup().getOrCreateSession(this.getUserName(), sessionId)) { + for (Interpreter interpreter : getInterpreterGroup() + .getOrCreateSession(this.getUserName(), sessionId)) { try { if (!(interpreter instanceof ConfInterpreter)) { ((RemoteInterpreter) interpreter).internal_create(); @@ -129,23 +135,22 @@ public class RemoteInterpreter extends Interpreter { } } - interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - LOGGER.info("Open RemoteInterpreter {}", getClassName()); - // open interpreter here instead of in the jobRun method in RemoteInterpreterServer - // client.open(sessionId, className); - // Push angular object loaded from JSON file to remote interpreter - synchronized (getInterpreterGroup()) { - if (!getInterpreterGroup().isAngularRegistryPushed()) { - pushAngularObjectRegistryToRemote(client); - getInterpreterGroup().setAngularRegistryPushed(true); - } - } - return null; + interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + LOGGER.info("Open RemoteInterpreter {}", getClassName()); + // open interpreter here instead of in the jobRun method in RemoteInterpreterServer + // client.open(sessionId, className); + // Push angular object loaded from JSON file to remote interpreter + synchronized (getInterpreterGroup()) { + if (!getInterpreterGroup().isAngularRegistryPushed()) { + pushAngularObjectRegistryToRemote(client); + getInterpreterGroup().setAngularRegistryPushed(true); } - }); + } + return null; + } + }); isOpened = true; this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); } @@ -156,25 +161,21 @@ public class RemoteInterpreter extends Interpreter { synchronized (this) { if (!isCreated) { this.interpreterProcess = getOrCreateInterpreterProcess(); - interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - LOGGER.info("Create RemoteInterpreter {}", getClassName()); - client.createInterpreter( - getInterpreterGroup().getId(), - sessionId, - className, - (Map) getProperties(), - getUserName()); - return null; - } - }); + interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + LOGGER.info("Create RemoteInterpreter {}", getClassName()); + client.createInterpreter(getInterpreterGroup().getId(), sessionId, + className, (Map) getProperties(), getUserName()); + return null; + } + }); isCreated = true; } } } + @Override public void close() throws InterpreterException { if (isOpened) { @@ -184,14 +185,13 @@ public class RemoteInterpreter extends Interpreter { } catch (IOException e) { throw new InterpreterException(e); } - interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - client.close(sessionId, className); - return null; - } - }); + interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + client.close(sessionId, className); + return null; + } + }); isOpened = false; this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); } else { @@ -219,13 +219,11 @@ public class RemoteInterpreter extends Interpreter { @Override public InterpreterResult call(Client client) throws Exception { - RemoteInterpreterResult remoteResult = - client.interpret(sessionId, className, st, convert(context)); - Map<String, Object> remoteConfig = - (Map<String, Object>) - gson.fromJson( - remoteResult.getConfig(), - new TypeToken<Map<String, Object>>() {}.getType()); + RemoteInterpreterResult remoteResult = client.interpret( + sessionId, className, st, convert(context)); + Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson( + remoteResult.getConfig(), new TypeToken<Map<String, Object>>() { + }.getType()); context.getConfig().clear(); if (remoteConfig != null) { context.getConfig().putAll(remoteConfig); @@ -253,7 +251,9 @@ public class RemoteInterpreter extends Interpreter { InterpreterResult result = convert(remoteResult); return result; } - }); + } + ); + } @Override @@ -269,14 +269,13 @@ public class RemoteInterpreter extends Interpreter { throw new InterpreterException(e); } this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); - interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - client.cancel(sessionId, className, convert(context)); - return null; - } - }); + interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + client.cancel(sessionId, className, convert(context)); + return null; + } + }); } @Override @@ -298,18 +297,18 @@ public class RemoteInterpreter extends Interpreter { throw new InterpreterException(e); } this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); - FormType type = - interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<FormType>() { - @Override - public FormType call(Client client) throws Exception { - formType = FormType.valueOf(client.getFormType(sessionId, className)); - return formType; - } - }); + FormType type = interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<FormType>() { + @Override + public FormType call(Client client) throws Exception { + formType = FormType.valueOf(client.getFormType(sessionId, className)); + return formType; + } + }); return type; } + @Override public int getProgress(final InterpreterContext context) throws InterpreterException { if (!isOpened) { @@ -332,9 +331,10 @@ public class RemoteInterpreter extends Interpreter { }); } + @Override - public List<InterpreterCompletion> completion( - final String buf, final int cursor, final InterpreterContext interpreterContext) + public List<InterpreterCompletion> completion(final String buf, final int cursor, + final InterpreterContext interpreterContext) throws InterpreterException { if (!isOpened) { open(); @@ -350,8 +350,8 @@ public class RemoteInterpreter extends Interpreter { new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() { @Override public List<InterpreterCompletion> call(Client client) throws Exception { - return client.completion( - sessionId, className, buf, cursor, convert(interpreterContext)); + return client.completion(sessionId, className, buf, cursor, + convert(interpreterContext)); } }); } @@ -377,48 +377,36 @@ public class RemoteInterpreter extends Interpreter { }); } + @Override public Scheduler getScheduler() { - int maxConcurrency = - Integer.parseInt( - getProperty( - "zeppelin.interpreter.max.poolsize", - ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() - + "")); + int maxConcurrency = Integer.parseInt( + getProperty("zeppelin.interpreter.max.poolsize", + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() + "")); // one session own one Scheduler, so that when one session is closed, all the jobs/paragraphs // running under the scheduler of this session will be aborted. - Scheduler s = - new RemoteScheduler( - RemoteInterpreter.class.getSimpleName() - + "-" - + getInterpreterGroup().getId() - + "-" - + sessionId, - SchedulerFactory.singleton().getExecutor(), - sessionId, - this, - SchedulerFactory.singleton(), - maxConcurrency); + Scheduler s = new RemoteScheduler( + RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId() + "-" + + sessionId, + SchedulerFactory.singleton().getExecutor(), + sessionId, + this, + SchedulerFactory.singleton(), + maxConcurrency); return SchedulerFactory.singleton().createOrGetScheduler(s); } private RemoteInterpreterContext convert(InterpreterContext ic) { - return new RemoteInterpreterContext( - ic.getNoteId(), - ic.getNoteName(), - ic.getParagraphId(), - ic.getReplName(), - ic.getParagraphTitle(), - ic.getParagraphText(), - gson.toJson(ic.getAuthenticationInfo()), - gson.toJson(ic.getConfig()), - ic.getGui().toJson(), + return new RemoteInterpreterContext(ic.getNoteId(), ic.getNoteName(), ic.getParagraphId(), + ic.getReplName(), ic.getParagraphTitle(), ic.getParagraphText(), + gson.toJson(ic.getAuthenticationInfo()), gson.toJson(ic.getConfig()), ic.getGui().toJson(), gson.toJson(ic.getNoteGui()), ic.getLocalProperties()); } private InterpreterResult convert(RemoteInterpreterResult result) { - InterpreterResult r = new InterpreterResult(InterpreterResult.Code.valueOf(result.getCode())); + InterpreterResult r = new InterpreterResult( + InterpreterResult.Code.valueOf(result.getCode())); for (RemoteInterpreterResultMessage m : result.getMsg()) { r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData()); @@ -428,20 +416,21 @@ public class RemoteInterpreter extends Interpreter { } /** - * Push local angular object registry to remote interpreter. This method should be call ONLY once - * when the first Interpreter is created + * Push local angular object registry to + * remote interpreter. This method should be + * call ONLY once when the first Interpreter is created */ private void pushAngularObjectRegistryToRemote(Client client) throws TException { - final AngularObjectRegistry angularObjectRegistry = - this.getInterpreterGroup().getAngularObjectRegistry(); + final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup() + .getAngularObjectRegistry(); if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) { - final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry.getRegistry(); - LOGGER.info( - "Push local angular object registry from ZeppelinServer to" - + " remote interpreter group {}", - this.getInterpreterGroup().getId()); - final java.lang.reflect.Type registryType = - new TypeToken<Map<String, Map<String, AngularObject>>>() {}.getType(); + final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry + .getRegistry(); + LOGGER.info("Push local angular object registry from ZeppelinServer to" + + " remote interpreter group {}", this.getInterpreterGroup().getId()); + final java.lang.reflect.Type registryType = new TypeToken<Map<String, + Map<String, AngularObject>>>() { + }.getType(); client.angularRegistryPush(gson.toJson(registry, registryType)); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index 1572fc2..db6d263 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -18,11 +18,6 @@ package org.apache.zeppelin.interpreter.remote; import com.google.common.annotations.VisibleForTesting; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.exec.CommandLine; import org.apache.commons.exec.DefaultExecutor; import org.apache.commons.exec.ExecuteException; @@ -35,11 +30,19 @@ import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** This class manages start / stop of remote interpreter process */ +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This class manages start / stop of remote interpreter process + */ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess implements ExecuteResultHandler { - private static final Logger logger = - LoggerFactory.getLogger(RemoteInterpreterManagedProcess.class); + private static final Logger logger = LoggerFactory.getLogger( + RemoteInterpreterManagedProcess.class); private final String interpreterRunner; private final int zeppelinServerRPCPort; @@ -144,15 +147,11 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess } } if (!running.get()) { - throw new IOException( - new String( - String.format( - "Interpreter Process creation is time out in %d seconds", - getConnectTimeout() / 1000) - + "\n" - + "You can increase timeout threshold via " - + "setting zeppelin.interpreter.connect.timeout of this interpreter.\n" - + cmdOut.toString())); + throw new IOException(new String( + String.format("Interpreter Process creation is time out in %d seconds", + getConnectTimeout()/1000) + "\n" + "You can increase timeout threshold via " + + "setting zeppelin.interpreter.connect.timeout of this interpreter.\n" + + cmdOut.toString())); } } catch (InterruptedException e) { logger.error("Remote interpreter is not accessible"); @@ -164,14 +163,13 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess if (isRunning()) { logger.info("Kill interpreter process"); try { - callRemoteFunction( - new RemoteFunction<Void>() { - @Override - public Void call(RemoteInterpreterService.Client client) throws Exception { - client.shutdown(); - return null; - } - }); + callRemoteFunction(new RemoteFunction<Void>() { + @Override + public Void call(RemoteInterpreterService.Client client) throws Exception { + client.shutdown(); + return null; + } + }); } catch (Exception e) { logger.warn("ignore the exception when shutting down"); } @@ -188,6 +186,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess public void onProcessComplete(int exitValue) { logger.info("Interpreter process exited {}", exitValue); running.set(false); + } // called by RemoteInterpreterServer to notify that RemoteInterpreter Process is started @@ -254,7 +253,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess } @Override - public void write(byte[] b) throws IOException { + public void write(byte [] b) throws IOException { super.write(b); if (out != null) { @@ -267,7 +266,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess } @Override - public void write(byte[] b, int offset, int len) throws IOException { + public void write(byte [] b, int offset, int len) throws IOException { super.write(b, offset, len); if (out != null) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index 5e50265..e8b3482 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -24,14 +24,17 @@ import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Abstract class for interpreter process */ +/** + * Abstract class for interpreter process + */ public abstract class RemoteInterpreterProcess implements InterpreterClient { private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class); private GenericObjectPool<Client> clientPool; private int connectTimeout; - public RemoteInterpreterProcess(int connectTimeout) { + public RemoteInterpreterProcess( + int connectTimeout) { this.connectTimeout = connectTimeout; } @@ -71,8 +74,8 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient { } /** - * Called when angular object is updated in client side to propagate change to the remote process - * + * Called when angular object is updated in client side to propagate + * change to the remote process * @param name * @param o */ @@ -82,10 +85,8 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient { client = getClient(); } catch (NullPointerException e) { // remote process not started - logger.info( - "NullPointerException in RemoteInterpreterProcess while " - + "updateRemoteAngularObject getClient, remote process not started", - e); + logger.info("NullPointerException in RemoteInterpreterProcess while " + + "updateRemoteAngularObject getClient, remote process not started", e); return; } catch (Exception e) { logger.error("Can't update angular object", e); @@ -129,7 +130,10 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient { return null; } - /** @param <T> */ + /** + * + * @param <T> + */ public interface RemoteFunction<T> { T call(Client client) throws Exception; }
