Repository: zeppelin Updated Branches: refs/heads/master b14f5ab95 -> 1812928bf
ZEPPELIN-2645. Adding way to register RemoteInterpreterServer's port into InterpreterProcess ### What is this PR for? Rebase of PR #2418; it still uses Thrift as the communication protocol between the Zeppelin server and the interpreter process. We can change it to Netty in the future when we implement a two-way communication channel between the Zeppelin server and the interpreter process. ### What type of PR is it? [Bug Fix | Improvement | Feature | Documentation | Hot Fix | Refactoring] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-2645 ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? NO * Does this need documentation? No Author: Jongyoul Lee <jongy...@gmail.com> Closes #2562 from zjffdu/ZEPPELIN-2645 and squashes the following commits: 82bd8d0 [Jongyoul Lee] ZEPPELIN-2645. Adding way to register RemoteInterpreterServer's port into InterpreterProcess Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1812928b Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1812928b Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1812928b Branch: refs/heads/master Commit: 1812928bfbecb0cb685fe3233e65ef9d8c84f73f Parents: b14f5ab Author: Jongyoul Lee <jongy...@gmail.com> Authored: Wed Jun 14 19:43:49 2017 +0900 Committer: Jeff Zhang <zjf...@apache.org> Committed: Thu Sep 7 06:29:55 2017 +0800 ---------------------------------------------------------------------- bin/interpreter.sh | 13 +- conf/zeppelin-site.xml.template | 6 + .../zeppelin/helium/ZeppelinDevServer.java | 4 +- .../remote/RemoteInterpreterServer.java | 148 +++- .../remote/RemoteInterpreterUtils.java | 98 ++- .../interpreter/thrift/CallbackInfo.java | 518 +++++++++++ .../RemoteInterpreterCallbackService.java | 879 +++++++++++++++++++ .../main/thrift/RemoteInterpreterService.thrift | 9 + 
.../remote/RemoteInterpreterServerTest.java | 6 +- .../remote/RemoteInterpreterUtilsTest.java | 11 + .../zeppelin/conf/ZeppelinConfiguration.java | 8 +- .../interpreter/InterpreterSetting.java | 3 +- .../remote/RemoteInterpreterManagedProcess.java | 129 ++- .../apache/zeppelin/notebook/NotebookTest.java | 6 +- 14 files changed, 1754 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/bin/interpreter.sh ---------------------------------------------------------------------- diff --git a/bin/interpreter.sh b/bin/interpreter.sh index 1344e31..fd93a06 100755 --- a/bin/interpreter.sh +++ b/bin/interpreter.sh @@ -23,7 +23,7 @@ function usage() { echo "usage) $0 -p <port> -d <interpreter dir to load> -l <local interpreter repo dir to load> -g <interpreter group name>" } -while getopts "hp:d:l:v:u:g:" o; do +while getopts "hc:p:d:l:v:u:g:" o; do case ${o} in h) usage @@ -32,8 +32,11 @@ while getopts "hp:d:l:v:u:g:" o; do d) INTERPRETER_DIR=${OPTARG} ;; + c) + CALLBACK_HOST=${OPTARG} # This will be used callback host + ;; p) - PORT=${OPTARG} + PORT=${OPTARG} # This will be used callback port ;; l) LOCAL_INTERPRETER_REPO=${OPTARG} @@ -202,12 +205,12 @@ fi if [[ -n "${SPARK_SUBMIT}" ]]; then if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" != "false" ]]; then - INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${PORT}` + INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} 
${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}` else - INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${PORT}` + INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}` fi else - INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} ` + INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} ` fi if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/conf/zeppelin-site.xml.template ---------------------------------------------------------------------- diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 3ec6e27..ce3ffaa 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -430,4 +430,10 @@ <description>The HTTP X-XSS-Protection response header is a feature of Internet Explorer, Chrome and Safari that stops pages from loading when they detect reflected cross-site scripting (XSS) attacks. 
When value is set to 1 and a cross-site scripting attack is detected, the browser will sanitize the page (remove the unsafe parts).</description> </property> --> +<!-- +<property> + <name>zeppelin.interpreter.callback.portRange</name> + <value>10000:10010</value> +</property> +--> </configuration> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java ---------------------------------------------------------------------- diff --git a/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java index 2484469..3a5199d 100644 --- a/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java +++ b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java @@ -38,8 +38,8 @@ public class ZeppelinDevServer extends private DevInterpreter interpreter = null; private InterpreterOutput out; - public ZeppelinDevServer(int port) throws TException { - super(port); + public ZeppelinDevServer(int port) throws TException, IOException { + super(null, port); } @Override http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 6925360..7f476e8 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -17,29 +17,55 @@ package org.apache.zeppelin.interpreter.remote; -import java.io.IOException; -import 
java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.URL; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import org.apache.thrift.TException; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransportException; import org.apache.zeppelin.dep.DependencyResolver; -import org.apache.zeppelin.display.*; -import org.apache.zeppelin.helium.*; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType; +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.helium.Application; +import org.apache.zeppelin.helium.ApplicationContext; +import org.apache.zeppelin.helium.ApplicationException; +import org.apache.zeppelin.helium.ApplicationLoader; +import org.apache.zeppelin.helium.HeliumAppAngularObjectRegistry; +import org.apache.zeppelin.helium.HeliumPackage; +import org.apache.zeppelin.interpreter.Constants; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterHookListener; +import org.apache.zeppelin.interpreter.InterpreterHookRegistry; +import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType; +import org.apache.zeppelin.interpreter.InterpreterOutput; +import 
org.apache.zeppelin.interpreter.InterpreterOutputListener; +import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.thrift.*; -import org.apache.zeppelin.resource.*; +import org.apache.zeppelin.interpreter.InterpreterResultMessage; +import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; +import org.apache.zeppelin.interpreter.LazyOpenInterpreter; +import org.apache.zeppelin.interpreter.RemoteWorksController; +import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource; +import org.apache.zeppelin.interpreter.thrift.CallbackInfo; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; +import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner; +import org.apache.zeppelin.resource.DistributedResourcePool; +import org.apache.zeppelin.resource.Resource; +import org.apache.zeppelin.resource.ResourcePool; +import org.apache.zeppelin.resource.ResourceSet; +import org.apache.zeppelin.resource.WellKnownResourceName; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.JobListener; @@ -49,8 +75,22 @@ import org.apache.zeppelin.user.AuthenticationInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; 
+import java.lang.reflect.Method; +import java.net.URL; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Entry point for Interpreter process. @@ -70,6 +110,9 @@ public class RemoteInterpreterServer Gson gson = new Gson(); RemoteInterpreterService.Processor<RemoteInterpreterServer> processor; + private String callbackHost; + private int callbackPort; + private String host; private int port; private TThreadPoolServer server; @@ -87,11 +130,34 @@ public class RemoteInterpreterServer // Hold information for manual progress update private ConcurrentMap<String, Integer> progressMap = new ConcurrentHashMap<>(); - public RemoteInterpreterServer(int port) throws TTransportException { - this.port = port; + private boolean isTest; + + public RemoteInterpreterServer(String callbackHost, int port) throws IOException, + TTransportException { + this(callbackHost, port, false); + } + + public RemoteInterpreterServer(String callbackHost, int port, boolean isTest) + throws TTransportException, IOException { + if (null != callbackHost) { + this.callbackHost = callbackHost; + this.callbackPort = port; + } else { + // DevInterpreter + this.port = port; + } + this.isTest = isTest; processor = new RemoteInterpreterService.Processor<>(this); - TServerSocket serverTransport = new TServerSocket(port); + TServerSocket serverTransport; + if (null == callbackHost) { + // Dev Interpreter + serverTransport = new TServerSocket(port); + } else { + this.port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + this.host = RemoteInterpreterUtils.findAvailableHostAddress(); + serverTransport = new TServerSocket(this.port); + } server = new TThreadPoolServer( new 
TThreadPoolServer.Args(serverTransport).processor(processor)); remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>()); @@ -100,6 +166,36 @@ public class RemoteInterpreterServer @Override public void run() { + if (null != callbackHost && !isTest) { + new Thread(new Runnable() { + boolean interrupted = false; + @Override + public void run() { + while (!interrupted && !server.isServing()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + interrupted = true; + } + } + + if (!interrupted) { + CallbackInfo callbackInfo = new CallbackInfo(host, port); + try { + RemoteInterpreterUtils + .registerInterpreter(callbackHost, callbackPort, callbackInfo); + } catch (TException e) { + logger.error("Error while registering interpreter: {}", callbackInfo, e); + try { + shutdown(); + } catch (TException e1) { + logger.warn("Exception occurs while shutting down", e1); + } + } + } + } + }).start(); + } logger.info("Starting remote interpreter server on port {}", port); server.serve(); } @@ -151,13 +247,15 @@ public class RemoteInterpreterServer public static void main(String[] args) - throws TTransportException, InterruptedException { - + throws TTransportException, InterruptedException, IOException { + String callbackHost = null; int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT; if (args.length > 0) { - port = Integer.parseInt(args[0]); + callbackHost = args[0]; + port = Integer.parseInt(args[1]); } - RemoteInterpreterServer remoteInterpreterServer = new RemoteInterpreterServer(port); + RemoteInterpreterServer remoteInterpreterServer = + new RemoteInterpreterServer(callbackHost, port); remoteInterpreterServer.start(); remoteInterpreterServer.join(); System.exit(0); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java index 4ee6690..835199a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java @@ -17,27 +17,96 @@ package org.apache.zeppelin.interpreter.remote; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.net.ConnectException; +import java.net.Inet4Address; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.InterfaceAddress; +import java.net.NetworkInterface; import java.net.ServerSocket; import java.net.Socket; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.Collections; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.zeppelin.interpreter.thrift.CallbackInfo; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterCallbackService; /** * */ public class RemoteInterpreterUtils { static Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterUtils.class); + + public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOException { - int port; - try (ServerSocket socket = new ServerSocket(0);) { - port = socket.getLocalPort(); - socket.close(); + return findRandomAvailablePortOnAllLocalInterfaces(":"); + } + + /** + * start:end + * + * @param portRange + * @return + * @throws IOException + */ + public static int findRandomAvailablePortOnAllLocalInterfaces(String portRange) + throws IOException { + + // ':' is 
the default value which means no constraints on the portRange + if (portRange == null || portRange.equals(":")) { + int port; + try (ServerSocket socket = new ServerSocket(0);) { + port = socket.getLocalPort(); + socket.close(); + } + return port; } - return port; + // valid user registered port https://en.wikipedia.org/wiki/Registered_port + int start = 1024; + int end = 49151; + String[] ports = portRange.split(":", -1); + if (!ports[0].isEmpty()) { + start = Integer.parseInt(ports[0]); + } + if (!ports[1].isEmpty()) { + end = Integer.parseInt(ports[1]); + } + for (int i = start; i <= end; ++i) { + try { + ServerSocket socket = new ServerSocket(i); + return socket.getLocalPort(); + } catch (Exception e) { + // ignore this + } + } + throw new IOException("No available port in the portRange: " + portRange); + } + + public static String findAvailableHostAddress() throws UnknownHostException, SocketException { + InetAddress address = InetAddress.getLocalHost(); + if (address.isLoopbackAddress()) { + for (NetworkInterface networkInterface : Collections + .list(NetworkInterface.getNetworkInterfaces())) { + if (!networkInterface.isLoopback()) { + for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses()) { + InetAddress a = interfaceAddress.getAddress(); + if (a instanceof Inet4Address) { + return a.getHostAddress(); + } + } + } + } + } + return address.getHostAddress(); } public static boolean checkIfRemoteEndpointAccessible(String host, int port) { @@ -80,4 +149,17 @@ public class RemoteInterpreterUtils { return key.matches("^[A-Z_0-9]*"); } + + public static void registerInterpreter(String callbackHost, int callbackPort, + final CallbackInfo callbackInfo) throws TException { + LOGGER.info("callbackHost: {}, callbackPort: {}, callbackInfo: {}", callbackHost, callbackPort, + callbackInfo); + try (TTransport transport = new TSocket(callbackHost, callbackPort)) { + transport.open(); + TProtocol protocol = new TBinaryProtocol(transport); + 
RemoteInterpreterCallbackService.Client client = new RemoteInterpreterCallbackService.Client( + protocol); + client.callback(callbackInfo); + } + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/CallbackInfo.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/CallbackInfo.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/CallbackInfo.java new file mode 100644 index 0000000..b0c7e9a --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/CallbackInfo.java @@ -0,0 +1,518 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * Autogenerated by Thrift Compiler (0.9.2) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.zeppelin.interpreter.thrift; + +import org.apache.thrift.scheme.IScheme; +import org.apache.thrift.scheme.SchemeFactory; +import org.apache.thrift.scheme.StandardScheme; + +import org.apache.thrift.scheme.TupleScheme; +import org.apache.thrift.protocol.TTupleProtocol; +import org.apache.thrift.protocol.TProtocolException; +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.server.AbstractNonblockingServer.*; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import javax.annotation.Generated; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-6-17") +public class CallbackInfo implements org.apache.thrift.TBase<CallbackInfo, CallbackInfo._Fields>, java.io.Serializable, Cloneable, Comparable<CallbackInfo> { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("CallbackInfo"); + + private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)1); + private static final org.apache.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("port", org.apache.thrift.protocol.TType.I32, (short)2); + + private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? 
extends IScheme>, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new CallbackInfoStandardSchemeFactory()); + schemes.put(TupleScheme.class, new CallbackInfoTupleSchemeFactory()); + } + + public String host; // required + public int port; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + HOST((short)1, "host"), + PORT((short)2, "port"); + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // HOST + return HOST; + case 2: // PORT + return PORT; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. 
+ */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + private static final int __PORT_ISSET_ID = 0; + private byte __isset_bitfield = 0; + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.HOST, new org.apache.thrift.meta_data.FieldMetaData("host", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.PORT, new org.apache.thrift.meta_data.FieldMetaData("port", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CallbackInfo.class, metaDataMap); + } + + public CallbackInfo() { + } + + public CallbackInfo( + String host, + int port) + { + this(); + this.host = host; + this.port = port; + setPortIsSet(true); + } + + /** + * Performs a deep copy on <i>other</i>. 
+ */ + public CallbackInfo(CallbackInfo other) { + __isset_bitfield = other.__isset_bitfield; + if (other.isSetHost()) { + this.host = other.host; + } + this.port = other.port; + } + + public CallbackInfo deepCopy() { + return new CallbackInfo(this); + } + + @Override + public void clear() { + this.host = null; + setPortIsSet(false); + this.port = 0; + } + + public String getHost() { + return this.host; + } + + public CallbackInfo setHost(String host) { + this.host = host; + return this; + } + + public void unsetHost() { + this.host = null; + } + + /** Returns true if field host is set (has been assigned a value) and false otherwise */ + public boolean isSetHost() { + return this.host != null; + } + + public void setHostIsSet(boolean value) { + if (!value) { + this.host = null; + } + } + + public int getPort() { + return this.port; + } + + public CallbackInfo setPort(int port) { + this.port = port; + setPortIsSet(true); + return this; + } + + public void unsetPort() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __PORT_ISSET_ID); + } + + /** Returns true if field port is set (has been assigned a value) and false otherwise */ + public boolean isSetPort() { + return EncodingUtils.testBit(__isset_bitfield, __PORT_ISSET_ID); + } + + public void setPortIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __PORT_ISSET_ID, value); + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case HOST: + if (value == null) { + unsetHost(); + } else { + setHost((String)value); + } + break; + + case PORT: + if (value == null) { + unsetPort(); + } else { + setPort((Integer)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case HOST: + return getHost(); + + case PORT: + return Integer.valueOf(getPort()); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false 
otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case HOST: + return isSetHost(); + case PORT: + return isSetPort(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof CallbackInfo) + return this.equals((CallbackInfo)that); + return false; + } + + public boolean equals(CallbackInfo that) { + if (that == null) + return false; + + boolean this_present_host = true && this.isSetHost(); + boolean that_present_host = true && that.isSetHost(); + if (this_present_host || that_present_host) { + if (!(this_present_host && that_present_host)) + return false; + if (!this.host.equals(that.host)) + return false; + } + + boolean this_present_port = true; + boolean that_present_port = true; + if (this_present_port || that_present_port) { + if (!(this_present_port && that_present_port)) + return false; + if (this.port != that.port) + return false; + } + + return true; + } + + @Override + public int hashCode() { + List<Object> list = new ArrayList<Object>(); + + boolean present_host = true && (isSetHost()); + list.add(present_host); + if (present_host) + list.add(host); + + boolean present_port = true; + list.add(present_port); + if (present_port) + list.add(port); + + return list.hashCode(); + } + + @Override + public int compareTo(CallbackInfo other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = Boolean.valueOf(isSetHost()).compareTo(other.isSetHost()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetHost()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.host, other.host); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetPort()).compareTo(other.isSetPort()); + if 
(lastComparison != 0) { + return lastComparison; + } + if (isSetPort()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.port, other.port); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("CallbackInfo("); + boolean first = true; + + sb.append("host:"); + if (this.host == null) { + sb.append("null"); + } else { + sb.append(this.host); + } + first = false; + if (!first) sb.append(", "); + sb.append("port:"); + sb.append(this.port); + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. 
+ __isset_bitfield = 0; + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class CallbackInfoStandardSchemeFactory implements SchemeFactory { + public CallbackInfoStandardScheme getScheme() { + return new CallbackInfoStandardScheme(); + } + } + + private static class CallbackInfoStandardScheme extends StandardScheme<CallbackInfo> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, CallbackInfo struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // HOST + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.host = iprot.readString(); + struct.setHostIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // PORT + if (schemeField.type == org.apache.thrift.protocol.TType.I32) { + struct.port = iprot.readI32(); + struct.setPortIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, CallbackInfo struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.host != null) { + oprot.writeFieldBegin(HOST_FIELD_DESC); + oprot.writeString(struct.host); + oprot.writeFieldEnd(); + } + oprot.writeFieldBegin(PORT_FIELD_DESC); + 
oprot.writeI32(struct.port); + oprot.writeFieldEnd(); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class CallbackInfoTupleSchemeFactory implements SchemeFactory { + public CallbackInfoTupleScheme getScheme() { + return new CallbackInfoTupleScheme(); + } + } + + private static class CallbackInfoTupleScheme extends TupleScheme<CallbackInfo> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, CallbackInfo struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + BitSet optionals = new BitSet(); + if (struct.isSetHost()) { + optionals.set(0); + } + if (struct.isSetPort()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); + if (struct.isSetHost()) { + oprot.writeString(struct.host); + } + if (struct.isSetPort()) { + oprot.writeI32(struct.port); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, CallbackInfo struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + BitSet incoming = iprot.readBitSet(2); + if (incoming.get(0)) { + struct.host = iprot.readString(); + struct.setHostIsSet(true); + } + if (incoming.get(1)) { + struct.port = iprot.readI32(); + struct.setPortIsSet(true); + } + } + } + +} + http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterCallbackService.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterCallbackService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterCallbackService.java new file mode 100644 index 0000000..6ef08f6 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterCallbackService.java @@ -0,0 +1,879 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Autogenerated by Thrift Compiler (0.9.2) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.zeppelin.interpreter.thrift; + +import org.apache.thrift.scheme.IScheme; +import org.apache.thrift.scheme.SchemeFactory; +import org.apache.thrift.scheme.StandardScheme; + +import org.apache.thrift.scheme.TupleScheme; +import org.apache.thrift.protocol.TTupleProtocol; +import org.apache.thrift.protocol.TProtocolException; +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.server.AbstractNonblockingServer.*; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import javax.annotation.Generated; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) +@Generated(value = "Autogenerated by Thrift 
Compiler (0.9.2)", date = "2017-6-17") +public class RemoteInterpreterCallbackService { + + public interface Iface { + + public void callback(CallbackInfo callbackInfo) throws org.apache.thrift.TException; + + } + + public interface AsyncIface { + + public void callback(CallbackInfo callbackInfo, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + + } + + public static class Client extends org.apache.thrift.TServiceClient implements Iface { + public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> { + public Factory() {} + public Client getClient(org.apache.thrift.protocol.TProtocol prot) { + return new Client(prot); + } + public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) { + return new Client(iprot, oprot); + } + } + + public Client(org.apache.thrift.protocol.TProtocol prot) + { + super(prot, prot); + } + + public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) { + super(iprot, oprot); + } + + public void callback(CallbackInfo callbackInfo) throws org.apache.thrift.TException + { + send_callback(callbackInfo); + recv_callback(); + } + + public void send_callback(CallbackInfo callbackInfo) throws org.apache.thrift.TException + { + callback_args args = new callback_args(); + args.setCallbackInfo(callbackInfo); + sendBase("callback", args); + } + + public void recv_callback() throws org.apache.thrift.TException + { + callback_result result = new callback_result(); + receiveBase(result, "callback"); + return; + } + + } + public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface { + public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> { + private org.apache.thrift.async.TAsyncClientManager clientManager; + private org.apache.thrift.protocol.TProtocolFactory protocolFactory; + public 
Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) { + this.clientManager = clientManager; + this.protocolFactory = protocolFactory; + } + public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) { + return new AsyncClient(protocolFactory, clientManager, transport); + } + } + + public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) { + super(protocolFactory, clientManager, transport); + } + + public void callback(CallbackInfo callbackInfo, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + checkReady(); + callback_call method_call = new callback_call(callbackInfo, resultHandler, this, ___protocolFactory, ___transport); + this.___currentMethod = method_call; + ___manager.call(method_call); + } + + public static class callback_call extends org.apache.thrift.async.TAsyncMethodCall { + private CallbackInfo callbackInfo; + public callback_call(CallbackInfo callbackInfo, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + super(client, protocolFactory, transport, resultHandler, false); + this.callbackInfo = callbackInfo; + } + + public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { + prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("callback", org.apache.thrift.protocol.TMessageType.CALL, 0)); + callback_args args = new callback_args(); + args.setCallbackInfo(callbackInfo); + args.write(prot); + prot.writeMessageEnd(); + } + + public void getResult() throws org.apache.thrift.TException { + if 
(getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { + throw new IllegalStateException("Method call not finished!"); + } + org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); + org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); + (new Client(prot)).recv_callback(); + } + } + + } + + public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor { + private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName()); + public Processor(I iface) { + super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>())); + } + + protected Processor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) { + super(iface, getProcessMap(processMap)); + } + + private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? 
extends org.apache.thrift.TBase>> processMap) { + processMap.put("callback", new callback()); + return processMap; + } + + public static class callback<I extends Iface> extends org.apache.thrift.ProcessFunction<I, callback_args> { + public callback() { + super("callback"); + } + + public callback_args getEmptyArgsInstance() { + return new callback_args(); + } + + protected boolean isOneway() { + return false; + } + + public callback_result getResult(I iface, callback_args args) throws org.apache.thrift.TException { + callback_result result = new callback_result(); + iface.callback(args.callbackInfo); + return result; + } + } + + } + + public static class AsyncProcessor<I extends AsyncIface> extends org.apache.thrift.TBaseAsyncProcessor<I> { + private static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class.getName()); + public AsyncProcessor(I iface) { + super(iface, getProcessMap(new HashMap<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>>())); + } + + protected AsyncProcessor(I iface, Map<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) { + super(iface, getProcessMap(processMap)); + } + + private static <I extends AsyncIface> Map<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase,?>> getProcessMap(Map<String, org.apache.thrift.AsyncProcessFunction<I, ? 
extends org.apache.thrift.TBase, ?>> processMap) { + processMap.put("callback", new callback()); + return processMap; + } + + public static class callback<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, callback_args, Void> { + public callback() { + super("callback"); + } + + public callback_args getEmptyArgsInstance() { + return new callback_args(); + } + + public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer fb, final int seqid) { + final org.apache.thrift.AsyncProcessFunction fcall = this; + return new AsyncMethodCallback<Void>() { + public void onComplete(Void o) { + callback_result result = new callback_result(); + try { + fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid); + return; + } catch (Exception e) { + LOGGER.error("Exception writing to internal frame buffer", e); + } + fb.close(); + } + public void onError(Exception e) { + byte msgType = org.apache.thrift.protocol.TMessageType.REPLY; + org.apache.thrift.TBase msg; + callback_result result = new callback_result(); + { + msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; + msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage()); + } + try { + fcall.sendResponse(fb,msg,msgType,seqid); + return; + } catch (Exception ex) { + LOGGER.error("Exception writing to internal frame buffer", ex); + } + fb.close(); + } + }; + } + + protected boolean isOneway() { + return false; + } + + public void start(I iface, callback_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException { + iface.callback(args.callbackInfo,resultHandler); + } + } + + } + + public static class callback_args implements org.apache.thrift.TBase<callback_args, callback_args._Fields>, java.io.Serializable, Cloneable, Comparable<callback_args> { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new 
org.apache.thrift.protocol.TStruct("callback_args"); + + private static final org.apache.thrift.protocol.TField CALLBACK_INFO_FIELD_DESC = new org.apache.thrift.protocol.TField("callbackInfo", org.apache.thrift.protocol.TType.STRUCT, (short)1); + + private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new callback_argsStandardSchemeFactory()); + schemes.put(TupleScheme.class, new callback_argsTupleSchemeFactory()); + } + + public CallbackInfo callbackInfo; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + CALLBACK_INFO((short)1, "callbackInfo"); + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // CALLBACK_INFO + return CALLBACK_INFO; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. 
+ */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.CALLBACK_INFO, new org.apache.thrift.meta_data.FieldMetaData("callbackInfo", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, CallbackInfo.class))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(callback_args.class, metaDataMap); + } + + public callback_args() { + } + + public callback_args( + CallbackInfo callbackInfo) + { + this(); + this.callbackInfo = callbackInfo; + } + + /** + * Performs a deep copy on <i>other</i>. 
+ */ + public callback_args(callback_args other) { + if (other.isSetCallbackInfo()) { + this.callbackInfo = new CallbackInfo(other.callbackInfo); + } + } + + public callback_args deepCopy() { + return new callback_args(this); + } + + @Override + public void clear() { + this.callbackInfo = null; + } + + public CallbackInfo getCallbackInfo() { + return this.callbackInfo; + } + + public callback_args setCallbackInfo(CallbackInfo callbackInfo) { + this.callbackInfo = callbackInfo; + return this; + } + + public void unsetCallbackInfo() { + this.callbackInfo = null; + } + + /** Returns true if field callbackInfo is set (has been assigned a value) and false otherwise */ + public boolean isSetCallbackInfo() { + return this.callbackInfo != null; + } + + public void setCallbackInfoIsSet(boolean value) { + if (!value) { + this.callbackInfo = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case CALLBACK_INFO: + if (value == null) { + unsetCallbackInfo(); + } else { + setCallbackInfo((CallbackInfo)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case CALLBACK_INFO: + return getCallbackInfo(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case CALLBACK_INFO: + return isSetCallbackInfo(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof callback_args) + return this.equals((callback_args)that); + return false; + } + + public boolean equals(callback_args that) { + if (that == null) + return false; + + boolean this_present_callbackInfo = true && this.isSetCallbackInfo(); + boolean that_present_callbackInfo = true && 
that.isSetCallbackInfo(); + if (this_present_callbackInfo || that_present_callbackInfo) { + if (!(this_present_callbackInfo && that_present_callbackInfo)) + return false; + if (!this.callbackInfo.equals(that.callbackInfo)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + List<Object> list = new ArrayList<Object>(); + + boolean present_callbackInfo = true && (isSetCallbackInfo()); + list.add(present_callbackInfo); + if (present_callbackInfo) + list.add(callbackInfo); + + return list.hashCode(); + } + + @Override + public int compareTo(callback_args other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = Boolean.valueOf(isSetCallbackInfo()).compareTo(other.isSetCallbackInfo()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetCallbackInfo()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.callbackInfo, other.callbackInfo); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("callback_args("); + boolean first = true; + + sb.append("callbackInfo:"); + if (this.callbackInfo == null) { + sb.append("null"); + } else { + sb.append(this.callbackInfo); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + if 
(callbackInfo != null) { + callbackInfo.validate(); + } + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class callback_argsStandardSchemeFactory implements SchemeFactory { + public callback_argsStandardScheme getScheme() { + return new callback_argsStandardScheme(); + } + } + + private static class callback_argsStandardScheme extends StandardScheme<callback_args> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, callback_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // CALLBACK_INFO + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.callbackInfo = new CallbackInfo(); + struct.callbackInfo.read(iprot); + struct.setCallbackInfoIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, callback_args 
struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.callbackInfo != null) { + oprot.writeFieldBegin(CALLBACK_INFO_FIELD_DESC); + struct.callbackInfo.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class callback_argsTupleSchemeFactory implements SchemeFactory { + public callback_argsTupleScheme getScheme() { + return new callback_argsTupleScheme(); + } + } + + private static class callback_argsTupleScheme extends TupleScheme<callback_args> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, callback_args struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + BitSet optionals = new BitSet(); + if (struct.isSetCallbackInfo()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetCallbackInfo()) { + struct.callbackInfo.write(oprot); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, callback_args struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.callbackInfo = new CallbackInfo(); + struct.callbackInfo.read(iprot); + struct.setCallbackInfoIsSet(true); + } + } + } + + } + + public static class callback_result implements org.apache.thrift.TBase<callback_result, callback_result._Fields>, java.io.Serializable, Cloneable, Comparable<callback_result> { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("callback_result"); + + + private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? 
extends IScheme>, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new callback_resultStandardSchemeFactory()); + schemes.put(TupleScheme.class, new callback_resultTupleSchemeFactory()); + } + + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { +; + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. 
+ */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(callback_result.class, metaDataMap); + } + + public callback_result() { + } + + /** + * Performs a deep copy on <i>other</i>. + */ + public callback_result(callback_result other) { + } + + public callback_result deepCopy() { + return new callback_result(this); + } + + @Override + public void clear() { + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof callback_result) + return this.equals((callback_result)that); + return false; + } + + public boolean equals(callback_result that) { + if (that == null) + return false; + + return true; + } + + @Override + public int hashCode() { + List<Object> list = new ArrayList<Object>(); + + return list.hashCode(); + } + + @Override + public int 
compareTo(callback_result other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("callback_result("); + boolean first = true; + + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class callback_resultStandardSchemeFactory implements SchemeFactory { + public callback_resultStandardScheme getScheme() { + return new callback_resultStandardScheme(); + } + } + + private static class callback_resultStandardScheme extends StandardScheme<callback_result> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, callback_result struct) throws org.apache.thrift.TException { + 
org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, callback_result struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class callback_resultTupleSchemeFactory implements SchemeFactory { + public callback_resultTupleScheme getScheme() { + return new callback_resultTupleScheme(); + } + } + + private static class callback_resultTupleScheme extends TupleScheme<callback_result> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, callback_result struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, callback_result struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index f2eb13f..f20fb90 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -88,6 +88,11 @@ struct 
InterpreterCompletion { 3: string meta } +struct CallbackInfo { + 1: string host, + 2: i32 port +} + service RemoteInterpreterService { void createInterpreter(1: string intpGroupId, 2: string sessionKey, 3: string className, 4: map<string, string> properties, 5: string userName); @@ -131,3 +136,7 @@ service RemoteInterpreterService { void onReceivedZeppelinResource(1: string object); } + +service RemoteInterpreterCallbackService { + void callback(1: CallbackInfo callbackInfo); +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java index a4b3a25..1a7c2a5 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java @@ -42,7 +42,7 @@ public class RemoteInterpreterServerTest { @Test public void testStartStop() throws InterruptedException, IOException, TException { - RemoteInterpreterServer server = new RemoteInterpreterServer( + RemoteInterpreterServer server = new RemoteInterpreterServer("localhost", RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces()); assertEquals(false, server.isRunning()); @@ -90,8 +90,8 @@ public class RemoteInterpreterServerTest { @Test public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException { - RemoteInterpreterServer server = new RemoteInterpreterServer( - RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces()); + RemoteInterpreterServer server = new RemoteInterpreterServer("localhost", + 
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), true); assertEquals(false, server.isRunning()); server.start(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java index 5f7426a..afbbcbd 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java @@ -28,6 +28,17 @@ public class RemoteInterpreterUtilsTest { @Test public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException { assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0); + + String portRange = ":30000"; + assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange) <= 30000); + + portRange = "30000:"; + assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange) >= 30000); + + portRange = "30000:40000"; + int port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange); + assertTrue(port >= 30000 && port <= 40000); } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 03cc069..ba90ed8 100644 --- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -476,6 +476,10 @@ public class ZeppelinConfiguration extends XMLConfiguration { } } + public String getCallbackPortRange() { + return getString(ConfVars.ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE); + } + public boolean isWindowsPath(String path){ return path.matches("^[A-Za-z]:\\\\.*"); } @@ -684,7 +688,9 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1"), ZEPPELIN_HDFS_KEYTAB("zeppelin.hdfs.keytab", ""), - ZEPPELIN_HDFS_PRINCIPAL("zeppelin.hdfs.principal", ""); + ZEPPELIN_HDFS_PRINCIPAL("zeppelin.hdfs.principal", ""), + + ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":"); private String varName; @SuppressWarnings("rawtypes") http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index 9f4cfd4..79618a3 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -705,7 +705,8 @@ public class InterpreterSetting { // create new remote process remoteInterpreterProcess = new RemoteInterpreterManagedProcess( interpreterRunner != null ? 
interpreterRunner.getPath() : - conf.getInterpreterRemoteRunnerPath(), interpreterDir, localRepoPath, + conf.getInterpreterRemoteRunnerPath(), conf.getCallbackPortRange(), + interpreterDir, localRepoPath, getEnvFromInterpreterProperty(getJavaProperties()), connectTimeout, remoteInterpreterProcessListener, appEventListener, group); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index 19356fb..2d64831 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -17,10 +17,23 @@ package org.apache.zeppelin.interpreter.remote; -import org.apache.commons.exec.*; +import org.apache.commons.exec.CommandLine; +import org.apache.commons.exec.DefaultExecutor; +import org.apache.commons.exec.ExecuteException; +import org.apache.commons.exec.ExecuteResultHandler; +import org.apache.commons.exec.ExecuteWatchdog; +import org.apache.commons.exec.LogOutputStream; +import org.apache.commons.exec.PumpStreamHandler; import org.apache.commons.exec.environment.EnvironmentUtils; +import org.apache.thrift.TException; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TTransportException; import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.thrift.CallbackInfo; 
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterCallbackService; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +42,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** * This class manages start / stop of remote interpreter process @@ -37,11 +51,14 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess implements ExecuteResultHandler { private static final Logger logger = LoggerFactory.getLogger( RemoteInterpreterManagedProcess.class); - private final String interpreterRunner; + private final String interpreterRunner; + private final String portRange; private DefaultExecutor executor; private ExecuteWatchdog watchdog; - boolean running = false; + private AtomicBoolean running = new AtomicBoolean(false); + TServer callbackServer; + private String host = null; private int port = -1; private final String interpreterDir; private final String localRepoDir; @@ -51,6 +68,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess public RemoteInterpreterManagedProcess( String intpRunner, + String portRange, String intpDir, String localRepoDir, Map<String, String> env, @@ -61,6 +79,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess super(new RemoteInterpreterEventPoller(listener, appListener), connectTimeout); this.interpreterRunner = intpRunner; + this.portRange = portRange; this.env = env; this.interpreterDir = intpDir; this.localRepoDir = localRepoDir; @@ -77,6 +96,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess super(remoteInterpreterEventPoller, connectTimeout); this.interpreterRunner = intpRunner; + this.portRange = ":"; this.env = env; this.interpreterDir = intpDir; this.localRepoDir = localRepoDir; @@ -96,18 +116,69 @@ 
public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess @Override public void start(String userName, Boolean isUserImpersonate) { // start server process + final String callbackHost; + final int callbackPort; try { - port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange); logger.info("Choose port {} for RemoteInterpreterProcess", port); + callbackHost = RemoteInterpreterUtils.findAvailableHostAddress(); + callbackPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); } catch (IOException e1) { throw new InterpreterException(e1); } + logger.info("Thrift server for callback will start. Port: {}", callbackPort); + try { + callbackServer = new TThreadPoolServer( + new TThreadPoolServer.Args(new TServerSocket(callbackPort)).processor( + new RemoteInterpreterCallbackService.Processor<>( + new RemoteInterpreterCallbackService.Iface() { + @Override + public void callback(CallbackInfo callbackInfo) throws TException { + logger.info("Registered: {}", callbackInfo); + host = callbackInfo.getHost(); + port = callbackInfo.getPort(); + running.set(true); + synchronized (running) { + running.notify(); + } + } + }))); + // Start thrift server to receive callbackInfo from RemoteInterpreterServer; + new Thread(new Runnable() { + @Override + public void run() { + callbackServer.serve(); + } + }).start(); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + if (callbackServer.isServing()) { + callbackServer.stop(); + } + } + })); + + while (!callbackServer.isServing()) { + logger.debug("callbackServer is not serving"); + Thread.sleep(500); + } + logger.debug("callbackServer is serving now"); + } catch (TTransportException e) { + logger.error("callback server error.", e); + } catch (InterruptedException e) { + logger.warn("", e); + } + CommandLine cmdLine = 
CommandLine.parse(interpreterRunner); cmdLine.addArgument("-d", false); cmdLine.addArgument(interpreterDir, false); + cmdLine.addArgument("-c", false); + cmdLine.addArgument(callbackHost, false); cmdLine.addArgument("-p", false); - cmdLine.addArgument(Integer.toString(port), false); + cmdLine.addArgument(Integer.toString(callbackPort), false); if (isUserImpersonate && !userName.equals("anonymous")) { cmdLine.addArgument("-u", false); cmdLine.addArgument(userName, false); @@ -133,45 +204,31 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess logger.info("Run interpreter process {}", cmdLine); executor.execute(cmdLine, procEnv, this); - running = true; } catch (IOException e) { - running = false; + running.set(false); throw new InterpreterException(e); } - - long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < getConnectTimeout()) { - if (!running) { - try { - cmdOut.flush(); - } catch (IOException e) { - // nothing to do + try { + synchronized (running) { + if (!running.get()) { + running.wait(getConnectTimeout() * 2); } - throw new InterpreterException(new String(cmdOut.toByteArray())); } - - try { - if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) { - break; - } else { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - logger.error("Exception in RemoteInterpreterProcess while synchronized reference " + - "Thread.sleep", e); - } - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug("Remote interpreter not yet accessible at localhost:" + port); - } + if (!running.get()) { + callbackServer.stop(); + throw new InterpreterException("Cannot run interpreter"); } + } catch (InterruptedException e) { + logger.error("Remote interpreter is not accessible"); } processOutput.setOutputStream(null); } public void stop() { + if (callbackServer.isServing()) { + callbackServer.stop(); + } if (isRunning()) { logger.info("kill interpreter 
process"); try { @@ -190,25 +247,25 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess executor = null; watchdog = null; - running = false; + running.set(false); logger.info("Remote process terminated"); } @Override public void onProcessComplete(int exitValue) { logger.info("Interpreter process exited {}", exitValue); - running = false; + running.set(false); } @Override public void onProcessFailed(ExecuteException e) { logger.info("Interpreter process failed {}", e); - running = false; + running.set(false); } public boolean isRunning() { - return running; + return running.get(); } private static class ProcessLogOutputStream extends LogOutputStream { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index f4b8c32..f044fbd 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -357,16 +357,16 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener config.put("cron", "* * * * * ?"); note.setConfig(config); notebook.refreshCron(note.getId()); - Thread.sleep(1 * 1000); + Thread.sleep(2 * 1000); // remove cron scheduler. config.put("cron", null); note.setConfig(config); notebook.refreshCron(note.getId()); - Thread.sleep(1000); + Thread.sleep(2 * 1000); dateFinished = p.getDateFinished(); assertNotNull(dateFinished); - Thread.sleep(1 * 1000); + Thread.sleep(2 * 1000); assertEquals(dateFinished, p.getDateFinished()); notebook.removeNote(note.getId(), anonymous); }