http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java 
b/rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java
new file mode 100644
index 0000000..c25e98f
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc;
+
+import org.apache.livy.Job;
+import org.apache.livy.rsc.rpc.RpcDispatcher;
+
+/**
+ * Abstract {@link RpcDispatcher} that declares, as nested classes, a set of message types.
+ * It adds no fields or methods of its own.
+ *
+ * Most message types pair a full constructor with a no-arg constructor that fills the fields
+ * with placeholder values (null, false or -1).
+ * NOTE(review): the no-arg constructors are presumably required by the RPC serializer - confirm.
+ */
+public abstract class BaseProtocol extends RpcDispatcher {
+
+  /** Carries a job id; per the class name, a request to cancel that job. */
+  protected static class CancelJob {
+
+    public final String id;
+
+    CancelJob(String id) {
+      this.id = id;
+    }
+
+    CancelJob() {
+      this(null);
+    }
+
+  }
+
+  /** Message without any payload. */
+  protected static class EndSession {
+
+  }
+
+  /**
+   * Carries a throwable rendered to text with {@code Utils.stackTraceAsString}; the text is
+   * the empty string when no throwable is given.
+   */
+  protected static class Error {
+
+    public final String cause;
+
+    public Error(Throwable cause) {
+      if (cause == null) {
+        this.cause = "";
+      } else {
+        this.cause = Utils.stackTraceAsString(cause);
+      }
+    }
+
+    public Error() {
+      this(null);
+    }
+
+  }
+
+  /** Carries a job id, the job as a byte array, and a {@code synchronous} flag. */
+  public static class BypassJobRequest {
+
+    public final String id;
+    public final byte[] serializedJob;
+    public final boolean synchronous;
+
+    public BypassJobRequest(String id, byte[] serializedJob, boolean 
synchronous) {
+      this.id = id;
+      this.serializedJob = serializedJob;
+      this.synchronous = synchronous;
+    }
+
+    public BypassJobRequest() {
+      this(null, null, false);
+    }
+
+  }
+
+  /** Carries a job id; per the class name, a status query for a bypass job. */
+  protected static class GetBypassJobStatus {
+
+    public final String id;
+
+    public GetBypassJobStatus(String id) {
+      this.id = id;
+    }
+
+    public GetBypassJobStatus() {
+      this(null);
+    }
+
+  }
+
+  /** Carries a job id together with the {@link Job} instance itself. */
+  protected static class JobRequest<T> {
+
+    public final String id;
+    public final Job<T> job;
+
+    public JobRequest(String id, Job<T> job) {
+      this.id = id;
+      this.job = job;
+    }
+
+    public JobRequest() {
+      this(null, null);
+    }
+
+  }
+
+  /**
+   * Carries a job id, a result value, and an error rendered to text with
+   * {@code Utils.stackTraceAsString}; {@code error} is null when no throwable is given.
+   */
+  protected static class JobResult<T> {
+
+    public final String id;
+    public final T result;
+    public final String error;
+
+    public JobResult(String id, T result, Throwable error) {
+      this.id = id;
+      this.result = result;
+      this.error = error != null ? Utils.stackTraceAsString(error) : null;
+    }
+
+    public JobResult() {
+      this(null, null, null);
+    }
+
+  }
+
+  /** Carries a job id; per the class name, a notification that the job started. */
+  protected static class JobStarted {
+
+    public final String id;
+
+    public JobStarted(String id) {
+      this.id = id;
+    }
+
+    public JobStarted() {
+      this(null);
+    }
+
+  }
+
+  /** Carries only a {@link Job}; unlike {@link JobRequest} it has no id field. */
+  protected static class SyncJobRequest<T> {
+
+    public final Job<T> job;
+
+    public SyncJobRequest(Job<T> job) {
+      this.job = job;
+    }
+
+    public SyncJobRequest() {
+      this(null);
+    }
+
+  }
+
+  /** Carries a host name and a port; the no-arg constructor uses a null host and port -1. */
+  public static class RemoteDriverAddress {
+
+    public final String host;
+    public final int port;
+
+    public RemoteDriverAddress(String host, int port) {
+      this.host = host;
+      this.port = port;
+    }
+
+    public RemoteDriverAddress() {
+      this(null, -1);
+    }
+
+  }
+
+  /** Carries a single string of code. */
+  public static class ReplJobRequest {
+
+    public final String code;
+
+    public ReplJobRequest(String code) {
+      this.code = code;
+    }
+
+    public ReplJobRequest() {
+      this(null);
+    }
+  }
+
+  /**
+   * Request that either asks for everything ({@code allResults == true}, null bounds) or passes
+   * a {@code from}/{@code size} pair ({@code allResults == false}).
+   * NOTE(review): {@code from}/{@code size} look like an offset and a count - confirm with the
+   * receiving side. Unlike the other messages, these fields are not final.
+   */
+  public static class GetReplJobResults {
+    public boolean allResults;
+    public Integer from, size;
+
+    public GetReplJobResults(Integer from, Integer size) {
+      this.allResults = false;
+      this.from = from;
+      this.size = size;
+    }
+
+    public GetReplJobResults() {
+      this.allResults = true;
+      from = null;
+      size = null;
+    }
+  }
+
+  /** Carries a state name as a string. */
+  protected static class ReplState {
+
+    public final String state;
+
+    public ReplState(String state) {
+      this.state = state;
+    }
+
+    public ReplState() {
+      this(null);
+    }
+  }
+
+  /** Carries an integer id; the no-arg constructor uses -1. */
+  public static class CancelReplJobRequest {
+    public final int id;
+
+    public CancelReplJobRequest(int id) {
+      this.id = id;
+    }
+
+    public CancelReplJobRequest() {
+      this(-1);
+    }
+  }
+
+  /** Carries a stack trace that is already in string form. */
+  public static class InitializationError {
+
+    public final String stackTrace;
+
+    public InitializationError(String stackTrace) {
+      this.stackTrace = stackTrace;
+    }
+
+    public InitializationError() {
+      this(null);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/BypassJobStatus.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/BypassJobStatus.java 
b/rsc/src/main/java/org/apache/livy/rsc/BypassJobStatus.java
new file mode 100644
index 0000000..f62c14d
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/BypassJobStatus.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc;
+
+import org.apache.livy.JobHandle;
+
+/**
+ * Immutable holder for the state, result bytes and error text of a job.
+ */
+public class BypassJobStatus {
+
+  /** State of the job. */
+  public final JobHandle.State state;
+  /** Result as raw bytes. */
+  public final byte[] result;
+  /** Error text. */
+  public final String error;
+
+  /**
+   * Stores the three values as given; the byte array is kept by reference, not copied.
+   */
+  public BypassJobStatus(JobHandle.State state, byte[] result, String error) {
+    this.error = error;
+    this.result = result;
+    this.state = state;
+  }
+
+  /** Package-private no-arg constructor; leaves every field null. */
+  BypassJobStatus() {
+    this(null, null, null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/ContextInfo.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextInfo.java 
b/rsc/src/main/java/org/apache/livy/rsc/ContextInfo.java
new file mode 100644
index 0000000..96f69a4
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/ContextInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc;
+
+/**
+ * Connection details of a running RSC instance: where it listens and the client id / secret
+ * pair associated with it.
+ */
+class ContextInfo {
+
+  final String remoteAddress;
+  final int remotePort;
+  final String clientId;
+  final String secret;
+
+  /** Stores the four values unchanged. */
+  ContextInfo(
+      String remoteAddress,
+      int remotePort,
+      String clientId,
+      String secret) {
+    this.secret = secret;
+    this.clientId = clientId;
+    this.remotePort = remotePort;
+    this.remoteAddress = remoteAddress;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java 
b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
new file mode 100644
index 0000000..8f46c1e
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.attribute.PosixFilePermission.*;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.Promise;
+import org.apache.spark.launcher.SparkLauncher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.livy.client.common.TestUtils;
+import org.apache.livy.rsc.driver.RSCDriverBootstrapper;
+import org.apache.livy.rsc.rpc.Rpc;
+import org.apache.livy.rsc.rpc.RpcDispatcher;
+import org.apache.livy.rsc.rpc.RpcServer;
+
+import static org.apache.livy.rsc.RSCConf.Entry.*;
+
+/**
+ * Encapsulates code needed to launch a new Spark context and collect information about how
+ * to establish a client connection to it.
+ */
+class ContextLauncher {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ContextLauncher.class);
+  private static final AtomicInteger CHILD_IDS = new AtomicInteger();
+
+  private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
+  private static final String SPARK_JARS_KEY = "spark.jars";
+  private static final String SPARK_ARCHIVES_KEY = "spark.yarn.dist.archives";
+  private static final String SPARK_HOME_ENV = "SPARK_HOME";
+
+  /**
+   * Builds a launcher for the given factory and configuration and hands back its promise
+   * together with the launched {@link Process}.
+   *
+   * The process is null when the child was started from a Runnable rather than a Process.
+   */
+  static DriverProcessInfo create(RSCClientFactory factory, RSCConf conf)
+      throws IOException {
+    final ContextLauncher created = new ContextLauncher(factory, conf);
+    final Process driverProcess = created.child.child;
+    return new DriverProcessInfo(created.promise, driverProcess);
+  }
+
+  private final Promise<ContextInfo> promise;
+  private final ScheduledFuture<?> timeout;
+  private final String clientId;
+  private final String secret;
+  private final ChildProcess child;
+  private final RSCConf conf;
+  private final RSCClientFactory factory;
+
+  /**
+   * Registers this launcher with the factory's RPC server, publishes the connection settings
+   * into {@code conf}, starts the driver and schedules the handshake timeout.
+   *
+   * If any step inside the try block throws, {@code dispose(true)} runs and the error is
+   * rethrown through {@code Utils.propagate}.
+   *
+   * @param factory provides the RPC server used for registration, secrets and scheduling.
+   * @param conf configuration; modified in place (launcher address/port, client id, secret).
+   */
+  private ContextLauncher(RSCClientFactory factory, RSCConf conf) throws 
IOException {
+    this.promise = factory.getServer().getEventLoopGroup().next().newPromise();
+    this.clientId = UUID.randomUUID().toString();
+    this.secret = factory.getServer().createSecret();
+    this.conf = conf;
+    this.factory = factory;
+
+    final RegistrationHandler handler = new RegistrationHandler();
+    try {
+      factory.getServer().registerClient(clientId, secret, handler);
+      // NOTE(review): replMode and repl are computed but never used in this constructor.
+      String replMode = conf.get("repl");
+      boolean repl = replMode != null && replMode.equals("true");
+
+      // These entries end up in the properties file written by startDriver().
+      conf.set(LAUNCHER_ADDRESS, factory.getServer().getAddress());
+      conf.set(LAUNCHER_PORT, factory.getServer().getPort());
+      conf.set(CLIENT_ID, clientId);
+      conf.set(CLIENT_SECRET, secret);
+
+      // Registered before the driver is started, so `child` can still be null when it fires.
+      Utils.addListener(promise, new FutureListener<ContextInfo>() {
+        @Override
+        public void onFailure(Throwable error) throws Exception {
+          // If promise is cancelled or failed, make sure spark-submit is not leaked.
+          if (child != null) {
+            child.kill();
+          }
+        }
+      });
+
+      this.child = startDriver(conf, promise);
+
+      // Set up a timeout to fail the promise if we don't hear back from the context
+      // after a configurable timeout.
+      Runnable timeoutTask = new Runnable() {
+        @Override
+        public void run() {
+          connectTimeout(handler);
+        }
+      };
+      // NOTE(review): `timeout` is only assigned here, after the driver has been started;
+      // RegistrationHandler.handle() dereferences it - confirm it cannot run earlier.
+      this.timeout = 
factory.getServer().getEventLoopGroup().schedule(timeoutTask,
+        conf.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      // Unregister the client and kill whatever child was started before rethrowing.
+      dispose(true);
+      throw Utils.propagate(e);
+    }
+  }
+
+  /**
+   * Timeout callback: tries to fail the promise with a {@link TimeoutException}. The handler
+   * is disposed only when this call is the one that failed the promise; the launcher itself
+   * is always disposed with a forced kill.
+   */
+  private void connectTimeout(RegistrationHandler handler) {
+    TimeoutException cause = new TimeoutException("Timed out waiting for context to start.");
+    boolean failedByUs = promise.tryFailure(cause);
+    if (failedByUs) {
+      handler.dispose();
+    }
+    dispose(true);
+  }
+
+  /**
+   * Unregisters this launcher's client id from the RPC server, then either kills or detaches
+   * from the child (if one exists). The factory reference is released even if that throws.
+   *
+   * @param forceKill true to kill the child, false to only detach from it.
+   */
+  private void dispose(boolean forceKill) {
+    factory.getServer().unregisterClient(clientId);
+    try {
+      if (child == null) {
+        // Nothing was launched; the finally block still releases the factory.
+        return;
+      }
+      if (!forceKill) {
+        child.detach();
+      } else {
+        child.kill();
+      }
+    } finally {
+      factory.unref();
+    }
+  }
+
+  /**
+   * Finalizes the Spark configuration, writes it to a properties file and starts the driver.
+   *
+   * The driver is started in one of three ways: through the mock spark-submit process when one
+   * is set, in-process when {@code CLIENT_IN_PROCESS} is true, or through {@link SparkLauncher}
+   * otherwise.
+   *
+   * If starting the child throws, the properties file is deleted before the error is rethrown;
+   * the file contains the client secret and nothing else would clean it up at that point.
+   *
+   * @param conf configuration; modified in place.
+   * @param promise promise handed to the child so it can be failed if the child dies.
+   * @return the wrapper around the started child.
+   * @throws IOException if the jar directory cannot be listed, the configuration file cannot
+   *         be written, or the launch fails.
+   */
+  private static ChildProcess startDriver(final RSCConf conf, Promise<?> promise)
+      throws IOException {
+    String livyJars = conf.get(LIVY_JARS);
+    if (livyJars == null) {
+      livyJars = findLivyJars();
+    }
+    merge(conf, SPARK_JARS_KEY, livyJars, ",");
+
+    String kind = conf.get(SESSION_KIND);
+    if ("sparkr".equals(kind)) {
+      merge(conf, SPARK_ARCHIVES_KEY, conf.get(RSCConf.Entry.SPARKR_PACKAGE), ",");
+    } else if ("pyspark".equals(kind)) {
+      merge(conf, "spark.submit.pyFiles", conf.get(RSCConf.Entry.PYSPARK_ARCHIVES), ",");
+    }
+
+    // Disable multiple attempts since the RPC server doesn't yet support multiple
+    // connections for the same registered app.
+    conf.set("spark.yarn.maxAppAttempts", "1");
+
+    // Let the launcher go away when launcher in yarn cluster mode. This avoids keeping lots
+    // of "small" Java processes lingering on the Livy server node.
+    conf.set("spark.yarn.submit.waitAppCompletion", "false");
+
+    if (!conf.getBoolean(CLIENT_IN_PROCESS) &&
+        // For tests which doesn't shutdown RscDriver gracefully, JaCoCo exec isn't dumped
+        // properly. Disable JaCoCo for this case.
+        !conf.getBoolean(TEST_STUCK_END_SESSION)) {
+      // For testing; propagate jacoco settings so that we also do coverage analysis
+      // on the launched driver. We replace the name of the main file ("main.exec")
+      // so that we don't end up fighting with the main test launcher.
+      String jacocoArgs = TestUtils.getJacocoArgs();
+      if (jacocoArgs != null) {
+        merge(conf, SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, jacocoArgs, " ");
+      }
+    }
+
+    final File confFile = writeConfToFile(conf);
+
+    try {
+      if (ContextLauncher.mockSparkSubmit != null) {
+        LOG.warn("!!!! Using mock spark-submit. !!!!");
+        return new ChildProcess(conf, promise, ContextLauncher.mockSparkSubmit, confFile);
+      } else if (conf.getBoolean(CLIENT_IN_PROCESS)) {
+        // Mostly for testing things quickly. Do not do this in production.
+        LOG.warn("!!!! Running remote driver in-process. !!!!");
+        Runnable child = new Runnable() {
+          @Override
+          public void run() {
+            try {
+              RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() });
+            } catch (Exception e) {
+              throw Utils.propagate(e);
+            }
+          }
+        };
+        return new ChildProcess(conf, promise, child, confFile);
+      } else {
+        final SparkLauncher launcher = new SparkLauncher();
+
+        // Spark 1.x does not support specifying deploy mode in conf and needs special handling.
+        String deployMode = conf.get(SPARK_DEPLOY_MODE);
+        if (deployMode != null) {
+          launcher.setDeployMode(deployMode);
+        }
+
+        launcher.setSparkHome(System.getenv(SPARK_HOME_ENV));
+        launcher.setAppResource("spark-internal");
+        launcher.setPropertiesFile(confFile.getAbsolutePath());
+        launcher.setMainClass(RSCDriverBootstrapper.class.getName());
+
+        if (conf.get(PROXY_USER) != null) {
+          launcher.addSparkArg("--proxy-user", conf.get(PROXY_USER));
+        }
+
+        return new ChildProcess(conf, promise, launcher.launch(), confFile);
+      }
+    } catch (IOException | RuntimeException e) {
+      // No ChildProcess took ownership of the file, so nobody else will delete it.
+      confFile.delete();
+      throw e;
+    }
+  }
+
+  /**
+   * Builds the comma-separated jar list from LIVY_HOME, looking in "rsc-jars" first and in
+   * "rsc/target/jars" second.
+   *
+   * @throws IOException if the chosen directory cannot be listed.
+   */
+  private static String findLivyJars() throws IOException {
+    String livyHome = System.getenv("LIVY_HOME");
+    Utils.checkState(livyHome != null,
+      "Need one of LIVY_HOME or %s set.", LIVY_JARS.key());
+    File rscJars = new File(livyHome, "rsc-jars");
+    if (!rscJars.isDirectory()) {
+      rscJars = new File(livyHome, "rsc/target/jars");
+    }
+    Utils.checkState(rscJars.isDirectory(),
+      "Cannot find 'rsc-jars' or 'rsc/target/jars' directory under LIVY_HOME.");
+
+    // listFiles() returns null (not an empty array) when the directory cannot be read.
+    File[] files = rscJars.listFiles();
+    if (files == null) {
+      throw new IOException("Cannot list contents of " + rscJars.getAbsolutePath() + ".");
+    }
+    List<String> jars = new ArrayList<>(files.length);
+    for (File f : files) {
+      jars.add(f.getAbsolutePath());
+    }
+    return Utils.join(jars, ",");
+  }
+
+  /**
+   * Joins {@code livyConf} with the value currently stored under {@code key} (Livy's value
+   * first) using {@code sep}, and stores the result back under {@code key}.
+   * NOTE(review): relies on Utils.join to cope with a null current value - confirm.
+   */
+  private static void merge(RSCConf conf, String key, String livyConf, String sep) {
+    List<String> parts = Arrays.asList(livyConf, conf.get(key));
+    conf.set(key, Utils.join(parts, sep));
+  }
+
+  /**
+   * Write the configuration to a file readable only by the process's owner. Livy properties
+   * are written with an added prefix so that they can be loaded using SparkConf on the driver
+   * side.
+   *
+   * The default Spark configuration (from either SPARK_HOME or SPARK_CONF_DIR) is merged into
+   * the user configuration, so that defaults set by Livy's admin take effect when not
+   * overridden by the user.
+   *
+   * If restricting the permissions or writing the file fails, the temporary file is deleted
+   * before the error propagates.
+   *
+   * @param conf the configuration to serialize.
+   * @return the temporary properties file; the caller owns it and must delete it.
+   * @throws IOException if spark-defaults.conf cannot be read or the file cannot be written.
+   */
+  private static File writeConfToFile(RSCConf conf) throws IOException {
+    Properties confView = new Properties();
+    for (Map.Entry<String, String> e : conf) {
+      String key = e.getKey();
+      if (!key.startsWith(RSCConf.SPARK_CONF_PREFIX)) {
+        key = RSCConf.LIVY_SPARK_PREFIX + key;
+      }
+      confView.setProperty(key, e.getValue());
+    }
+
+    // Load the default Spark configuration.
+    String confDir = System.getenv("SPARK_CONF_DIR");
+    if (confDir == null && System.getenv(SPARK_HOME_ENV) != null) {
+      confDir = System.getenv(SPARK_HOME_ENV) + File.separator + "conf";
+    }
+
+    if (confDir != null) {
+      File sparkDefaults = new File(confDir + File.separator + "spark-defaults.conf");
+      if (sparkDefaults.isFile()) {
+        Properties sparkConf = new Properties();
+        try (Reader r = new InputStreamReader(new FileInputStream(sparkDefaults), UTF_8)) {
+          sparkConf.load(r);
+        }
+
+        // Defaults never override a key that is already present.
+        for (String key : sparkConf.stringPropertyNames()) {
+          if (!confView.containsKey(key)) {
+            confView.put(key, sparkConf.getProperty(key));
+          }
+        }
+      }
+    }
+
+    File file = File.createTempFile("livyConf", ".properties");
+    boolean written = false;
+    try {
+      Files.setPosixFilePermissions(file.toPath(), EnumSet.of(OWNER_READ, OWNER_WRITE));
+      try (Writer writer = new OutputStreamWriter(new FileOutputStream(file), UTF_8)) {
+        confView.store(writer, "Livy App Context Configuration");
+      }
+      written = true;
+    } finally {
+      // On success the file is deleted later by the ChildProcess monitor thread, so no
+      // deleteOnExit() here; on failure nobody else knows about it.
+      if (!written) {
+        file.delete();
+      }
+    }
+
+    return file;
+  }
+
+
+  /**
+   * Callback registered with the RPC server under this launcher's client id. It remembers the
+   * connecting client and, on a {@link RemoteDriverAddress} message, completes the launcher's
+   * promise with the resulting {@link ContextInfo}.
+   */
+  private class RegistrationHandler extends BaseProtocol
+    implements RpcServer.ClientCallback {
+
+    // NOTE(review): not read or written anywhere else in this file.
+    volatile RemoteDriverAddress driverAddress;
+
+    // Set in onNewClient(); null until a client connects.
+    private Rpc client;
+
+    /** Remembers the connected client and returns this handler as its dispatcher. */
+    @Override
+    public RpcDispatcher onNewClient(Rpc client) {
+      LOG.debug("New RPC client connected from {}.", client.getChannel());
+      this.client = client;
+      return this;
+    }
+
+    /** Intentionally empty. */
+    @Override
+    public void onSaslComplete(Rpc client) {
+    }
+
+    /** Closes the remembered client connection, if there is one. */
+    void dispose() {
+      if (client != null) {
+        client.close();
+      }
+    }
+
+    /**
+     * Completes the promise with the address reported by the driver and then schedules the
+     * cleanup of both this handler and the launcher (without killing the child).
+     * NOTE(review): private and not called from this file - presumably invoked reflectively by
+     * RpcDispatcher; confirm.
+     */
+    private void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) {
+      ContextInfo info = new ContextInfo(msg.host, msg.port, clientId, secret);
+      if (promise.trySuccess(info)) {
+        // NOTE(review): `timeout` is assigned last in the launcher's constructor - confirm it
+        // is always set by the time a message can arrive here.
+        timeout.cancel(true);
+        LOG.debug("Received driver info for client {}: {}/{}.", 
client.getChannel(),
+          msg.host, msg.port);
+      } else {
+        // The promise was already completed elsewhere, e.g. failed by connectTimeout().
+        LOG.warn("Connection established but promise is already finalized.");
+      }
+
+      // Cleanup is submitted to the channel's executor rather than run inline; it happens in
+      // both branches above.
+      ctx.executor().submit(new Runnable() {
+        @Override
+        public void run() {
+          dispose();
+          ContextLauncher.this.dispose(false);
+        }
+      });
+    }
+
+  }
+
+  /**
+   * Wraps the launched child - either an OS {@link Process} or an in-process Runnable - and a
+   * daemon monitor thread that runs or waits for it. The monitor thread deletes the
+   * configuration file when its task ends and fails the promise when the task throws or the
+   * process exits with a non-zero code.
+   */
+  private static class ChildProcess {
+
+    private final RSCConf conf;
+    private final Promise<?> promise;
+    // Null when the child is an in-process Runnable.
+    private final Process child;
+    private final Thread monitor;
+    private final File confFile;
+
+    /**
+     * Runs {@code child} on the monitor thread.
+     *
+     * All other fields are assigned before the monitor thread is started, because that thread
+     * reads {@code confFile} and {@code promise} and may do so as soon as it starts.
+     */
+    public ChildProcess(RSCConf conf, Promise<?> promise, Runnable child, File confFile) {
+      this.conf = conf;
+      this.promise = promise;
+      this.child = null;
+      this.confFile = confFile;
+      this.monitor = monitor(child, CHILD_IDS.incrementAndGet());
+    }
+
+    /**
+     * Watches an already started process: the monitor thread waits for it to exit and fails
+     * the promise when the exit code is not zero.
+     */
+    public ChildProcess(RSCConf conf, Promise<?> promise, final Process childProc,
+        File confFile) {
+      int childId = CHILD_IDS.incrementAndGet();
+      this.conf = conf;
+      this.promise = promise;
+      this.child = childProc;
+      this.confFile = confFile;
+
+      Runnable monitorTask = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            int exitCode = child.waitFor();
+            if (exitCode != 0) {
+              LOG.warn("Child process exited with code {}.", exitCode);
+              fail(new IOException(String.format("Child process exited with code %d.", exitCode)));
+            }
+          } catch (InterruptedException ie) {
+            LOG.warn("Waiting thread interrupted, killing child process.");
+            // Clears the interrupt flag; the thread ends right after destroying the child.
+            Thread.interrupted();
+            child.destroy();
+          } catch (Exception e) {
+            LOG.warn("Exception while waiting for child process.", e);
+          }
+        }
+      };
+      this.monitor = monitor(monitorTask, childId);
+    }
+
+    /** Fails the promise unless it is already completed. */
+    private void fail(Throwable error) {
+      promise.tryFailure(error);
+    }
+
+    /**
+     * Destroys the process (if any), interrupts the monitor thread and waits for it up to the
+     * shutdown timeout; if it is still alive afterwards, interrupts it once more.
+     */
+    public void kill() {
+      if (child != null) {
+        child.destroy();
+      }
+      monitor.interrupt();
+      detach();
+
+      // Last ditch effort.
+      if (monitor.isAlive()) {
+        LOG.warn("Timed out shutting down remote driver, interrupting...");
+        monitor.interrupt();
+      }
+    }
+
+    /**
+     * Waits for the monitor thread to finish, for at most {@code CLIENT_SHUTDOWN_TIMEOUT}.
+     * An interrupt of the calling thread ends the wait early and is only logged.
+     */
+    public void detach() {
+      try {
+        monitor.join(conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT));
+      } catch (InterruptedException ie) {
+        LOG.debug("Interrupted before driver thread was finished.");
+      }
+    }
+
+    /**
+     * Starts a daemon thread named "ContextLauncher-&lt;childId&gt;" that runs {@code task} and
+     * deletes the configuration file afterwards, whether or not the task threw.
+     *
+     * @return the started thread.
+     */
+    private Thread monitor(final Runnable task, int childId) {
+      Runnable wrappedTask = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            task.run();
+          } finally {
+            confFile.delete();
+          }
+        }
+      };
+      Thread thread = new Thread(wrappedTask);
+      thread.setDaemon(true);
+      thread.setName("ContextLauncher-" + childId);
+      thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+          LOG.warn("Child task threw exception.", e);
+          fail(e);
+        }
+      });
+      thread.start();
+      return thread;
+    }
+  }
+
+  // Just for testing.
+  static Process mockSparkSubmit;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/DriverProcessInfo.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/DriverProcessInfo.java 
b/rsc/src/main/java/org/apache/livy/rsc/DriverProcessInfo.java
new file mode 100644
index 0000000..ddc991c
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/DriverProcessInfo.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc;
+
+import io.netty.util.concurrent.Promise;
+
+/**
+ * Information about the driver process and the promise of its {@link ContextInfo}.
+ */
+public class DriverProcessInfo {
+
+  private final Promise<ContextInfo> contextInfo;
+  private final transient Process driverProcess;
+
+  /**
+   * @param contextInfo promise for the driver's context information.
+   * @param driverProcess the launched process; stored as given, so it may be null.
+   */
+  public DriverProcessInfo(Promise<ContextInfo> contextInfo, Process driverProcess) {
+    this.contextInfo = contextInfo;
+    this.driverProcess = driverProcess;
+  }
+
+  /** Returns the promise passed to the constructor. */
+  public Promise<ContextInfo> getContextInfo() {
+    return contextInfo;
+  }
+
+  /** Returns the process passed to the constructor; may be null. */
+  public Process getDriverProcess() {
+    return driverProcess;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/FutureListener.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/FutureListener.java 
b/rsc/src/main/java/org/apache/livy/rsc/FutureListener.java
new file mode 100644
index 0000000..9b99eae
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/FutureListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc;
+
+/**
+ * A simplified future listener for netty futures. See Utils.addListener().
+ *
+ * Both callbacks do nothing by default, so subclasses override only the one they need.
+ */
+public abstract class FutureListener<T> {
+
+  /** Called with the result of the future; does nothing by default. */
+  public void onSuccess(T result) throws Exception {
+    // no-op by default
+  }
+
+  /** Called with the error of the future; does nothing by default. */
+  public void onFailure(Throwable error) throws Exception {
+    // no-op by default
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/JobHandleImpl.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/JobHandleImpl.java 
b/rsc/src/main/java/org/apache/livy/rsc/JobHandleImpl.java
new file mode 100644
index 0000000..0fc4ba2
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/JobHandleImpl.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import io.netty.util.concurrent.Promise;
+
+import org.apache.livy.JobHandle;
+import org.apache.livy.client.common.AbstractJobHandle;
+
+/**
+ * Handle for a job submitted through an {@link RSCClient}. Waiting, completion checks, the
+ * result and the error are all delegated to the wrapped netty promise.
+ */
+class JobHandleImpl<T> extends AbstractJobHandle<T> {
+
+  private final RSCClient client;
+  private final String jobId;
+  private final Promise<T> promise;
+  // NOTE(review): never read or written in this class.
+  private volatile State state;
+
+  JobHandleImpl(RSCClient client, Promise<T> promise, String jobId) {
+    this.promise = promise;
+    this.jobId = jobId;
+    this.client = client;
+  }
+
+  /**
+   * Requests a running job to be cancelled.
+   *
+   * @return true if the state could be moved to CANCELLED, in which case the client is asked
+   *         to cancel the job and the promise is cancelled; false otherwise.
+   */
+  @Override
+  public boolean cancel(boolean mayInterrupt) {
+    if (!changeState(State.CANCELLED)) {
+      return false;
+    }
+    client.cancel(jobId);
+    promise.cancel(mayInterrupt);
+    return true;
+  }
+
+  /** Blocks on the promise until it completes. */
+  @Override
+  public T get() throws ExecutionException, InterruptedException {
+    return promise.get();
+  }
+
+  /** Blocks on the promise for at most the given time. */
+  @Override
+  public T get(long timeout, TimeUnit unit)
+      throws ExecutionException, InterruptedException, TimeoutException {
+    return promise.get(timeout, unit);
+  }
+
+  @Override
+  public boolean isCancelled() {
+    return promise.isCancelled();
+  }
+
+  @Override
+  public boolean isDone() {
+    return promise.isDone();
+  }
+
+  /** Current value of the promise, without blocking. */
+  @Override
+  protected T result() {
+    return promise.getNow();
+  }
+
+  /** Failure cause recorded on the promise. */
+  @Override
+  protected Throwable error() {
+    return promise.cause();
+  }
+
+  /** Completes the promise with {@code result} and moves the state to SUCCEEDED. */
+  @SuppressWarnings("unchecked")
+  void setSuccess(Object result) {
+    // The synchronization here is not necessary, but tests depend on it.
+    synchronized (listeners) {
+      T typed = (T) result;
+      promise.setSuccess(typed);
+      changeState(State.SUCCEEDED);
+    }
+  }
+
+  /** Fails the promise with {@code error} and moves the state to FAILED. */
+  void setFailure(Throwable error) {
+    // The synchronization here is not necessary, but tests depend on it.
+    synchronized (listeners) {
+      promise.setFailure(error);
+      changeState(State.FAILED);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/PingJob.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/PingJob.java 
b/rsc/src/main/java/org/apache/livy/rsc/PingJob.java
new file mode 100644
index 0000000..221f57f
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/PingJob.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+
+/**
+ * A job that can be used to check for liveness of the remote context.
+ * <p>
+ * The job does no work: it ignores the job context and returns {@code null}.
+ */
+public class PingJob implements Job<Void> {
+
+  /**
+   * Does nothing.
+   *
+   * @param jc the job context (unused)
+   * @return always {@code null}
+   */
+  @Override
+  public Void call(JobContext jc) {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java 
b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
new file mode 100644
index 0000000..1b38467
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import io.netty.util.concurrent.Promise;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobHandle;
+import org.apache.livy.LivyClient;
+import org.apache.livy.client.common.BufferUtils;
+import org.apache.livy.rsc.driver.AddFileJob;
+import org.apache.livy.rsc.driver.AddJarJob;
+import org.apache.livy.rsc.rpc.Rpc;
+
+import static org.apache.livy.rsc.RSCConf.Entry.*;
+
+public class RSCClient implements LivyClient {
+  private static final Logger LOG = LoggerFactory.getLogger(RSCClient.class);
+  private static final AtomicInteger EXECUTOR_GROUP_ID = new AtomicInteger();
+
+  private final RSCConf conf;
+  private final Promise<ContextInfo> contextInfoPromise;
+  private final Map<String, JobHandleImpl<?>> jobs;
+  private final ClientProtocol protocol;
+  private final Promise<Rpc> driverRpc;
+  private final int executorGroupId;
+  private final EventLoopGroup eventLoopGroup;
+  private final Promise<URI> serverUriPromise;
+
+  private ContextInfo contextInfo;
+  private Process driverProcess;
+  private volatile boolean isAlive;
+  private volatile String replState;
+
+  /**
+   * Creates a client for a remote context whose connection details are delivered through
+   * the given promise.
+   * <p>
+   * When the promise succeeds, the client connects to the context and publishes an
+   * {@code rsc://} URI through the server URI promise; when it fails, the client is stopped
+   * and the server URI promise is failed with the same error.
+   *
+   * @param conf the client configuration
+   * @param ctx promise for the connection details of the remote context
+   * @param driverProcess the driver process handle returned by {@link #getDriverProcess()};
+   *                      may be {@code null}
+   * @throws IOException declared for callers; not thrown directly by the visible code
+   */
+  RSCClient(RSCConf conf, Promise<ContextInfo> ctx, Process driverProcess) throws IOException {
+    this.conf = conf;
+    this.contextInfoPromise = ctx;
+    this.driverProcess = driverProcess;
+    this.jobs = new ConcurrentHashMap<>();
+    this.protocol = new ClientProtocol();
+    this.driverRpc = ImmediateEventExecutor.INSTANCE.newPromise();
+    this.executorGroupId = EXECUTOR_GROUP_ID.incrementAndGet();
+    this.eventLoopGroup = new NioEventLoopGroup(
+        conf.getInt(RPC_MAX_THREADS),
+        Utils.newDaemonThreadFactory("RSCClient-" + executorGroupId + "-%d"));
+    this.serverUriPromise = ImmediateEventExecutor.INSTANCE.newPromise();
+
+    // Mark the client alive BEFORE registering the listener. A listener added to an
+    // already-completed promise may be notified immediately; if that callback fails and
+    // calls stop(false) while isAlive is still false, stop() does nothing and the client
+    // would afterwards be flagged alive with its event loop never shut down.
+    isAlive = true;
+
+    Utils.addListener(this.contextInfoPromise, new FutureListener<ContextInfo>() {
+      @Override
+      public void onSuccess(ContextInfo info) throws Exception {
+        connectToContext(info);
+        String url = String.format("rsc://%s:%s@%s:%d",
+          info.clientId, info.secret, info.remoteAddress, info.remotePort);
+        serverUriPromise.setSuccess(URI.create(url));
+      }
+
+      @Override
+      public void onFailure(Throwable error) {
+        connectionError(error);
+        serverUriPromise.setFailure(error);
+      }
+    });
+  }
+
+  /** Returns the current value of the alive flag, which {@link #stop(boolean)} clears. */
+  public boolean isAlive() {
+    return isAlive;
+  }
+
+  /** Returns the driver process passed to the constructor; may be {@code null}. */
+  public Process getDriverProcess() {
+    return driverProcess;
+  }
+
+  /**
+   * Records the context info and starts an RPC client connection to the remote context.
+   * <p>
+   * On success the driver RPC promise is completed and a listener is installed on the
+   * channel's close future so that an unexpected close stops this client. On failure the
+   * driver RPC promise is failed and the client is stopped via
+   * {@code connectionError}.
+   *
+   * @param info connection details (address, port, client id and secret) of the context
+   */
+  private synchronized void connectToContext(final ContextInfo info) throws 
Exception {
+    this.contextInfo = info;
+
+    try {
+      Promise<Rpc> promise = Rpc.createClient(conf,
+        eventLoopGroup,
+        info.remoteAddress,
+        info.remotePort,
+        info.clientId,
+        info.secret,
+        protocol);
+      Utils.addListener(promise, new FutureListener<Rpc>() {
+        @Override
+        public void onSuccess(Rpc rpc) throws Exception {
+          // Publish the RPC first so that deferred calls waiting on driverRpc can proceed.
+          driverRpc.setSuccess(rpc);
+          Utils.addListener(rpc.getChannel().closeFuture(), new 
FutureListener<Void>() {
+            @Override
+            public void onSuccess(Void unused) {
+              // isAlive is still true only if stop() has not run yet, i.e. the close was
+              // not initiated by this client.
+              if (isAlive) {
+                LOG.warn("Client RPC channel closed unexpectedly.");
+                try {
+                  stop(false);
+                } catch (Exception e) { /* stop() itself prints warning. */ }
+              }
+            }
+          });
+          LOG.debug("Connected to context {} ({}, {}).", info.clientId,
+            rpc.getChannel(), executorGroupId);
+        }
+
+        @Override
+        public void onFailure(Throwable error) throws Exception {
+          driverRpc.setFailure(error);
+          connectionError(error);
+        }
+      });
+    } catch (Exception e) {
+      // Failures raised synchronously while setting up the connection.
+      connectionError(e);
+    }
+  }
+
+  /**
+   * Logs a connection failure and stops this client without shutting down the context.
+   * Exceptions thrown by {@code stop} are deliberately ignored here.
+   *
+   * @param error the failure that prevented the connection
+   */
+  private void connectionError(Throwable error) {
+    LOG.error("Failed to connect to context.", error);
+    try {
+      stop(false);
+    } catch (Exception e) { /* stop() itself prints warning. */ }
+  }
+
+  /**
+   * Sends a message to the driver, deferring delivery until the driver RPC is available.
+   * <p>
+   * If the driver RPC promise has already succeeded the call is issued immediately and
+   * its future is returned. Otherwise a new promise is returned; it is completed with the
+   * outcome of the call once the RPC becomes available, or failed if the RPC promise
+   * fails.
+   *
+   * @param msg the message to send
+   * @param retType the expected type of the reply
+   * @return a future for the reply
+   */
+  private <T> io.netty.util.concurrent.Future<T> deferredCall(final Object msg,
+      final Class<T> retType) {
+    if (driverRpc.isSuccess()) {
+      try {
+        return driverRpc.get().call(msg, retType);
+      } catch (Exception ie) {
+        throw Utils.propagate(ie);
+      }
+    }
+
+    // No driver RPC yet, so install a listener and return a promise that will be ready
+    // when the driver is up and the message is actually delivered.
+    final Promise<T> promise = eventLoopGroup.next().newPromise();
+    // Relays the outcome of the eventual RPC call to the promise handed to the caller.
+    final FutureListener<T> callListener = new FutureListener<T>() {
+      @Override
+      public void onSuccess(T value) throws Exception {
+        promise.setSuccess(value);
+      }
+
+      @Override
+      public void onFailure(Throwable error) throws Exception {
+        promise.setFailure(error);
+      }
+    };
+
+    Utils.addListener(driverRpc, new FutureListener<Rpc>() {
+      @Override
+      public void onSuccess(Rpc rpc) throws Exception {
+        Utils.addListener(rpc.call(msg, retType), callListener);
+      }
+
+      @Override
+      public void onFailure(Throwable error) throws Exception {
+        promise.setFailure(error);
+      }
+    });
+    return promise;
+  }
+
+  /**
+   * Returns a future for the {@code rsc://} URI of the remote context. It is completed by
+   * the listener installed in the constructor: with the URI once the context info is
+   * available, or with the error if obtaining the context info fails.
+   */
+  public Future<URI> getServerUri() {
+    return serverUriPromise;
+  }
+
+  /** Submits the job through the client protocol and returns the handle tracking it. */
+  @Override
+  public <T> JobHandle<T> submit(Job<T> job) {
+    final JobHandle<T> handle = protocol.submit(job);
+    return handle;
+  }
+
+  /** Sends the job as a synchronous job request and returns a future for its result. */
+  @Override
+  public <T> Future<T> run(Job<T> job) {
+    final Future<T> pending = protocol.run(job);
+    return pending;
+  }
+
+  /**
+   * Stops this client. Does nothing if the client is no longer alive.
+   * <p>
+   * The context info promise is cancelled. If {@code shutdownContext} is set and the
+   * driver RPC is connected, an end-session message is sent and the method waits up to
+   * {@code CLIENT_SHUTDOWN_TIMEOUT} for the RPC channel to close. In all cases the RPC is
+   * closed, every job still registered is failed with an {@link IOException}, and the
+   * event loop group is shut down.
+   *
+   * @param shutdownContext whether the remote context should be asked to end the session
+   */
+  @Override
+  public synchronized void stop(boolean shutdownContext) {
+    if (isAlive) {
+      // Clear the flag first so that re-entrant calls (e.g. from the channel close
+      // listener) become no-ops.
+      isAlive = false;
+      try {
+        this.contextInfoPromise.cancel(true);
+
+        if (shutdownContext && driverRpc.isSuccess()) {
+          protocol.endSession();
+
+          // Because the remote context won't really reply to the end session message -
+          // since it closes the channel while handling it, we wait for the RPC's channel
+          // to close instead.
+          long stopTimeout = conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT);
+          driverRpc.get().getChannel().closeFuture().get(stopTimeout,
+            TimeUnit.MILLISECONDS);
+        }
+      } catch (Exception e) {
+        LOG.warn("Exception while waiting for end session reply.", e);
+        // propagate() rethrows; the finally block below still runs before the exception
+        // leaves this method.
+        Utils.propagate(e);
+      } finally {
+        if (driverRpc.isSuccess()) {
+          try {
+            driverRpc.get().close();
+          } catch (Exception e) {
+            LOG.warn("Error stopping RPC.", e);
+          }
+        }
+
+        // Report failure for all pending jobs, so that clients can react.
+        for (Map.Entry<String, JobHandleImpl<?>> e : jobs.entrySet()) {
+          LOG.info("Failing pending job {} due to shutdown.", e.getKey());
+          e.getValue().setFailure(new IOException("RSCClient instance 
stopped."));
+        }
+
+        eventLoopGroup.shutdownGracefully();
+      }
+      if (contextInfo != null) {
+        LOG.debug("Disconnected from context {}, shutdown = {}.", 
contextInfo.clientId,
+          shutdownContext);
+      }
+    }
+  }
+
+  /**
+   * Not supported by this client.
+   *
+   * @throws UnsupportedOperationException always; use {@link #addJar(URI)} instead
+   */
+  @Override
+  public Future<?> uploadJar(File jar) {
+    throw new UnsupportedOperationException("Use addJar to add the jar to the 
remote context!");
+  }
+
+  /** Submits an {@code AddJarJob} for the given URI and returns the job's handle. */
+  @Override
+  public Future<?> addJar(URI uri) {
+    final String location = uri.toString();
+    return submit(new AddJarJob(location));
+  }
+
+  /**
+   * Not supported by this client.
+   *
+   * @throws UnsupportedOperationException always; use {@link #addFile(URI)} instead
+   */
+  @Override
+  public Future<?> uploadFile(File file) {
+    throw new UnsupportedOperationException("Use addFile to add the file to 
the remote context!");
+  }
+
+  /** Submits an {@code AddFileJob} for the given URI and returns the job's handle. */
+  @Override
+  public Future<?> addFile(URI uri) {
+    final String location = uri.toString();
+    return submit(new AddFileJob(location));
+  }
+
+  /**
+   * Sends an already-serialized job to the driver as a bypass job request.
+   *
+   * @param serializedJob the serialized job
+   * @param sync forwarded unchanged in the bypass job request
+   * @return the identifier generated for the job
+   */
+  public String bypass(ByteBuffer serializedJob, boolean sync) {
+    final String jobId = protocol.bypass(serializedJob, sync);
+    return jobId;
+  }
+
+  /** Requests the status of the bypass job with the given id and returns a future for it. */
+  public Future<BypassJobStatus> getBypassJobStatus(String id) {
+    final Future<BypassJobStatus> status = protocol.getBypassJobStatus(id);
+    return status;
+  }
+
+  /**
+   * Sends a cancel request for the given job to the driver. The reply is not awaited.
+   *
+   * @param jobId the identifier of the job to cancel
+   */
+  public void cancel(String jobId) {
+    protocol.cancel(jobId);
+  }
+
+  /**
+   * Returns the context info recorded by {@code connectToContext}, or {@code null} if it
+   * has not been set yet.
+   */
+  ContextInfo getContextInfo() {
+    return contextInfo;
+  }
+
+  /**
+   * Sends the given code to the driver as a REPL job request.
+   *
+   * @param code the code to send
+   * @return a future for the integer reply to the request
+   */
+  public Future<Integer> submitReplCode(String code) throws Exception {
+    final Object request = new BaseProtocol.ReplJobRequest(code);
+    return deferredCall(request, Integer.class);
+  }
+
+  /**
+   * Sends a request to cancel the REPL statement with the given id. The reply is not
+   * awaited.
+   *
+   * @param statementId the identifier of the statement to cancel
+   */
+  public void cancelReplCode(int statementId) throws Exception {
+    deferredCall(new BaseProtocol.CancelReplJobRequest(statementId), 
Void.class);
+  }
+
+  /**
+   * Requests REPL job results from the driver, passing the given bounds in the request.
+   *
+   * @param from forwarded as the first argument of the request
+   * @param size forwarded as the second argument of the request
+   * @return a future for the results
+   */
+  public Future<ReplJobResults> getReplJobResults(Integer from, Integer size) throws Exception {
+    final Object request = new BaseProtocol.GetReplJobResults(from, size);
+    return deferredCall(request, ReplJobResults.class);
+  }
+
+  /**
+   * Requests REPL job results from the driver using a request built without arguments.
+   *
+   * @return a future for the results
+   */
+  public Future<ReplJobResults> getReplJobResults() throws Exception {
+    final Object request = new BaseProtocol.GetReplJobResults();
+    return deferredCall(request, ReplJobResults.class);
+  }
+
+  /**
+   * Returns the most recent REPL state received from the driver.
+   *
+   * @return the REPL state, or {@code null} if no state has been received (for example
+   *         because this client is not connected to a REPL session)
+   */
+  public String getReplState() {
+    return replState;
+  }
+
+  private class ClientProtocol extends BaseProtocol {
+
+    <T> JobHandleImpl<T> submit(Job<T> job) {
+      final String jobId = UUID.randomUUID().toString();
+      Object msg = new JobRequest<T>(jobId, job);
+
+      final Promise<T> promise = eventLoopGroup.next().newPromise();
+      final JobHandleImpl<T> handle = new JobHandleImpl<T>(RSCClient.this,
+        promise, jobId);
+      jobs.put(jobId, handle);
+
+      final io.netty.util.concurrent.Future<Void> rpc = deferredCall(msg, 
Void.class);
+      LOG.debug("Sending JobRequest[{}].", jobId);
+
+      Utils.addListener(rpc, new FutureListener<Void>() {
+        @Override
+        public void onSuccess(Void unused) throws Exception {
+          handle.changeState(JobHandle.State.QUEUED);
+        }
+
+        @Override
+        public void onFailure(Throwable error) throws Exception {
+          error.printStackTrace();
+          promise.tryFailure(error);
+        }
+      });
+      promise.addListener(new GenericFutureListener<Promise<T>>() {
+        @Override
+        public void operationComplete(Promise<T> p) {
+          if (jobId != null) {
+            jobs.remove(jobId);
+          }
+          if (p.isCancelled() && !rpc.isDone()) {
+            rpc.cancel(true);
+          }
+        }
+      });
+      return handle;
+    }
+
+    @SuppressWarnings("unchecked")
+    <T> Future<T> run(Job<T> job) {
+      return (Future<T>) deferredCall(new SyncJobRequest(job), Object.class);
+    }
+
+    String bypass(ByteBuffer serializedJob, boolean sync) {
+      String jobId = UUID.randomUUID().toString();
+      Object msg = new BypassJobRequest(jobId, 
BufferUtils.toByteArray(serializedJob), sync);
+      deferredCall(msg, Void.class);
+      return jobId;
+    }
+
+    Future<BypassJobStatus> getBypassJobStatus(String id) {
+      return deferredCall(new GetBypassJobStatus(id), BypassJobStatus.class);
+    }
+
+    void cancel(String jobId) {
+      deferredCall(new CancelJob(jobId), Void.class);
+    }
+
+    Future<?> endSession() {
+      return deferredCall(new EndSession(), Void.class);
+    }
+
+    private void handle(ChannelHandlerContext ctx, InitializationError msg) {
+      LOG.warn("Error reported from remote driver: %s", msg.stackTrace);
+    }
+
+    private void handle(ChannelHandlerContext ctx, JobResult msg) {
+      JobHandleImpl<?> handle = jobs.remove(msg.id);
+      if (handle != null) {
+        LOG.info("Received result for {}", msg.id);
+        // TODO: need a better exception for this.
+        Throwable error = msg.error != null ? new RuntimeException(msg.error) 
: null;
+        if (error == null) {
+          handle.setSuccess(msg.result);
+        } else {
+          handle.setFailure(error);
+        }
+      } else {
+        LOG.warn("Received result for unknown job {}", msg.id);
+      }
+    }
+
+    private void handle(ChannelHandlerContext ctx, JobStarted msg) {
+      JobHandleImpl<?> handle = jobs.get(msg.id);
+      if (handle != null) {
+        handle.changeState(JobHandle.State.STARTED);
+      } else {
+        LOG.warn("Received event for unknown job {}", msg.id);
+      }
+    }
+
+    private void handle(ChannelHandlerContext ctx, ReplState msg) {
+      LOG.trace("Received repl state for {}", msg.state);
+      replState = msg.state;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java 
b/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java
new file mode 100644
index 0000000..c6327e2
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import io.netty.util.concurrent.Promise;
+
+import org.apache.livy.LivyClient;
+import org.apache.livy.LivyClientFactory;
+import org.apache.livy.rsc.rpc.RpcServer;
+
+/**
+ * Factory for RSC clients.
+ */
+public final class RSCClientFactory implements LivyClientFactory {
+
+  private final AtomicInteger refCount = new AtomicInteger();
+  private RpcServer server = null;
+
+  /**
+   * Creates a local Livy client if the URI has the "rsc" scheme.
+   * <p>
+   * If the URI contains user information, host and port, the library will try to connect
+   * to an existing RSC instance with the provided information, and most of the provided
+   * configuration will be ignored.
+   * <p>
+   * Otherwise, a new Spark context will be started with the given configuration.
+   *
+   * @param uri the URI identifying the context to connect to or create
+   * @param config the client configuration
+   * @return the new client, or {@code null} if the URI scheme is not "rsc"
+   */
+  @Override
+  public LivyClient createClient(URI uri, Properties config) {
+    if (!"rsc".equals(uri.getScheme())) {
+      return null;
+    }
+
+    RSCConf lconf = new RSCConf(config);
+
+    // Tracks whether a server reference was taken, so it can be released on failure.
+    boolean needsServer = false;
+    try {
+      Promise<ContextInfo> info;
+      Process driverProcess = null;
+      if (uri.getUserInfo() != null && uri.getHost() != null && uri.getPort() 
> 0) {
+        info = createContextInfo(uri);
+      } else {
+        needsServer = true;
+        ref(lconf);
+        DriverProcessInfo processInfo = ContextLauncher.create(this, lconf);
+        info = processInfo.getContextInfo();
+        driverProcess = processInfo.getDriverProcess();
+      }
+      return new RSCClient(lconf, info, driverProcess);
+    } catch (Exception e) {
+      if (needsServer) {
+        unref();
+      }
+      throw Utils.propagate(e);
+    }
+  }
+
+  /** Returns the shared RPC server, or {@code null} when none is currently running. */
+  RpcServer getServer() {
+    return server;
+  }
+
+  /**
+   * Takes a reference on the shared RPC server, starting it when this is the first
+   * reference.
+   *
+   * @param config the configuration used to create the server if it has to be started
+   * @throws IOException if the server cannot be created
+   * @throws IllegalStateException if a server exists although the reference count is 0
+   */
+  private synchronized void ref(RSCConf config) throws IOException {
+    if (refCount.get() == 0) {
+      Utils.checkState(server == null, "Server already running but ref count is 0.");
+      try {
+        server = new RpcServer(config);
+      } catch (InterruptedException ie) {
+        // Restore the interrupt status before wrapping the exception, so that callers
+        // further up the stack can still observe the interruption.
+        Thread.currentThread().interrupt();
+        throw Utils.propagate(ie);
+      }
+    }
+
+    // Count the reference only once the server is known to be available.
+    refCount.incrementAndGet();
+  }
+
+  synchronized void unref() {
+    if (refCount.decrementAndGet() == 0) {
+      server.close();
+      server = null;
+    }
+  }
+
+  /**
+   * Builds an already-completed promise holding the connection details encoded in the URI.
+   * The user info part must have the form {@code <client id>:<secret>}.
+   *
+   * @param uri a URI with user info, host and port
+   * @return a promise that has already succeeded with the parsed context info
+   * @throws IllegalArgumentException if the user info does not contain a ':' separator
+   */
+  private static Promise<ContextInfo> createContextInfo(final URI uri) {
+    String[] userInfo = uri.getUserInfo().split(":", 2);
+    // Fail with a clear message instead of an ArrayIndexOutOfBoundsException below. The
+    // user info itself is not echoed since it carries the secret.
+    Utils.checkArgument(userInfo.length == 2,
+      "URI user info must have the form <client id>:<secret>.");
+
+    ImmediateEventExecutor executor = ImmediateEventExecutor.INSTANCE;
+    Promise<ContextInfo> promise = executor.newPromise();
+    promise.setSuccess(new ContextInfo(uri.getHost(), uri.getPort(), userInfo[0], userInfo[1]));
+    return promise;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java 
b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
new file mode 100644
index 0000000..c560aed
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import javax.security.sasl.Sasl;
+
+import org.apache.livy.client.common.ClientConf;
+
+public class RSCConf extends ClientConf<RSCConf> {
+
+  public static final String SPARK_CONF_PREFIX = "spark.";
+  public static final String LIVY_SPARK_PREFIX = SPARK_CONF_PREFIX + 
"__livy__.";
+
+  private static final String RSC_CONF_PREFIX = "livy.rsc.";
+
+  /**
+   * Configuration entries of the RSC. Each entry pairs a key, which the constructor
+   * prefixes with {@code RSC_CONF_PREFIX} ("livy.rsc."), with a default value that may be
+   * {@code null}.
+   */
+  public static enum Entry implements ConfEntry {
+    CLIENT_ID("client.auth.id", null),
+    CLIENT_SECRET("client.auth.secret", null),
+    CLIENT_IN_PROCESS("client.do-not-use.run-driver-in-process", false),
+    CLIENT_SHUTDOWN_TIMEOUT("client.shutdown-timeout", "10s"),
+    DRIVER_CLASS("driver-class", null),
+    SESSION_KIND("session.kind", null),
+
+    LIVY_JARS("jars", null),
+    SPARKR_PACKAGE("sparkr.package", null),
+    PYSPARK_ARCHIVES("pyspark.archives", null),
+
+    // Address for the RSC driver to connect back with its connection info.
+    LAUNCHER_ADDRESS("launcher.address", null),
+    LAUNCHER_PORT_RANGE("launcher.port.range", "10000~10010"),
+    // Setting this property has no benefit for users. It is currently used to pass
+    // port information from ContextLauncher to RSCDriver.
+    LAUNCHER_PORT("launcher.port", -1),
+    // How long the RSC will wait for a connection from a Livy server before shutting
+    // itself down.
+    SERVER_IDLE_TIMEOUT("server.idle-timeout", "10m"),
+
+    PROXY_USER("proxy-user", null),
+
+    RPC_SERVER_ADDRESS("rpc.server.address", null),
+    RPC_CLIENT_HANDSHAKE_TIMEOUT("server.connect.timeout", "90s"),
+    RPC_CLIENT_CONNECT_TIMEOUT("client.connect.timeout", "10s"),
+    RPC_CHANNEL_LOG_LEVEL("channel.log.level", null),
+    RPC_MAX_MESSAGE_SIZE("rpc.max.size", 50 * 1024 * 1024),
+    RPC_MAX_THREADS("rpc.threads", 8),
+    RPC_SECRET_RANDOM_BITS("secret.bits", 256),
+
+    SASL_MECHANISMS("rpc.sasl.mechanisms", "DIGEST-MD5"),
+    SASL_QOP("rpc.sasl.qop", null),
+
+    TEST_STUCK_END_SESSION("test.do-not-use.stuck-end-session", false),
+    TEST_STUCK_START_DRIVER("test.do-not-use.stuck-start-driver", false),
+
+    JOB_CANCEL_TRIGGER_INTERVAL("job-cancel.trigger-interval", "100ms"),
+    JOB_CANCEL_TIMEOUT("job-cancel.timeout", "30s"),
+
+    RETAINED_STATEMENT_NUMBER("retained-statements", 100);
+
+    private final String key;
+    private final Object dflt;
+
+    // Stores the fully-qualified key (prefix + suffix) and the default value.
+    private Entry(String key, Object dflt) {
+      this.key = RSC_CONF_PREFIX + key;
+      this.dflt = dflt;
+    }
+
+    @Override
+    public String key() { return key; }
+
+    @Override
+    public Object dflt() { return dflt; }
+  }
+
+  /** Creates a configuration backed by a new, empty {@link Properties} object. */
+  public RSCConf() {
+    this(new Properties());
+  }
+
+  /**
+   * Creates a configuration from the given properties, which are handed to the
+   * superclass constructor.
+   *
+   * @param config the properties to build the configuration from
+   */
+  public RSCConf(Properties config) {
+    super(config);
+  }
+
+  /**
+   * Builds the SASL options derived from this configuration.
+   *
+   * @return a new mutable map holding {@code Sasl.QOP} when {@code SASL_QOP} is set, and
+   *         empty otherwise
+   */
+  public Map<String, String> getSaslOptions() {
+    // TODO: add more options?
+    final String qualityOfProtection = get(Entry.SASL_QOP);
+    final Map<String, String> saslOptions = new HashMap<>();
+    if (qualityOfProtection != null) {
+      saslOptions.put(Sasl.QOP, qualityOfProtection);
+    }
+    return saslOptions;
+  }
+
+  /**
+   * Finds an address of the local host that other hosts can use to reach it.
+   * <p>
+   * If the local hostname does not resolve to a loopback address, its canonical host name
+   * is returned. Otherwise the network interfaces are searched for the first IPv4 address
+   * that is neither loopback nor link-local; if none is found, the canonical host name is
+   * returned as a last resort and a warning is logged.
+   *
+   * @return a host address or canonical host name for the local machine
+   * @throws IOException if the local host or the network interfaces cannot be queried
+   */
+  public String findLocalAddress() throws IOException {
+    InetAddress address = InetAddress.getLocalHost();
+    if (!address.isLoopbackAddress()) {
+      // The hostname already resolves to a usable address. Return it without the
+      // "resolves to a loopback address" warnings, which would be false here.
+      return address.getCanonicalHostName();
+    }
+
+    // Address resolves to something like 127.0.1.1, which happens on Debian;
+    // try to find a better address using the local network interfaces
+    Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
+    while (ifaces.hasMoreElements()) {
+      NetworkInterface ni = ifaces.nextElement();
+      Enumeration<InetAddress> addrs = ni.getInetAddresses();
+      while (addrs.hasMoreElements()) {
+        InetAddress addr = addrs.nextElement();
+        if (!addr.isLinkLocalAddress() && !addr.isLoopbackAddress()
+            && addr instanceof Inet4Address) {
+          // We've found an address that looks reasonable!
+          LOG.warn("Your hostname, {}, resolves to a loopback address; using {} "
+              + " instead (on interface {})", address.getHostName(), addr.getHostAddress(),
+              ni.getName());
+          LOG.warn("Set '{}' if you need to bind to another address.",
+            Entry.RPC_SERVER_ADDRESS.key);
+          return addr.getHostAddress();
+        }
+      }
+    }
+
+    LOG.warn("Your hostname, {}, resolves to a loopback address, but we couldn't find "
+        + "any external IP address!", address.getCanonicalHostName());
+    LOG.warn("Set {} if you need to bind to another address.",
+      Entry.RPC_SERVER_ADDRESS.key);
+    return address.getCanonicalHostName();
+  }
+
+  // Maps the key of each current entry to the DeprecatedConf carrying its older,
+  // underscore-separated key.
+  private static final Map<String, DeprecatedConf> configsWithAlternatives
+    = Collections.unmodifiableMap(new HashMap<String, DeprecatedConf>() {{
+      put(RSCConf.Entry.CLIENT_IN_PROCESS.key, DepConf.CLIENT_IN_PROCESS);
+      put(RSCConf.Entry.CLIENT_SHUTDOWN_TIMEOUT.key, 
DepConf.CLIENT_SHUTDOWN_TIMEOUT);
+      put(RSCConf.Entry.DRIVER_CLASS.key, DepConf.DRIVER_CLASS);
+      put(RSCConf.Entry.SERVER_IDLE_TIMEOUT.key, DepConf.SERVER_IDLE_TIMEOUT);
+      put(RSCConf.Entry.PROXY_USER.key, DepConf.PROXY_USER);
+      put(RSCConf.Entry.TEST_STUCK_END_SESSION.key, 
DepConf.TEST_STUCK_END_SESSION);
+      put(RSCConf.Entry.TEST_STUCK_START_DRIVER.key, 
DepConf.TEST_STUCK_START_DRIVER);
+      put(RSCConf.Entry.JOB_CANCEL_TRIGGER_INTERVAL.key, 
DepConf.JOB_CANCEL_TRIGGER_INTERVAL);
+      put(RSCConf.Entry.JOB_CANCEL_TIMEOUT.key, DepConf.JOB_CANCEL_TIMEOUT);
+      put(RSCConf.Entry.RETAINED_STATEMENT_NUMBER.key, 
DepConf.RETAINED_STATEMENT_NUMBER);
+  }});
+
+  // Maps deprecated key to DeprecatedConf with the same key.
+  // There are no deprecated configs without alternatives currently, so the map is empty.
+  private static final Map<String, DeprecatedConf> deprecatedConfigs
+    = Collections.unmodifiableMap(new HashMap<String, DeprecatedConf>());
+
+  /**
+   * Returns the unmodifiable map from current configuration keys to their deprecated
+   * alternatives.
+   */
+  protected Map<String, DeprecatedConf> getConfigsWithAlternatives() {
+    return configsWithAlternatives;
+  }
+
+  /**
+   * Returns the unmodifiable map of deprecated configurations that have no alternative;
+   * it is currently empty.
+   */
+  protected Map<String, DeprecatedConf> getDeprecatedConfigs() {
+    return deprecatedConfigs;
+  }
+
+  /**
+   * Deprecated configuration keys. Each constant carries the old, underscore-separated
+   * key (prefixed with {@code RSC_CONF_PREFIX}), a version string and an optional
+   * deprecation message.
+   * NOTE(review): the version is presumably the release in which the key was deprecated -
+   * confirm against {@code DeprecatedConf}.
+   */
+  static enum DepConf implements DeprecatedConf {
+    CLIENT_IN_PROCESS("client.do_not_use.run_driver_in_process", "0.4"),
+    CLIENT_SHUTDOWN_TIMEOUT("client.shutdown_timeout", "0.4"),
+    DRIVER_CLASS("driver_class", "0.4"),
+    SERVER_IDLE_TIMEOUT("server.idle_timeout", "0.4"),
+    PROXY_USER("proxy_user", "0.4"),
+    TEST_STUCK_END_SESSION("test.do_not_use.stuck_end_session", "0.4"),
+    TEST_STUCK_START_DRIVER("test.do_not_use.stuck_start_driver", "0.4"),
+    JOB_CANCEL_TRIGGER_INTERVAL("job_cancel.trigger_interval", "0.4"),
+    JOB_CANCEL_TIMEOUT("job_cancel.timeout", "0.4"),
+    RETAINED_STATEMENT_NUMBER("retained_statements", "0.4");
+
+    private final String key;
+    private final String version;
+    private final String deprecationMessage;
+
+    // Convenience constructor: uses an empty deprecation message.
+    private DepConf(String key, String version) {
+      this(key, version, "");
+    }
+
+    // Stores the fully-qualified key (prefix + suffix), the version and the message.
+    private DepConf(String key, String version, String deprecationMessage) {
+      this.key = RSC_CONF_PREFIX + key;
+      this.version = version;
+      this.deprecationMessage = deprecationMessage;
+    }
+
+    @Override
+    public String key() { return key; }
+
+    @Override
+    public String version() { return version; }
+
+    @Override
+    public String deprecationMessage() { return deprecationMessage; }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/ReplJobResults.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/ReplJobResults.java 
b/rsc/src/main/java/org/apache/livy/rsc/ReplJobResults.java
new file mode 100644
index 0000000..6717c59
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/ReplJobResults.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.rsc;
+
+import org.apache.livy.rsc.driver.Statement;
+
+/**
+ * Holder for the statements returned by the driver in reply to a request for REPL job
+ * results.
+ */
+public class ReplJobResults {
+  // The statements carried by this result; null when created by the no-arg constructor.
+  public final Statement[] statements;
+
+  /**
+   * @param statements the statements to carry; the array is stored as-is, not copied
+   */
+  public ReplJobResults(Statement[] statements) {
+    this.statements = statements;
+  }
+
+  // NOTE(review): presumably required by the serialization layer to instantiate the
+  // class - confirm before removing.
+  public ReplJobResults() {
+    this(null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/Utils.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/Utils.java 
b/rsc/src/main/java/org/apache/livy/rsc/Utils.java
new file mode 100644
index 0000000..d2c0059
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/Utils.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
+/**
+ * A few simple utility functions used by the code, mostly to avoid a direct 
dependency
+ * on Guava.
+ */
+public class Utils {
+
+  public static void checkArgument(boolean condition) {
+    if (!condition) {
+      throw new IllegalArgumentException();
+    }
+  }
+
+  public static void checkArgument(boolean condition, String msg, Object... 
args) {
+    if (!condition) {
+      throw new IllegalArgumentException(String.format(msg, args));
+    }
+  }
+
+  public static void checkState(boolean condition, String msg, Object... args) 
{
+    if (!condition) {
+      throw new IllegalStateException(String.format(msg, args));
+    }
+  }
+
+  public static void checkNotNull(Object o) {
+    if (o == null) {
+      throw new NullPointerException();
+    }
+  }
+
+  public static RuntimeException propagate(Throwable t) {
+    if (t instanceof RuntimeException) {
+      throw (RuntimeException) t;
+    } else {
+      throw new RuntimeException(t);
+    }
+  }
+
+  public static ThreadFactory newDaemonThreadFactory(final String nameFormat) {
+    return new ThreadFactory() {
+
+      private final AtomicInteger threadId = new AtomicInteger();
+
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = new Thread(r);
+        t.setName(String.format(nameFormat, threadId.incrementAndGet()));
+        t.setDaemon(true);
+        return t;
+      }
+
+    };
+  }
+
+  public static String join(Iterable<String> strs, String sep) {
+    StringBuilder sb = new StringBuilder();
+    for (String s : strs) {
+      if (s != null && !s.isEmpty()) {
+        sb.append(s).append(sep);
+      }
+    }
+    if (sb.length() > 0) {
+      sb.setLength(sb.length() - sep.length());
+    }
+    return sb.toString();
+  }
+
+  public static String stackTraceAsString(Throwable t) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(t.getClass().getName()).append(": ").append(t.getMessage());
+    for (StackTraceElement e : t.getStackTrace()) {
+      sb.append("\n");
+      sb.append(e.toString());
+    }
+    return sb.toString();
+  }
+
+  public static <T> void addListener(Future<T> future, final FutureListener<T> 
lsnr) {
+    future.addListener(new GenericFutureListener<Future<T>>() {
+      @Override
+      public void operationComplete(Future<T> f) throws Exception {
+        if (f.isSuccess()) {
+          lsnr.onSuccess(f.get());
+        } else {
+          lsnr.onFailure(f.cause());
+        }
+      }
+    });
+  }
+
+  private Utils() { }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/driver/AddFileJob.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/AddFileJob.java 
b/rsc/src/main/java/org/apache/livy/rsc/driver/AddFileJob.java
new file mode 100644
index 0000000..cc75b6c
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/driver/AddFileJob.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.rsc.driver;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+
+/**
+ * Job that hands a path to {@link JobContextImpl#addFile(String)}.
+ */
+public class AddFileJob implements Job<Object> {
+
+  private final String path;
+
+  public AddFileJob(String path) {
+    this.path = path;
+  }
+
+  // No-arg variant; leaves the path as null.
+  AddFileJob() {
+    this(null);
+  }
+
+  /**
+   * Casts the supplied context to {@link JobContextImpl} and adds the configured path.
+   *
+   * @return always {@code null}.
+   */
+  @Override
+  public Object call(JobContext jc) throws Exception {
+    ((JobContextImpl) jc).addFile(path);
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/driver/AddJarJob.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/AddJarJob.java 
b/rsc/src/main/java/org/apache/livy/rsc/driver/AddJarJob.java
new file mode 100644
index 0000000..c455e6e
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/driver/AddJarJob.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc.driver;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+
+/**
+ * Job that hands a path to {@link JobContextImpl#addJarOrPyFile(String)}.
+ */
+public class AddJarJob implements Job<Object> {
+
+  private final String path;
+
+  public AddJarJob(String path) {
+    this.path = path;
+  }
+
+  // For serialization.
+  private AddJarJob() {
+    this(null);
+  }
+
+  /**
+   * Casts the supplied context to {@link JobContextImpl} and adds the configured path.
+   *
+   * @return always {@code null}.
+   */
+  @Override
+  public Object call(JobContext jc) throws Exception {
+    ((JobContextImpl) jc).addJarOrPyFile(path);
+    return null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/driver/BypassJob.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/BypassJob.java 
b/rsc/src/main/java/org/apache/livy/rsc/driver/BypassJob.java
new file mode 100644
index 0000000..f0d14c6
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/driver/BypassJob.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc.driver;
+
+import java.nio.ByteBuffer;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+import org.apache.livy.client.common.BufferUtils;
+import org.apache.livy.client.common.Serializer;
+
+/**
+ * Job wrapper that works on serialized payloads: it deserializes the wrapped job, runs it,
+ * and returns the result re-serialized as a byte array.
+ */
+class BypassJob implements Job<byte[]> {
+
+  private final Serializer serializer;
+  private final byte[] serializedJob;
+
+  BypassJob(Serializer serializer, byte[] serializedJob) {
+    this.serializer = serializer;
+    this.serializedJob = serializedJob;
+  }
+
+  /**
+   * Runs the deserialized job against {@code jc}.
+   *
+   * @return the serialized result, or {@code null} when the job itself returned {@code null}.
+   */
+  @Override
+  public byte[] call(JobContext jc) throws Exception {
+    ByteBuffer payload = ByteBuffer.wrap(serializedJob);
+    Job<?> job = (Job<?>) serializer.deserialize(payload);
+    Object result = job.call(jc);
+    if (result == null) {
+      // Nothing to serialize.
+      return null;
+    }
+    return BufferUtils.toByteArray(serializer.serialize(result));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/driver/BypassJobWrapper.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/BypassJobWrapper.java 
b/rsc/src/main/java/org/apache/livy/rsc/driver/BypassJobWrapper.java
new file mode 100644
index 0000000..1fa5bf1
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/driver/BypassJobWrapper.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc.driver;
+
+import java.util.List;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobHandle;
+import org.apache.livy.rsc.BypassJobStatus;
+import org.apache.livy.rsc.Utils;
+
+public class BypassJobWrapper extends JobWrapper<byte[]> {
+
+  private volatile byte[] result;
+  private volatile Throwable error;
+  private volatile JobHandle.State state;
+  private volatile List<Integer> newSparkJobs;
+
+  public BypassJobWrapper(RSCDriver driver, String jobId, Job<byte[]> 
serializedJob) {
+    super(driver, jobId, serializedJob);
+    state = JobHandle.State.QUEUED;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    state = JobHandle.State.STARTED;
+    return super.call();
+  }
+
+  @Override
+  protected synchronized void finished(byte[] result, Throwable error) {
+    if (error == null) {
+      this.result = result;
+      this.state = JobHandle.State.SUCCEEDED;
+    } else {
+      this.error = error;
+      this.state = JobHandle.State.FAILED;
+    }
+  }
+
+  @Override
+  boolean cancel() {
+    if (super.cancel()) {
+      this.state = JobHandle.State.CANCELLED;
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  protected void jobStarted() {
+    // Do nothing; just avoid sending data back to the driver.
+  }
+
+  synchronized BypassJobStatus getStatus() {
+    String stackTrace = error != null ? Utils.stackTraceAsString(error) : null;
+    return new BypassJobStatus(state, result, stackTrace);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java 
b/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java
new file mode 100644
index 0000000..ddb5713
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc.driver;
+
+import java.io.File;
+import java.lang.reflect.Method;
+
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.hive.HiveContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.livy.JobContext;
+import org.apache.livy.rsc.Utils;
+
+/**
+ * {@link JobContext} implementation backed by a single {@link JavaSparkContext}.
+ *
+ * The SQL, Hive and SparkSession handles are created lazily on first request and cached in
+ * volatile fields; creation happens inside a block synchronized on this instance. The
+ * streaming context is created and stopped explicitly through synchronized methods.
+ */
+class JobContextImpl implements JobContext {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(JobContextImpl.class);
+
+  private final JavaSparkContext sc;
+  private final File localTmpDir;
+  private volatile SQLContext sqlctx;
+  private volatile HiveContext hivectx;
+  private volatile JavaStreamingContext streamingctx;
+  private final RSCDriver driver;
+  // Typed as Object because the session is obtained reflectively (see sparkSession()).
+  private volatile Object sparksession;
+
+  public JobContextImpl(JavaSparkContext sc, File localTmpDir, RSCDriver 
driver) {
+    this.sc = sc;
+    this.localTmpDir = localTmpDir;
+    this.driver = driver;
+  }
+
+  @Override
+  public JavaSparkContext sc() {
+    return sc;
+  }
+
+  /**
+   * Returns the cached SparkSession, creating it on first use.
+   *
+   * The session is built reflectively through the {@code org.apache.spark.sql.SparkSession$}
+   * singleton ({@code builder()}, {@code sparkContext(...)}, {@code getOrCreate()}), so this
+   * class has no compile-time reference to SparkSession.
+   *
+   * @throws Exception any failure of the reflective calls; it is logged at WARN level and
+   *         rethrown, and nothing is cached in that case.
+   */
+  @Override
+  public Object sparkSession() throws Exception {
+    if (sparksession == null) {
+      synchronized (this) {
+        // Re-check under the lock: another thread may have created it in the meantime.
+        if (sparksession == null) {
+          try {
+            Class<?> clz = Class.forName("org.apache.spark.sql.SparkSession$");
+            Object spark = clz.getField("MODULE$").get(null);
+            Method m = clz.getMethod("builder");
+            Object builder = m.invoke(spark);
+            // NOTE(review): the value returned by sparkContext(...) is discarded; this
+            // presumably relies on the builder mutating itself -- confirm.
+            builder.getClass().getMethod("sparkContext", SparkContext.class)
+              .invoke(builder, sc.sc());
+            sparksession = 
builder.getClass().getMethod("getOrCreate").invoke(builder);
+          } catch (Exception e) {
+            LOG.warn("SparkSession is not supported", e);
+            throw e;
+          }
+        }
+      }
+    }
+
+    return sparksession;
+  }
+
+  /** Returns the cached {@link SQLContext}, creating it from {@code sc} on first use. */
+  @Override
+  public SQLContext sqlctx() {
+    if (sqlctx == null) {
+      synchronized (this) {
+        if (sqlctx == null) {
+          sqlctx = new SQLContext(sc);
+        }
+      }
+    }
+    return sqlctx;
+  }
+
+  /** Returns the cached {@link HiveContext}, creating it from {@code sc.sc()} on first use. */
+  @Override
+  public HiveContext hivectx() {
+    if (hivectx == null) {
+      synchronized (this) {
+        if (hivectx == null) {
+          hivectx = new HiveContext(sc.sc());
+        }
+      }
+    }
+    return hivectx;
+  }
+
+  /**
+   * Returns the current streaming context.
+   *
+   * @throws IllegalStateException if no streaming context has been created.
+   */
+  @Override
+  public synchronized JavaStreamingContext streamingctx(){
+    Utils.checkState(streamingctx != null, "method createStreamingContext must 
be called first.");
+    return streamingctx;
+  }
+
+  /**
+   * Creates the streaming context on top of {@code sc}.
+   *
+   * @param batchDuration value passed to the Spark {@link Duration} constructor.
+   * @throws IllegalStateException if a streaming context already exists.
+   */
+  @Override
+  public synchronized void createStreamingContext(long batchDuration) {
+    Utils.checkState(streamingctx == null, "Streaming context is not null.");
+    streamingctx = new JavaStreamingContext(sc, new Duration(batchDuration));
+  }
+
+  /**
+   * Stops the streaming context and clears the reference so a new one can be created.
+   *
+   * @throws IllegalStateException if there is no streaming context.
+   */
+  @Override
+  public synchronized void stopStreamingCtx() {
+    Utils.checkState(streamingctx != null, "Streaming Context is null");
+    streamingctx.stop();
+    streamingctx = null;
+  }
+
+  @Override
+  public File getLocalTmpDir() {
+    return localTmpDir;
+  }
+
+  /** Stops the streaming context (if one exists) and then the Spark context (if non-null). */
+  public synchronized void stop() {
+    if (streamingctx != null) {
+      stopStreamingCtx();
+    }
+    if (sc != null) {
+      sc.stop();
+    }
+  }
+
+  /** Delegates to {@code driver.addFile(path)}. */
+  public void addFile(String path) {
+    driver.addFile(path);
+  }
+
+  /** Delegates to {@code driver.addJarOrPyFile(path)}. */
+  public void addJarOrPyFile(String path) throws Exception {
+    driver.addJarOrPyFile(path);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/driver/JobWrapper.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/JobWrapper.java 
b/rsc/src/main/java/org/apache/livy/rsc/driver/JobWrapper.java
new file mode 100644
index 0000000..f6df164
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/driver/JobWrapper.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc.driver;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.spark.api.java.JavaFutureAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.livy.Job;
+
+public class JobWrapper<T> implements Callable<Void> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JobWrapper.class);
+
+  public final String jobId;
+
+  private final RSCDriver driver;
+  private final Job<T> job;
+  private final AtomicInteger completed;
+
+  private Future<?> future;
+
+  public JobWrapper(RSCDriver driver, String jobId, Job<T> job) {
+    this.driver = driver;
+    this.jobId = jobId;
+    this.job = job;
+    this.completed = new AtomicInteger();
+  }
+
+  @Override
+  public Void call() throws Exception {
+    try {
+      jobStarted();
+      T result = job.call(driver.jobContext());
+      finished(result, null);
+    } catch (Throwable t) {
+      // Catch throwables in a best-effort to report job status back to the 
client. It's
+      // re-thrown so that the executor can destroy the affected thread (or 
the JVM can
+      // die or whatever would happen if the throwable bubbled up).
+      LOG.info("Failed to run job " + jobId, t);
+      finished(null, t);
+      throw new ExecutionException(t);
+    } finally {
+      driver.activeJobs.remove(jobId);
+    }
+    return null;
+  }
+
+  void submit(ExecutorService executor) {
+    this.future = executor.submit(this);
+  }
+
+  void jobDone() {
+    synchronized (completed) {
+      completed.incrementAndGet();
+      completed.notifyAll();
+    }
+  }
+
+  boolean cancel() {
+    return future != null ? future.cancel(true) : true;
+  }
+
+  protected void finished(T result, Throwable error) {
+    if (error == null) {
+      driver.jobFinished(jobId, result, null);
+    } else {
+      driver.jobFinished(jobId, null, error);
+    }
+  }
+
+  protected void jobStarted() {
+    driver.jobStarted(jobId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/driver/MutableClassLoader.java
----------------------------------------------------------------------
diff --git 
a/rsc/src/main/java/org/apache/livy/rsc/driver/MutableClassLoader.java 
b/rsc/src/main/java/org/apache/livy/rsc/driver/MutableClassLoader.java
new file mode 100644
index 0000000..30da79e
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/driver/MutableClassLoader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc.driver;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * A {@link URLClassLoader} that starts with an empty search path and lets callers append
+ * URLs to it after construction.
+ */
+class MutableClassLoader extends URLClassLoader {
+
+  /**
+   * @param parent the parent class loader used for delegation.
+   */
+  MutableClassLoader(ClassLoader parent) {
+    super(new URL[0], parent);
+  }
+
+  /**
+   * Appends {@code url} to the search path. Overridden only to widen the visibility of the
+   * protected {@link URLClassLoader#addURL(URL)} to public.
+   */
+  @Override
+  public void addURL(URL url) {
+    super.addURL(url);
+  }
+
+}

Reply via email to