Repository: zeppelin
Updated Branches:
  refs/heads/master b14f5ab95 -> 1812928bf


ZEPPELIN-2645. Adding way to register RemoteInterpreterServer's port into 
InterpreterProcess

### What is this PR for?

Rebase PR #2418 ,  still use thrift as the communication protocol between 
zeppelin server and interpreter process.  We can change it io netty in future 
when we implement 2 way communication channel between zeppelin server and 
interpreter process.

### What type of PR is it?
[Bug Fix | Improvement | Feature | Documentation | Hot Fix | Refactoring]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2645

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? NO
* Does this needs documentation? No

Author: Jongyoul Lee <jongy...@gmail.com>

Closes #2562 from zjffdu/ZEPPELIN-2645 and squashes the following commits:

82bd8d0 [Jongyoul Lee] ZEPPELIN-2645. Adding way to register 
RemoteInterpreterServer's port into InterpreterProcess


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1812928b
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1812928b
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1812928b

Branch: refs/heads/master
Commit: 1812928bfbecb0cb685fe3233e65ef9d8c84f73f
Parents: b14f5ab
Author: Jongyoul Lee <jongy...@gmail.com>
Authored: Wed Jun 14 19:43:49 2017 +0900
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Thu Sep 7 06:29:55 2017 +0800

----------------------------------------------------------------------
 bin/interpreter.sh                              |  13 +-
 conf/zeppelin-site.xml.template                 |   6 +
 .../zeppelin/helium/ZeppelinDevServer.java      |   4 +-
 .../remote/RemoteInterpreterServer.java         | 148 +++-
 .../remote/RemoteInterpreterUtils.java          |  98 ++-
 .../interpreter/thrift/CallbackInfo.java        | 518 +++++++++++
 .../RemoteInterpreterCallbackService.java       | 879 +++++++++++++++++++
 .../main/thrift/RemoteInterpreterService.thrift |   9 +
 .../remote/RemoteInterpreterServerTest.java     |   6 +-
 .../remote/RemoteInterpreterUtilsTest.java      |  11 +
 .../zeppelin/conf/ZeppelinConfiguration.java    |   8 +-
 .../interpreter/InterpreterSetting.java         |   3 +-
 .../remote/RemoteInterpreterManagedProcess.java | 129 ++-
 .../apache/zeppelin/notebook/NotebookTest.java  |   6 +-
 14 files changed, 1754 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/bin/interpreter.sh
----------------------------------------------------------------------
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index 1344e31..fd93a06 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -23,7 +23,7 @@ function usage() {
     echo "usage) $0 -p <port> -d <interpreter dir to load> -l <local 
interpreter repo dir to load> -g <interpreter group name>"
 }
 
-while getopts "hp:d:l:v:u:g:" o; do
+while getopts "hc:p:d:l:v:u:g:" o; do
     case ${o} in
         h)
             usage
@@ -32,8 +32,11 @@ while getopts "hp:d:l:v:u:g:" o; do
         d)
             INTERPRETER_DIR=${OPTARG}
             ;;
+        c)
+            CALLBACK_HOST=${OPTARG} # This will be used callback host
+            ;;
         p)
-            PORT=${OPTARG}
+            PORT=${OPTARG} # This will be used callback port
             ;;
         l)
             LOCAL_INTERPRETER_REPO=${OPTARG}
@@ -202,12 +205,12 @@ fi
 
 if [[ -n "${SPARK_SUBMIT}" ]]; then
     if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ 
"$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" != "false" ]];  then
-       INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class 
${ZEPPELIN_SERVER} --driver-class-path 
\"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" 
--driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} 
${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} 
${SPARK_APP_JAR} ${PORT}`
+       INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class 
${ZEPPELIN_SERVER} --driver-class-path 
\"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" 
--driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} 
${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} 
${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
     else
-       INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class 
${ZEPPELIN_SERVER} --driver-class-path 
\"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" 
--driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} 
${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${PORT}`
+       INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class 
${ZEPPELIN_SERVER} --driver-class-path 
\"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" 
--driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} 
${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
     fi
 else
-    INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} 
${ZEPPELIN_INTP_MEM} -cp 
${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} 
${ZEPPELIN_SERVER} ${PORT} `
+    INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} 
${ZEPPELIN_INTP_MEM} -cp 
${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} 
${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} `
 fi
 
 if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z 
"${SPARK_SUBMIT}" ]]; then

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 3ec6e27..ce3ffaa 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -430,4 +430,10 @@
   <description>The HTTP X-XSS-Protection response header is a feature of 
Internet Explorer, Chrome and Safari that stops pages from loading when they 
detect reflected cross-site scripting (XSS) attacks. When value is set to 1 and 
a cross-site scripting attack is detected, the browser will sanitize the page 
(remove the unsafe parts).</description>
 </property>
 -->
+<!--
+<property>
+  <name>zeppelin.interpreter.callback.portRange</name>
+  <value>10000:10010</value>
+</property>
+-->
 </configuration>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java
----------------------------------------------------------------------
diff --git 
a/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java 
b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java
index 2484469..3a5199d 100644
--- a/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java
+++ b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java
@@ -38,8 +38,8 @@ public class ZeppelinDevServer extends
 
   private DevInterpreter interpreter = null;
   private InterpreterOutput out;
-  public ZeppelinDevServer(int port) throws TException {
-    super(port);
+  public ZeppelinDevServer(int port) throws TException, IOException {
+    super(null, port);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 6925360..7f476e8 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -17,29 +17,55 @@
 
 package org.apache.zeppelin.interpreter.remote;
 
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import org.apache.thrift.TException;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zeppelin.dep.DependencyResolver;
-import org.apache.zeppelin.display.*;
-import org.apache.zeppelin.helium.*;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.helium.Application;
+import org.apache.zeppelin.helium.ApplicationContext;
+import org.apache.zeppelin.helium.ApplicationException;
+import org.apache.zeppelin.helium.ApplicationLoader;
+import org.apache.zeppelin.helium.HeliumAppAngularObjectRegistry;
+import org.apache.zeppelin.helium.HeliumPackage;
+import org.apache.zeppelin.interpreter.Constants;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterHookListener;
+import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
+import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.*;
-import org.apache.zeppelin.resource.*;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.interpreter.RemoteWorksController;
+import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
+import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import 
org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
+import org.apache.zeppelin.resource.DistributedResourcePool;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.resource.ResourceSet;
+import org.apache.zeppelin.resource.WellKnownResourceName;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.JobListener;
@@ -49,8 +75,22 @@ import org.apache.zeppelin.user.AuthenticationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 /**
  * Entry point for Interpreter process.
@@ -70,6 +110,9 @@ public class RemoteInterpreterServer
   Gson gson = new Gson();
 
   RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
+  private String callbackHost;
+  private int callbackPort;
+  private String host;
   private int port;
   private TThreadPoolServer server;
 
@@ -87,11 +130,34 @@ public class RemoteInterpreterServer
   // Hold information for manual progress update
   private ConcurrentMap<String, Integer> progressMap = new 
ConcurrentHashMap<>();
 
-  public RemoteInterpreterServer(int port) throws TTransportException {
-    this.port = port;
+  private boolean isTest;
+
+  public RemoteInterpreterServer(String callbackHost, int port) throws 
IOException,
+      TTransportException {
+    this(callbackHost, port, false);
+  }
+
+  public RemoteInterpreterServer(String callbackHost, int port, boolean isTest)
+      throws TTransportException, IOException {
+    if (null != callbackHost) {
+      this.callbackHost = callbackHost;
+      this.callbackPort = port;
+    } else {
+      // DevInterpreter
+      this.port = port;
+    }
+    this.isTest = isTest;
 
     processor = new RemoteInterpreterService.Processor<>(this);
-    TServerSocket serverTransport = new TServerSocket(port);
+    TServerSocket serverTransport;
+    if (null == callbackHost) {
+      // Dev Interpreter
+      serverTransport = new TServerSocket(port);
+    } else {
+      this.port = 
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+      this.host = RemoteInterpreterUtils.findAvailableHostAddress();
+      serverTransport = new TServerSocket(this.port);
+    }
     server = new TThreadPoolServer(
         new TThreadPoolServer.Args(serverTransport).processor(processor));
     remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, 
Object>());
@@ -100,6 +166,36 @@ public class RemoteInterpreterServer
 
   @Override
   public void run() {
+    if (null != callbackHost && !isTest) {
+      new Thread(new Runnable() {
+        boolean interrupted = false;
+        @Override
+        public void run() {
+          while (!interrupted && !server.isServing()) {
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException e) {
+              interrupted = true;
+            }
+          }
+
+          if (!interrupted) {
+            CallbackInfo callbackInfo = new CallbackInfo(host, port);
+            try {
+              RemoteInterpreterUtils
+                  .registerInterpreter(callbackHost, callbackPort, 
callbackInfo);
+            } catch (TException e) {
+              logger.error("Error while registering interpreter: {}", 
callbackInfo, e);
+              try {
+                shutdown();
+              } catch (TException e1) {
+                logger.warn("Exception occurs while shutting down", e1);
+              }
+            }
+          }
+        }
+      }).start();
+    }
     logger.info("Starting remote interpreter server on port {}", port);
     server.serve();
   }
@@ -151,13 +247,15 @@ public class RemoteInterpreterServer
 
 
   public static void main(String[] args)
-      throws TTransportException, InterruptedException {
-
+      throws TTransportException, InterruptedException, IOException {
+    String callbackHost = null;
     int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
     if (args.length > 0) {
-      port = Integer.parseInt(args[0]);
+      callbackHost = args[0];
+      port = Integer.parseInt(args[1]);
     }
-    RemoteInterpreterServer remoteInterpreterServer = new 
RemoteInterpreterServer(port);
+    RemoteInterpreterServer remoteInterpreterServer =
+        new RemoteInterpreterServer(callbackHost, port);
     remoteInterpreterServer.start();
     remoteInterpreterServer.join();
     System.exit(0);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
index 4ee6690..835199a 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
@@ -17,27 +17,96 @@
 
 package org.apache.zeppelin.interpreter.remote;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.net.ConnectException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.InterfaceAddress;
+import java.net.NetworkInterface;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterCallbackService;
 
 /**
  *
  */
 public class RemoteInterpreterUtils {
   static Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterUtils.class);
+
+
   public static int findRandomAvailablePortOnAllLocalInterfaces() throws 
IOException {
-    int port;
-    try (ServerSocket socket = new ServerSocket(0);) {
-      port = socket.getLocalPort();
-      socket.close();
+    return findRandomAvailablePortOnAllLocalInterfaces(":");
+  }
+
+  /**
+   * start:end
+   *
+   * @param portRange
+   * @return
+   * @throws IOException
+   */
+  public static int findRandomAvailablePortOnAllLocalInterfaces(String 
portRange)
+      throws IOException {
+
+    // ':' is the default value which means no constraints on the portRange
+    if (portRange == null || portRange.equals(":")) {
+      int port;
+      try (ServerSocket socket = new ServerSocket(0);) {
+        port = socket.getLocalPort();
+        socket.close();
+      }
+      return port;
     }
-    return port;
+    // valid user registered port https://en.wikipedia.org/wiki/Registered_port
+    int start = 1024;
+    int end = 49151;
+    String[] ports = portRange.split(":", -1);
+    if (!ports[0].isEmpty()) {
+      start = Integer.parseInt(ports[0]);
+    }
+    if (!ports[1].isEmpty()) {
+      end = Integer.parseInt(ports[1]);
+    }
+    for (int i = start; i <= end; ++i) {
+      try {
+        ServerSocket socket = new ServerSocket(i);
+        return socket.getLocalPort();
+      } catch (Exception e) {
+        // ignore this
+      }
+    }
+    throw new IOException("No available port in the portRange: " + portRange);
+  }
+
+  public static String findAvailableHostAddress() throws UnknownHostException, 
SocketException {
+    InetAddress address = InetAddress.getLocalHost();
+    if (address.isLoopbackAddress()) {
+      for (NetworkInterface networkInterface : Collections
+          .list(NetworkInterface.getNetworkInterfaces())) {
+        if (!networkInterface.isLoopback()) {
+          for (InterfaceAddress interfaceAddress : 
networkInterface.getInterfaceAddresses()) {
+            InetAddress a = interfaceAddress.getAddress();
+            if (a instanceof Inet4Address) {
+              return a.getHostAddress();
+            }
+          }
+        }
+      }
+    }
+    return address.getHostAddress();
   }
 
   public static boolean checkIfRemoteEndpointAccessible(String host, int port) 
{
@@ -80,4 +149,17 @@ public class RemoteInterpreterUtils {
 
     return key.matches("^[A-Z_0-9]*");
   }
+
+  public static void registerInterpreter(String callbackHost, int callbackPort,
+      final CallbackInfo callbackInfo) throws TException {
+    LOGGER.info("callbackHost: {}, callbackPort: {}, callbackInfo: {}", 
callbackHost, callbackPort,
+        callbackInfo);
+    try (TTransport transport = new TSocket(callbackHost, callbackPort)) {
+      transport.open();
+      TProtocol protocol = new TBinaryProtocol(transport);
+      RemoteInterpreterCallbackService.Client client = new 
RemoteInterpreterCallbackService.Client(
+          protocol);
+      client.callback(callbackInfo);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/CallbackInfo.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/CallbackInfo.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/CallbackInfo.java
new file mode 100644
index 0000000..b0c7e9a
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/CallbackInfo.java
@@ -0,0 +1,518 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.zeppelin.interpreter.thrift;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = 
"2017-6-17")
+public class CallbackInfo implements org.apache.thrift.TBase<CallbackInfo, 
CallbackInfo._Fields>, java.io.Serializable, Cloneable, 
Comparable<CallbackInfo> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new 
org.apache.thrift.protocol.TStruct("CallbackInfo");
+
+  private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new 
org.apache.thrift.protocol.TField("host", 
org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField PORT_FIELD_DESC = new 
org.apache.thrift.protocol.TField("port", org.apache.thrift.protocol.TType.I32, 
(short)2);
+
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = 
new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new CallbackInfoStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new CallbackInfoTupleSchemeFactory());
+  }
+
+  public String host; // required
+  public int port; // required
+
+  /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    HOST((short)1, "host"),
+    PORT((short)2, "port");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not 
found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // HOST
+          return HOST;
+        case 2: // PORT
+          return PORT;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + 
fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  private static final int __PORT_ISSET_ID = 0;
+  private byte __isset_bitfield = 0;
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> 
metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new 
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.HOST, new 
org.apache.thrift.meta_data.FieldMetaData("host", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.PORT, new 
org.apache.thrift.meta_data.FieldMetaData("port", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CallbackInfo.class,
 metaDataMap);
+  }
+
+  public CallbackInfo() {
+  }
+
+  public CallbackInfo(
+    String host,
+    int port)
+  {
+    this();
+    this.host = host;
+    this.port = port;
+    setPortIsSet(true);
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public CallbackInfo(CallbackInfo other) {
+    __isset_bitfield = other.__isset_bitfield;
+    if (other.isSetHost()) {
+      this.host = other.host;
+    }
+    this.port = other.port;
+  }
+
+  public CallbackInfo deepCopy() {
+    return new CallbackInfo(this);
+  }
+
+  @Override
+  public void clear() {
+    this.host = null;
+    setPortIsSet(false);
+    this.port = 0;
+  }
+
+  public String getHost() {
+    return this.host;
+  }
+
+  public CallbackInfo setHost(String host) {
+    this.host = host;
+    return this;
+  }
+
+  public void unsetHost() {
+    this.host = null;
+  }
+
+  /** Returns true if field host is set (has been assigned a value) and false 
otherwise */
+  public boolean isSetHost() {
+    return this.host != null;
+  }
+
+  public void setHostIsSet(boolean value) {
+    if (!value) {
+      this.host = null;
+    }
+  }
+
+  public int getPort() {
+    return this.port;
+  }
+
+  public CallbackInfo setPort(int port) {
+    this.port = port;
+    setPortIsSet(true);
+    return this;
+  }
+
+  public void unsetPort() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, 
__PORT_ISSET_ID);
+  }
+
+  /** Returns true if field port is set (has been assigned a value) and false 
otherwise */
+  public boolean isSetPort() {
+    return EncodingUtils.testBit(__isset_bitfield, __PORT_ISSET_ID);
+  }
+
+  public void setPortIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __PORT_ISSET_ID, 
value);
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case HOST:
+      if (value == null) {
+        unsetHost();
+      } else {
+        setHost((String)value);
+      }
+      break;
+
+    case PORT:
+      if (value == null) {
+        unsetPort();
+      } else {
+        setPort((Integer)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case HOST:
+      return getHost();
+
+    case PORT:
+      return Integer.valueOf(getPort());
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned 
a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case HOST:
+      return isSetHost();
+    case PORT:
+      return isSetPort();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof CallbackInfo)
+      return this.equals((CallbackInfo)that);
+    return false;
+  }
+
+  public boolean equals(CallbackInfo that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_host = true && this.isSetHost();
+    boolean that_present_host = true && that.isSetHost();
+    if (this_present_host || that_present_host) {
+      if (!(this_present_host && that_present_host))
+        return false;
+      if (!this.host.equals(that.host))
+        return false;
+    }
+
+    boolean this_present_port = true;
+    boolean that_present_port = true;
+    if (this_present_port || that_present_port) {
+      if (!(this_present_port && that_present_port))
+        return false;
+      if (this.port != that.port)
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    List<Object> list = new ArrayList<Object>();
+
+    boolean present_host = true && (isSetHost());
+    list.add(present_host);
+    if (present_host)
+      list.add(host);
+
+    boolean present_port = true;
+    list.add(present_port);
+    if (present_port)
+      list.add(port);
+
+    return list.hashCode();
+  }
+
+  @Override
+  public int compareTo(CallbackInfo other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = Boolean.valueOf(isSetHost()).compareTo(other.isSetHost());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetHost()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.host, 
other.host);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetPort()).compareTo(other.isSetPort());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetPort()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.port, 
other.port);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws 
org.apache.thrift.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws 
org.apache.thrift.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("CallbackInfo(");
+    boolean first = true;
+
+    sb.append("host:");
+    if (this.host == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.host);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("port:");
+    sb.append(this.port);
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws 
java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws 
java.io.IOException, ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java 
serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
+      read(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class CallbackInfoStandardSchemeFactory implements 
SchemeFactory {
+    public CallbackInfoStandardScheme getScheme() {
+      return new CallbackInfoStandardScheme();
+    }
+  }
+
+  private static class CallbackInfoStandardScheme extends 
StandardScheme<CallbackInfo> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, CallbackInfo 
struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // HOST
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.host = iprot.readString();
+              struct.setHostIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          case 2: // PORT
+            if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+              struct.port = iprot.readI32();
+              struct.setPortIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked 
in the validate method
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, CallbackInfo 
struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.host != null) {
+        oprot.writeFieldBegin(HOST_FIELD_DESC);
+        oprot.writeString(struct.host);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldBegin(PORT_FIELD_DESC);
+      oprot.writeI32(struct.port);
+      oprot.writeFieldEnd();
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class CallbackInfoTupleSchemeFactory implements SchemeFactory 
{
+    public CallbackInfoTupleScheme getScheme() {
+      return new CallbackInfoTupleScheme();
+    }
+  }
+
+  private static class CallbackInfoTupleScheme extends 
TupleScheme<CallbackInfo> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, CallbackInfo 
struct) throws org.apache.thrift.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      BitSet optionals = new BitSet();
+      if (struct.isSetHost()) {
+        optionals.set(0);
+      }
+      if (struct.isSetPort()) {
+        optionals.set(1);
+      }
+      oprot.writeBitSet(optionals, 2);
+      if (struct.isSetHost()) {
+        oprot.writeString(struct.host);
+      }
+      if (struct.isSetPort()) {
+        oprot.writeI32(struct.port);
+      }
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, CallbackInfo 
struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      BitSet incoming = iprot.readBitSet(2);
+      if (incoming.get(0)) {
+        struct.host = iprot.readString();
+        struct.setHostIsSet(true);
+      }
+      if (incoming.get(1)) {
+        struct.port = iprot.readI32();
+        struct.setPortIsSet(true);
+      }
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterCallbackService.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterCallbackService.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterCallbackService.java
new file mode 100644
index 0000000..6ef08f6
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterCallbackService.java
@@ -0,0 +1,879 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.zeppelin.interpreter.thrift;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = 
"2017-6-17")
+public class RemoteInterpreterCallbackService {
+
+  public interface Iface {
+
+    public void callback(CallbackInfo callbackInfo) throws 
org.apache.thrift.TException;
+
+  }
+
+  public interface AsyncIface {
+
+    public void callback(CallbackInfo callbackInfo, 
org.apache.thrift.async.AsyncMethodCallback resultHandler) throws 
org.apache.thrift.TException;
+
+  }
+
+  public static class Client extends org.apache.thrift.TServiceClient 
implements Iface {
+    public static class Factory implements 
org.apache.thrift.TServiceClientFactory<Client> {
+      public Factory() {}
+      public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
+        return new Client(prot);
+      }
+      public Client getClient(org.apache.thrift.protocol.TProtocol iprot, 
org.apache.thrift.protocol.TProtocol oprot) {
+        return new Client(iprot, oprot);
+      }
+    }
+
+    public Client(org.apache.thrift.protocol.TProtocol prot)
+    {
+      super(prot, prot);
+    }
+
+    public Client(org.apache.thrift.protocol.TProtocol iprot, 
org.apache.thrift.protocol.TProtocol oprot) {
+      super(iprot, oprot);
+    }
+
+    public void callback(CallbackInfo callbackInfo) throws 
org.apache.thrift.TException
+    {
+      send_callback(callbackInfo);
+      recv_callback();
+    }
+
+    public void send_callback(CallbackInfo callbackInfo) throws 
org.apache.thrift.TException
+    {
+      callback_args args = new callback_args();
+      args.setCallbackInfo(callbackInfo);
+      sendBase("callback", args);
+    }
+
+    public void recv_callback() throws org.apache.thrift.TException
+    {
+      callback_result result = new callback_result();
+      receiveBase(result, "callback");
+      return;
+    }
+
+  }
+  public static class AsyncClient extends org.apache.thrift.async.TAsyncClient 
implements AsyncIface {
+    public static class Factory implements 
org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
+      private org.apache.thrift.async.TAsyncClientManager clientManager;
+      private org.apache.thrift.protocol.TProtocolFactory protocolFactory;
+      public Factory(org.apache.thrift.async.TAsyncClientManager 
clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
+        this.clientManager = clientManager;
+        this.protocolFactory = protocolFactory;
+      }
+      public AsyncClient 
getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) {
+        return new AsyncClient(protocolFactory, clientManager, transport);
+      }
+    }
+
+    public AsyncClient(org.apache.thrift.protocol.TProtocolFactory 
protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, 
org.apache.thrift.transport.TNonblockingTransport transport) {
+      super(protocolFactory, clientManager, transport);
+    }
+
+    public void callback(CallbackInfo callbackInfo, 
org.apache.thrift.async.AsyncMethodCallback resultHandler) throws 
org.apache.thrift.TException {
+      checkReady();
+      callback_call method_call = new callback_call(callbackInfo, 
resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class callback_call extends 
org.apache.thrift.async.TAsyncMethodCall {
+      private CallbackInfo callbackInfo;
+      public callback_call(CallbackInfo callbackInfo, 
org.apache.thrift.async.AsyncMethodCallback resultHandler, 
org.apache.thrift.async.TAsyncClient client, 
org.apache.thrift.protocol.TProtocolFactory protocolFactory, 
org.apache.thrift.transport.TNonblockingTransport transport) throws 
org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+        this.callbackInfo = callbackInfo;
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws 
org.apache.thrift.TException {
+        prot.writeMessageBegin(new 
org.apache.thrift.protocol.TMessage("callback", 
org.apache.thrift.protocol.TMessageType.CALL, 0));
+        callback_args args = new callback_args();
+        args.setCallbackInfo(callbackInfo);
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public void getResult() throws org.apache.thrift.TException {
+        if (getState() != 
org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = 
new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = 
client.getProtocolFactory().getProtocol(memoryTransport);
+        (new Client(prot)).recv_callback();
+      }
+    }
+
+  }
+
+  public static class Processor<I extends Iface> extends 
org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Processor.class.getName());
+    public Processor(I iface) {
+      super(iface, getProcessMap(new HashMap<String, 
org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
+    }
+
+    protected Processor(I iface, Map<String,  
org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> 
processMap) {
+      super(iface, getProcessMap(processMap));
+    }
+
+    private static <I extends Iface> Map<String,  
org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> 
getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  
org.apache.thrift.TBase>> processMap) {
+      processMap.put("callback", new callback());
+      return processMap;
+    }
+
+    public static class callback<I extends Iface> extends 
org.apache.thrift.ProcessFunction<I, callback_args> {
+      public callback() {
+        super("callback");
+      }
+
+      public callback_args getEmptyArgsInstance() {
+        return new callback_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public callback_result getResult(I iface, callback_args args) throws 
org.apache.thrift.TException {
+        callback_result result = new callback_result();
+        iface.callback(args.callbackInfo);
+        return result;
+      }
+    }
+
+  }
+
+  public static class AsyncProcessor<I extends AsyncIface> extends 
org.apache.thrift.TBaseAsyncProcessor<I> {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AsyncProcessor.class.getName());
+    public AsyncProcessor(I iface) {
+      super(iface, getProcessMap(new HashMap<String, 
org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, 
?>>()));
+    }
+
+    protected AsyncProcessor(I iface, Map<String,  
org.apache.thrift.AsyncProcessFunction<I, ? extends  org.apache.thrift.TBase, 
?>> processMap) {
+      super(iface, getProcessMap(processMap));
+    }
+
+    private static <I extends AsyncIface> Map<String,  
org.apache.thrift.AsyncProcessFunction<I, ? extends  
org.apache.thrift.TBase,?>> getProcessMap(Map<String,  
org.apache.thrift.AsyncProcessFunction<I, ? extends  org.apache.thrift.TBase, 
?>> processMap) {
+      processMap.put("callback", new callback());
+      return processMap;
+    }
+
+    public static class callback<I extends AsyncIface> extends 
org.apache.thrift.AsyncProcessFunction<I, callback_args, Void> {
+      public callback() {
+        super("callback");
+      }
+
+      public callback_args getEmptyArgsInstance() {
+        return new callback_args();
+      }
+
+      public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer 
fb, final int seqid) {
+        final org.apache.thrift.AsyncProcessFunction fcall = this;
+        return new AsyncMethodCallback<Void>() { 
+          public void onComplete(Void o) {
+            callback_result result = new callback_result();
+            try {
+              fcall.sendResponse(fb,result, 
org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+              return;
+            } catch (Exception e) {
+              LOGGER.error("Exception writing to internal frame buffer", e);
+            }
+            fb.close();
+          }
+          public void onError(Exception e) {
+            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+            org.apache.thrift.TBase msg;
+            callback_result result = new callback_result();
+            {
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = (org.apache.thrift.TBase)new 
org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR,
 e.getMessage());
+            }
+            try {
+              fcall.sendResponse(fb,msg,msgType,seqid);
+              return;
+            } catch (Exception ex) {
+              LOGGER.error("Exception writing to internal frame buffer", ex);
+            }
+            fb.close();
+          }
+        };
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public void start(I iface, callback_args args, 
org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws 
TException {
+        iface.callback(args.callbackInfo,resultHandler);
+      }
+    }
+
+  }
+
+  public static class callback_args implements 
org.apache.thrift.TBase<callback_args, callback_args._Fields>, 
java.io.Serializable, Cloneable, Comparable<callback_args>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new 
org.apache.thrift.protocol.TStruct("callback_args");
+
+    private static final org.apache.thrift.protocol.TField 
CALLBACK_INFO_FIELD_DESC = new 
org.apache.thrift.protocol.TField("callbackInfo", 
org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes 
= new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new 
callback_argsStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new callback_argsTupleSchemeFactory());
+    }
+
+    public CallbackInfo callbackInfo; // required
+
+    /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      CALLBACK_INFO((short)1, "callbackInfo");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not 
found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // CALLBACK_INFO
+            return CALLBACK_INFO;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + 
fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final Map<_Fields, 
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new 
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.CALLBACK_INFO, new 
org.apache.thrift.meta_data.FieldMetaData("callbackInfo", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 CallbackInfo.class)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(callback_args.class,
 metaDataMap);
+    }
+
+    public callback_args() {
+    }
+
+    public callback_args(
+      CallbackInfo callbackInfo)
+    {
+      this();
+      this.callbackInfo = callbackInfo;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public callback_args(callback_args other) {
+      if (other.isSetCallbackInfo()) {
+        this.callbackInfo = new CallbackInfo(other.callbackInfo);
+      }
+    }
+
+    public callback_args deepCopy() {
+      return new callback_args(this);
+    }
+
+    @Override
+    public void clear() {
+      this.callbackInfo = null;
+    }
+
+    public CallbackInfo getCallbackInfo() {
+      return this.callbackInfo;
+    }
+
+    public callback_args setCallbackInfo(CallbackInfo callbackInfo) {
+      this.callbackInfo = callbackInfo;
+      return this;
+    }
+
+    public void unsetCallbackInfo() {
+      this.callbackInfo = null;
+    }
+
+    /** Returns true if field callbackInfo is set (has been assigned a value) 
and false otherwise */
+    public boolean isSetCallbackInfo() {
+      return this.callbackInfo != null;
+    }
+
+    public void setCallbackInfoIsSet(boolean value) {
+      if (!value) {
+        this.callbackInfo = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case CALLBACK_INFO:
+        if (value == null) {
+          unsetCallbackInfo();
+        } else {
+          setCallbackInfo((CallbackInfo)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case CALLBACK_INFO:
+        return getCallbackInfo();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been 
assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case CALLBACK_INFO:
+        return isSetCallbackInfo();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof callback_args)
+        return this.equals((callback_args)that);
+      return false;
+    }
+
+    public boolean equals(callback_args that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_callbackInfo = true && this.isSetCallbackInfo();
+      boolean that_present_callbackInfo = true && that.isSetCallbackInfo();
+      if (this_present_callbackInfo || that_present_callbackInfo) {
+        if (!(this_present_callbackInfo && that_present_callbackInfo))
+          return false;
+        if (!this.callbackInfo.equals(that.callbackInfo))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      List<Object> list = new ArrayList<Object>();
+
+      boolean present_callbackInfo = true && (isSetCallbackInfo());
+      list.add(present_callbackInfo);
+      if (present_callbackInfo)
+        list.add(callbackInfo);
+
+      return list.hashCode();
+    }
+
+    @Override
+    public int compareTo(callback_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = 
Boolean.valueOf(isSetCallbackInfo()).compareTo(other.isSetCallbackInfo());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetCallbackInfo()) {
+        lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(this.callbackInfo, other.callbackInfo);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws 
org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws 
org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("callback_args(");
+      boolean first = true;
+
+      sb.append("callbackInfo:");
+      if (this.callbackInfo == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.callbackInfo);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+      if (callbackInfo != null) {
+        callbackInfo.validate();
+      }
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws 
java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws 
java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class callback_argsStandardSchemeFactory implements 
SchemeFactory {
+      public callback_argsStandardScheme getScheme() {
+        return new callback_argsStandardScheme();
+      }
+    }
+
+    private static class callback_argsStandardScheme extends 
StandardScheme<callback_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, 
callback_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // CALLBACK_INFO
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) 
{
+                struct.callbackInfo = new CallbackInfo();
+                struct.callbackInfo.read(iprot);
+                struct.setCallbackInfoIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked 
in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, 
callback_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.callbackInfo != null) {
+          oprot.writeFieldBegin(CALLBACK_INFO_FIELD_DESC);
+          struct.callbackInfo.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class callback_argsTupleSchemeFactory implements 
SchemeFactory {
+      public callback_argsTupleScheme getScheme() {
+        return new callback_argsTupleScheme();
+      }
+    }
+
+    private static class callback_argsTupleScheme extends 
TupleScheme<callback_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, 
callback_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+        BitSet optionals = new BitSet();
+        if (struct.isSetCallbackInfo()) {
+          optionals.set(0);
+        }
+        oprot.writeBitSet(optionals, 1);
+        if (struct.isSetCallbackInfo()) {
+          struct.callbackInfo.write(oprot);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, 
callback_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(1);
+        if (incoming.get(0)) {
+          struct.callbackInfo = new CallbackInfo();
+          struct.callbackInfo.read(iprot);
+          struct.setCallbackInfoIsSet(true);
+        }
+      }
+    }
+
+  }
+
+  public static class callback_result implements 
org.apache.thrift.TBase<callback_result, callback_result._Fields>, 
java.io.Serializable, Cloneable, Comparable<callback_result>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new 
org.apache.thrift.protocol.TStruct("callback_result");
+
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes 
= new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new 
callback_resultStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new callback_resultTupleSchemeFactory());
+    }
+
+
+    /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+;
+
+      private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not 
found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + 
fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+    public static final Map<_Fields, 
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new 
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(callback_result.class,
 metaDataMap);
+    }
+
+    public callback_result() {
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public callback_result(callback_result other) {
+    }
+
+    public callback_result deepCopy() {
+      return new callback_result(this);
+    }
+
+    @Override
+    public void clear() {
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been 
assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof callback_result)
+        return this.equals((callback_result)that);
+      return false;
+    }
+
+    public boolean equals(callback_result that) {
+      if (that == null)
+        return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      List<Object> list = new ArrayList<Object>();
+
+      return list.hashCode();
+    }
+
+    @Override
+    public int compareTo(callback_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws 
org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws 
org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+      }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("callback_result(");
+      boolean first = true;
+
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws 
java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws 
java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class callback_resultStandardSchemeFactory implements 
SchemeFactory {
+      public callback_resultStandardScheme getScheme() {
+        return new callback_resultStandardScheme();
+      }
+    }
+
+    private static class callback_resultStandardScheme extends 
StandardScheme<callback_result> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, 
callback_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked 
in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, 
callback_result struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class callback_resultTupleSchemeFactory implements 
SchemeFactory {
+      public callback_resultTupleScheme getScheme() {
+        return new callback_resultTupleScheme();
+      }
+    }
+
+    private static class callback_resultTupleScheme extends 
TupleScheme<callback_result> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, 
callback_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, 
callback_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift 
b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index f2eb13f..f20fb90 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -88,6 +88,11 @@ struct InterpreterCompletion {
   3: string meta
 }
 
+struct CallbackInfo {
+  1: string host,
+  2: i32 port
+}
+
 service RemoteInterpreterService {
 
   void createInterpreter(1: string intpGroupId, 2: string sessionKey, 3: 
string className, 4: map<string, string> properties, 5: string userName);
@@ -131,3 +136,7 @@ service RemoteInterpreterService {
 
   void onReceivedZeppelinResource(1: string object);
 }
+
+service RemoteInterpreterCallbackService {
+  void callback(1: CallbackInfo callbackInfo);
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
index a4b3a25..1a7c2a5 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@@ -42,7 +42,7 @@ public class RemoteInterpreterServerTest {
 
   @Test
   public void testStartStop() throws InterruptedException, IOException, 
TException {
-    RemoteInterpreterServer server = new RemoteInterpreterServer(
+    RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
         RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
     assertEquals(false, server.isRunning());
 
@@ -90,8 +90,8 @@ public class RemoteInterpreterServerTest {
 
   @Test
   public void testStartStopWithQueuedEvents() throws InterruptedException, 
IOException, TException {
-    RemoteInterpreterServer server = new RemoteInterpreterServer(
-        RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
+    RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
+        RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), 
true);
     assertEquals(false, server.isRunning());
 
     server.start();

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
index 5f7426a..afbbcbd 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
@@ -28,6 +28,17 @@ public class RemoteInterpreterUtilsTest {
   @Test
   public void testFindRandomAvailablePortOnAllLocalInterfaces() throws 
IOException {
     
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() 
> 0);
+
+    String portRange = ":30000";
+    
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange)
 <= 30000);
+
+    portRange = "30000:";
+    
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange)
 >= 30000);
+
+    portRange = "30000:40000";
+    int port = 
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange);
+    assertTrue(port >= 30000 && port <= 40000);
   }
 
+
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 03cc069..ba90ed8 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -476,6 +476,10 @@ public class ZeppelinConfiguration extends 
XMLConfiguration {
     }
   }
 
+  public String getCallbackPortRange() {
+    return getString(ConfVars.ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE);
+  }
+
   public boolean isWindowsPath(String path){
     return path.matches("^[A-Za-z]:\\\\.*");
   }
@@ -684,7 +688,9 @@ public class ZeppelinConfiguration extends XMLConfiguration 
{
     ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1"),
 
     ZEPPELIN_HDFS_KEYTAB("zeppelin.hdfs.keytab", ""),
-    ZEPPELIN_HDFS_PRINCIPAL("zeppelin.hdfs.principal", "");
+    ZEPPELIN_HDFS_PRINCIPAL("zeppelin.hdfs.principal", ""),
+
+    
ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange",
 ":");
 
     private String varName;
     @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 9f4cfd4..79618a3 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -705,7 +705,8 @@ public class InterpreterSetting {
       // create new remote process
       remoteInterpreterProcess = new RemoteInterpreterManagedProcess(
           interpreterRunner != null ? interpreterRunner.getPath() :
-              conf.getInterpreterRemoteRunnerPath(), interpreterDir, 
localRepoPath,
+              conf.getInterpreterRemoteRunnerPath(), 
conf.getCallbackPortRange(),
+          interpreterDir, localRepoPath,
           getEnvFromInterpreterProperty(getJavaProperties()), connectTimeout,
           remoteInterpreterProcessListener, appEventListener, group);
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 19356fb..2d64831 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -17,10 +17,23 @@
 
 package org.apache.zeppelin.interpreter.remote;
 
-import org.apache.commons.exec.*;
+import org.apache.commons.exec.CommandLine;
+import org.apache.commons.exec.DefaultExecutor;
+import org.apache.commons.exec.ExecuteException;
+import org.apache.commons.exec.ExecuteResultHandler;
+import org.apache.commons.exec.ExecuteWatchdog;
+import org.apache.commons.exec.LogOutputStream;
+import org.apache.commons.exec.PumpStreamHandler;
 import org.apache.commons.exec.environment.EnvironmentUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
 import org.apache.zeppelin.helium.ApplicationEventListener;
 import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterCallbackService;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,6 +42,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This class manages start / stop of remote interpreter process
@@ -37,11 +51,14 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess
     implements ExecuteResultHandler {
   private static final Logger logger = LoggerFactory.getLogger(
       RemoteInterpreterManagedProcess.class);
-  private final String interpreterRunner;
 
+  private final String interpreterRunner;
+  private final String portRange;
   private DefaultExecutor executor;
   private ExecuteWatchdog watchdog;
-  boolean running = false;
+  private AtomicBoolean running = new AtomicBoolean(false);
+  TServer callbackServer;
+  private String host = null;
   private int port = -1;
   private final String interpreterDir;
   private final String localRepoDir;
@@ -51,6 +68,7 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess
 
   public RemoteInterpreterManagedProcess(
       String intpRunner,
+      String portRange,
       String intpDir,
       String localRepoDir,
       Map<String, String> env,
@@ -61,6 +79,7 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess
     super(new RemoteInterpreterEventPoller(listener, appListener),
         connectTimeout);
     this.interpreterRunner = intpRunner;
+    this.portRange = portRange;
     this.env = env;
     this.interpreterDir = intpDir;
     this.localRepoDir = localRepoDir;
@@ -77,6 +96,7 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess
     super(remoteInterpreterEventPoller,
         connectTimeout);
     this.interpreterRunner = intpRunner;
+    this.portRange = ":";
     this.env = env;
     this.interpreterDir = intpDir;
     this.localRepoDir = localRepoDir;
@@ -96,18 +116,69 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess
   @Override
   public void start(String userName, Boolean isUserImpersonate) {
     // start server process
+    final String callbackHost;
+    final int callbackPort;
     try {
-      port = 
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+      port = 
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange);
       logger.info("Choose port {} for RemoteInterpreterProcess", port);
+      callbackHost = RemoteInterpreterUtils.findAvailableHostAddress();
+      callbackPort = 
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
     } catch (IOException e1) {
       throw new InterpreterException(e1);
     }
 
+    logger.info("Thrift server for callback will start. Port: {}", 
callbackPort);
+    try {
+      callbackServer = new TThreadPoolServer(
+        new TThreadPoolServer.Args(new TServerSocket(callbackPort)).processor(
+          new RemoteInterpreterCallbackService.Processor<>(
+            new RemoteInterpreterCallbackService.Iface() {
+              @Override
+              public void callback(CallbackInfo callbackInfo) throws 
TException {
+                logger.info("Registered: {}", callbackInfo);
+                host = callbackInfo.getHost();
+                port = callbackInfo.getPort();
+                running.set(true);
+                synchronized (running) {
+                  running.notify();
+                }
+              }
+            })));
+      // Start thrift server to receive callbackInfo from 
RemoteInterpreterServer;
+      new Thread(new Runnable() {
+        @Override
+        public void run() {
+          callbackServer.serve();
+        }
+      }).start();
+
+      Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+        @Override
+        public void run() {
+          if (callbackServer.isServing()) {
+            callbackServer.stop();
+          }
+        }
+      }));
+
+      while (!callbackServer.isServing()) {
+        logger.debug("callbackServer is not serving");
+        Thread.sleep(500);
+      }
+      logger.debug("callbackServer is serving now");
+    } catch (TTransportException e) {
+      logger.error("callback server error.", e);
+    } catch (InterruptedException e) {
+      logger.warn("", e);
+    }
+
     CommandLine cmdLine = CommandLine.parse(interpreterRunner);
     cmdLine.addArgument("-d", false);
     cmdLine.addArgument(interpreterDir, false);
+    cmdLine.addArgument("-c", false);
+    cmdLine.addArgument(callbackHost, false);
     cmdLine.addArgument("-p", false);
-    cmdLine.addArgument(Integer.toString(port), false);
+    cmdLine.addArgument(Integer.toString(callbackPort), false);
     if (isUserImpersonate && !userName.equals("anonymous")) {
       cmdLine.addArgument("-u", false);
       cmdLine.addArgument(userName, false);
@@ -133,45 +204,31 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess
 
       logger.info("Run interpreter process {}", cmdLine);
       executor.execute(cmdLine, procEnv, this);
-      running = true;
     } catch (IOException e) {
-      running = false;
+      running.set(false);
       throw new InterpreterException(e);
     }
 
-
-    long startTime = System.currentTimeMillis();
-    while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
-      if (!running) {
-        try {
-          cmdOut.flush();
-        } catch (IOException e) {
-          // nothing to do
+    try {
+      synchronized (running) {
+        if (!running.get()) {
+          running.wait(getConnectTimeout() * 2);
         }
-        throw new InterpreterException(new String(cmdOut.toByteArray()));
       }
-
-      try {
-        if 
(RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
-          break;
-        } else {
-          try {
-            Thread.sleep(500);
-          } catch (InterruptedException e) {
-            logger.error("Exception in RemoteInterpreterProcess while 
synchronized reference " +
-                    "Thread.sleep", e);
-          }
-        }
-      } catch (Exception e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Remote interpreter not yet accessible at localhost:" + 
port);
-        }
+      if (!running.get()) {
+        callbackServer.stop();
+        throw new InterpreterException("Cannot run interpreter");
       }
+    } catch (InterruptedException e) {
+      logger.error("Remote interpreter is not accessible");
     }
     processOutput.setOutputStream(null);
   }
 
   public void stop() {
+    if (callbackServer.isServing()) {
+      callbackServer.stop();
+    }
     if (isRunning()) {
       logger.info("kill interpreter process");
       try {
@@ -190,25 +247,25 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess
 
     executor = null;
     watchdog = null;
-    running = false;
+    running.set(false);
     logger.info("Remote process terminated");
   }
 
   @Override
   public void onProcessComplete(int exitValue) {
     logger.info("Interpreter process exited {}", exitValue);
-    running = false;
+    running.set(false);
 
   }
 
   @Override
   public void onProcessFailed(ExecuteException e) {
     logger.info("Interpreter process failed {}", e);
-    running = false;
+    running.set(false);
   }
 
   public boolean isRunning() {
-    return running;
+    return running.get();
   }
 
   private static class ProcessLogOutputStream extends LogOutputStream {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index f4b8c32..f044fbd 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -357,16 +357,16 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
     config.put("cron", "* * * * * ?");
     note.setConfig(config);
     notebook.refreshCron(note.getId());
-    Thread.sleep(1 * 1000);
+    Thread.sleep(2 * 1000);
 
     // remove cron scheduler.
     config.put("cron", null);
     note.setConfig(config);
     notebook.refreshCron(note.getId());
-    Thread.sleep(1000);
+    Thread.sleep(2 * 1000);
     dateFinished = p.getDateFinished();
     assertNotNull(dateFinished);
-    Thread.sleep(1 * 1000);
+    Thread.sleep(2 * 1000);
     assertEquals(dateFinished, p.getDateFinished());
     notebook.removeNote(note.getId(), anonymous);
   }

Reply via email to