http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-http/src/test/scala/com/cloudera/livy/client/http/LivyConnectionSpec.scala
----------------------------------------------------------------------
diff --git 
a/client-http/src/test/scala/com/cloudera/livy/client/http/LivyConnectionSpec.scala
 
b/client-http/src/test/scala/com/cloudera/livy/client/http/LivyConnectionSpec.scala
deleted file mode 100644
index e1d29ca..0000000
--- 
a/client-http/src/test/scala/com/cloudera/livy/client/http/LivyConnectionSpec.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.client.http
-
-import java.io.IOException
-import java.net.URLEncoder
-import java.nio.charset.StandardCharsets.UTF_8
-
-import org.apache.http.client.utils.URIBuilder
-import org.eclipse.jetty.security._
-import org.eclipse.jetty.security.authentication.BasicAuthenticator
-import org.eclipse.jetty.util.security._
-import org.scalatest.{BeforeAndAfterAll, FunSpecLike}
-import org.scalatest.Matchers._
-import org.scalatra.servlet.ScalatraListener
-
-import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
-import com.cloudera.livy.server.WebServer
-
-class LivyConnectionSpec extends FunSpecLike with BeforeAndAfterAll with 
LivyBaseUnitTestSuite {
-  describe("LivyConnection") {
-    def basicAuth(username: String, password: String, realm: String): 
SecurityHandler = {
-      val roles = Array("user")
-
-      val l = new HashLoginService()
-      l.putUser(username, Credential.getCredential(password), roles)
-      l.setName(realm)
-
-      val constraint = new Constraint()
-      constraint.setName(Constraint.__BASIC_AUTH)
-      constraint.setRoles(roles)
-      constraint.setAuthenticate(true)
-
-      val cm = new ConstraintMapping()
-      cm.setConstraint(constraint)
-      cm.setPathSpec("/*")
-
-      val csh = new ConstraintSecurityHandler()
-      csh.setAuthenticator(new BasicAuthenticator())
-      csh.setRealmName(realm)
-      csh.addConstraintMapping(cm)
-      csh.setLoginService(l)
-
-      csh
-    }
-
-    def test(password: String, livyConf: LivyConf = new LivyConf()): Unit = {
-      val username = "user name"
-
-      val server = new WebServer(livyConf, "0.0.0.0", 0)
-      server.context.setSecurityHandler(basicAuth(username, password, "realm"))
-      server.context.setResourceBase("src/main/com/cloudera/livy/server")
-      server.context.setInitParameter(ScalatraListener.LifeCycleKey,
-        classOf[HttpClientTestBootstrap].getCanonicalName)
-      server.context.addEventListener(new ScalatraListener)
-      server.start()
-
-      val utf8Name = UTF_8.name()
-      val uri = new URIBuilder()
-        .setScheme(server.protocol)
-        .setHost(server.host)
-        .setPort(server.port)
-        .setUserInfo(URLEncoder.encode(username, utf8Name), 
URLEncoder.encode(password, utf8Name))
-        .build()
-      info(uri.toString)
-      val conn = new LivyConnection(uri, new HttpConf(null))
-      try {
-        conn.get(classOf[Object], "/") should not be (null)
-
-      } finally {
-        conn.close()
-      }
-
-      server.stop()
-      server.join()
-    }
-
-    it("should support HTTP auth with password") {
-      test("pass:word")
-    }
-
-    it("should support HTTP auth with empty password") {
-      test("")
-    }
-
-    it("should be failed with large header size") {
-      val livyConf = new LivyConf()
-        .set(LivyConf.REQUEST_HEADER_SIZE, 1024)
-        .set(LivyConf.RESPONSE_HEADER_SIZE, 1024)
-      val pwd = "test-password" * 100
-      val exception = intercept[IOException](test(pwd, livyConf))
-      exception.getMessage.contains("Request Entity Too Large") should be(true)
-    }
-
-    it("should be succeeded with configured header size") {
-      val livyConf = new LivyConf()
-        .set(LivyConf.REQUEST_HEADER_SIZE, 2048)
-        .set(LivyConf.RESPONSE_HEADER_SIZE, 2048)
-      val pwd = "test-password" * 100
-      test(pwd, livyConf)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
----------------------------------------------------------------------
diff --git 
a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala 
b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
new file mode 100644
index 0000000..801c09b
--- /dev/null
+++ 
b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.client.http
+
+import java.io.{File, InputStream}
+import java.net.{InetAddress, URI}
+import java.nio.file.{Files, Paths}
+import java.util.concurrent.{Future => JFuture, _}
+import java.util.concurrent.atomic.AtomicLong
+import javax.servlet.ServletContext
+import javax.servlet.http.HttpServletRequest
+
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.mockito.ArgumentCaptor
+import org.mockito.Matchers.{eq => meq, _}
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, FunSpecLike}
+import org.scalatra.LifeCycle
+import org.scalatra.servlet.ScalatraListener
+
+import org.apache.livy._
+import org.apache.livy.client.common.{BufferUtils, Serializer}
+import org.apache.livy.client.common.HttpMessages._
+import org.apache.livy.server.WebServer
+import org.apache.livy.server.interactive.{InteractiveSession, 
InteractiveSessionServlet}
+import org.apache.livy.server.recovery.SessionStore
+import org.apache.livy.sessions.{InteractiveSessionManager, SessionState, 
Spark}
+import org.apache.livy.test.jobs.Echo
+import org.apache.livy.utils.AppInfo
+
+/**
+ * The test for the HTTP client is written in Scala so we can reuse the code 
in the livy-server
+ * module, which implements the client session backend. The client servlet has 
some functionality
+ * overridden to avoid creating sub-processes for each seession.
+ */
+class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with 
LivyBaseUnitTestSuite {
+
+  import HttpClientSpec._
+
+  private val TIMEOUT_S = 10
+  private val ID_GENERATOR = new AtomicLong()
+  private val serializer = new Serializer()
+
+  private var server: WebServer = _
+  private var client: LivyClient = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    server = new WebServer(new LivyConf(), "0.0.0.0", 0)
+
+    server.context.setResourceBase("src/main/com/cloudera/livy/server")
+    server.context.setInitParameter(ScalatraListener.LifeCycleKey,
+      classOf[HttpClientTestBootstrap].getCanonicalName)
+    server.context.addEventListener(new ScalatraListener)
+
+    server.start()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    if (server != null) {
+      server.stop()
+      server = null
+    }
+    if (client != null) {
+      client.stop(true)
+      client = null
+    }
+    session = null
+  }
+
+  describe("HTTP client library") {
+
+    it("should create clients") {
+      // WebServer does this internally instead of respecting "0.0.0.0", so 
try to use the same
+      // address.
+      val uri = 
s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}/";
+      client = new LivyClientBuilder(false).setURI(new URI(uri)).build()
+    }
+
+    withClient("should run and monitor asynchronous jobs") {
+      testJob(false)
+    }
+
+    withClient("should propagate errors from jobs") {
+      val errorMessage = "This job throws an error."
+      val (jobId, handle) = runJob(false, { id => Seq(
+          new JobStatus(id, JobHandle.State.FAILED, null, errorMessage))
+        })
+
+      val error = intercept[ExecutionException] {
+        handle.get(TIMEOUT_S, TimeUnit.SECONDS)
+      }
+      assert(error.getCause() != null)
+      assert(error.getCause().getMessage().indexOf(errorMessage) >= 0)
+      verify(session, times(1)).jobStatus(meq(jobId))
+    }
+
+    withClient("should run and monitor synchronous jobs") {
+      testJob(false)
+    }
+
+    withClient("should add files and jars") {
+      val furi = new URI("hdfs:file")
+      val juri = new URI("hdfs:jar")
+
+      client.addFile(furi).get(TIMEOUT_S, TimeUnit.SECONDS)
+      client.addJar(juri).get(TIMEOUT_S, TimeUnit.SECONDS)
+
+      verify(session, times(1)).addFile(meq(furi))
+      verify(session, times(1)).addJar(meq(juri))
+    }
+
+    withClient("should upload files and jars") {
+      uploadAndVerify("file")
+      uploadAndVerify("jar")
+    }
+
+    withClient("should cancel jobs") {
+      val (jobId, handle) = runJob(false, { id => Seq(
+          new JobStatus(id, JobHandle.State.STARTED, null, null),
+          new JobStatus(id, JobHandle.State.CANCELLED, null, null))
+        })
+      handle.cancel(true)
+
+      intercept[CancellationException] {
+        handle.get(TIMEOUT_S, TimeUnit.SECONDS)
+      }
+
+      verify(session, times(1)).cancelJob(meq(jobId))
+    }
+
+    withClient("should notify listeners of job completion") {
+      val (jobId, handle) = runJob(false, { id => Seq(
+          new JobStatus(id, JobHandle.State.STARTED, null, null),
+          new JobStatus(id, JobHandle.State.SUCCEEDED, serialize(id), null))
+        })
+
+      val listener = mock(classOf[JobHandle.Listener[Long]])
+      handle.asInstanceOf[JobHandle[Long]].addListener(listener)
+
+      assert(handle.get(TIMEOUT_S, TimeUnit.SECONDS) === jobId)
+      verify(listener, times(1)).onJobSucceeded(any(), any())
+    }
+
+    withClient("should time out handle get() call") {
+      // JobHandleImpl does exponential backoff checking the result of a job. 
Given an initial
+      // wait of 100ms, 4 iterations should result in a wait of 800ms, so the 
handle should at that
+      // point timeout a wait of 100ms.
+      val (jobId, handle) = runJob(false, { id => Seq(
+          new JobStatus(id, JobHandle.State.STARTED, null, null),
+          new JobStatus(id, JobHandle.State.STARTED, null, null),
+          new JobStatus(id, JobHandle.State.STARTED, null, null),
+          new JobStatus(id, JobHandle.State.SUCCEEDED, serialize(id), null))
+        })
+
+      intercept[TimeoutException] {
+        handle.get(100, TimeUnit.MILLISECONDS)
+      }
+
+      assert(handle.get(TIMEOUT_S, TimeUnit.SECONDS) === jobId)
+    }
+
+    withClient("should handle null responses") {
+      testJob(false, response = Some(null))
+    }
+
+    withClient("should connect to existing sessions") {
+      var sid = client.asInstanceOf[HttpClient].getSessionId()
+      val uri = 
s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}"; +
+        s"${LivyConnection.SESSIONS_URI}/$sid"
+      val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).build()
+      newClient.stop(false)
+      verify(session, never()).stop()
+    }
+
+    withClient("should tear down clients") {
+      client.stop(true)
+      verify(session, times(1)).stop()
+    }
+
+  }
+
+  private def uploadAndVerify(cmd: String): Unit = {
+    val f = File.createTempFile("uploadTestFile", cmd)
+    val expectedStr = "Test data"
+    val expectedData = expectedStr.getBytes()
+    Files.write(Paths.get(f.getAbsolutePath), expectedData)
+    val b = new Array[Byte](expectedData.length)
+    val captor = ArgumentCaptor.forClass(classOf[InputStream])
+    if (cmd == "file") {
+      client.uploadFile(f).get(TIMEOUT_S, TimeUnit.SECONDS)
+      verify(session, times(1)).addFile(captor.capture(), meq(f.getName))
+    } else {
+      client.uploadJar(f).get(TIMEOUT_S, TimeUnit.SECONDS)
+      verify(session, times(1)).addJar(captor.capture(), meq(f.getName))
+    }
+    captor.getValue.read(b)
+    assert(expectedStr === new String(b))
+  }
+
+  private def runJob(sync: Boolean, genStatusFn: Long => Seq[JobStatus]): 
(Long, JFuture[Int]) = {
+    val jobId = java.lang.Long.valueOf(ID_GENERATOR.incrementAndGet())
+    when(session.submitJob(any(classOf[Array[Byte]]))).thenReturn(jobId)
+
+    val statuses = genStatusFn(jobId)
+    val first = statuses.head
+    val remaining = statuses.drop(1)
+    when(session.jobStatus(meq(jobId))).thenReturn(first, remaining: _*)
+
+    val job = new Echo(42)
+    val handle = if (sync) client.run(job) else client.submit(job)
+    (jobId, handle)
+  }
+
+  private def testJob(sync: Boolean, response: Option[Any] = None): Unit = {
+    val (jobId, handle) = runJob(sync, { id => Seq(
+        new JobStatus(id, JobHandle.State.STARTED, null, null),
+        new JobStatus(id, JobHandle.State.SUCCEEDED, 
serialize(response.getOrElse(id)), null))
+      })
+    assert(handle.get(TIMEOUT_S, TimeUnit.SECONDS) === 
response.getOrElse(jobId))
+    verify(session, times(2)).jobStatus(meq(jobId))
+  }
+
+  private def withClient(desc: String)(fn: => Unit): Unit = {
+    it(desc) {
+      assume(client != null, "No active client.")
+      fn
+    }
+  }
+
+  def serialize(value: Any): Array[Byte] = {
+    BufferUtils.toByteArray(serializer.serialize(value))
+  }
+
+}
+
+private object HttpClientSpec {
+
+  // Hack warning: keep the session object available so that individual tests 
can mock
+  // the desired behavior before making requests to the server.
+  var session: InteractiveSession = _
+
+}
+
+private class HttpClientTestBootstrap extends LifeCycle {
+
+  private implicit def executor: ExecutionContext = ExecutionContext.global
+
+  override def init(context: ServletContext): Unit = {
+    val conf = new LivyConf()
+    val stateStore = mock(classOf[SessionStore])
+    val sessionManager = new InteractiveSessionManager(conf, stateStore, 
Some(Seq.empty))
+    val servlet = new InteractiveSessionServlet(sessionManager, stateStore, 
conf) {
+      override protected def createSession(req: HttpServletRequest): 
InteractiveSession = {
+        val session = mock(classOf[InteractiveSession])
+        val id = sessionManager.nextId()
+        when(session.id).thenReturn(id)
+        when(session.appId).thenReturn(None)
+        when(session.appInfo).thenReturn(AppInfo())
+        when(session.state).thenReturn(SessionState.Idle())
+        when(session.proxyUser).thenReturn(None)
+        when(session.kind).thenReturn(Spark())
+        when(session.stop()).thenReturn(Future.successful(()))
+        require(HttpClientSpec.session == null, "Session already created?")
+        HttpClientSpec.session = session
+        session
+      }
+    }
+
+    context.mount(servlet, s"${LivyConnection.SESSIONS_URI}/*")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-http/src/test/scala/org/apache/livy/client/http/LivyConnectionSpec.scala
----------------------------------------------------------------------
diff --git 
a/client-http/src/test/scala/org/apache/livy/client/http/LivyConnectionSpec.scala
 
b/client-http/src/test/scala/org/apache/livy/client/http/LivyConnectionSpec.scala
new file mode 100644
index 0000000..41edd66
--- /dev/null
+++ 
b/client-http/src/test/scala/org/apache/livy/client/http/LivyConnectionSpec.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.client.http
+
+import java.io.IOException
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.http.client.utils.URIBuilder
+import org.eclipse.jetty.security._
+import org.eclipse.jetty.security.authentication.BasicAuthenticator
+import org.eclipse.jetty.util.security._
+import org.scalatest.{BeforeAndAfterAll, FunSpecLike}
+import org.scalatest.Matchers._
+import org.scalatra.servlet.ScalatraListener
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.server.WebServer
+
+class LivyConnectionSpec extends FunSpecLike with BeforeAndAfterAll with 
LivyBaseUnitTestSuite {
+  describe("LivyConnection") {
+    def basicAuth(username: String, password: String, realm: String): 
SecurityHandler = {
+      val roles = Array("user")
+
+      val l = new HashLoginService()
+      l.putUser(username, Credential.getCredential(password), roles)
+      l.setName(realm)
+
+      val constraint = new Constraint()
+      constraint.setName(Constraint.__BASIC_AUTH)
+      constraint.setRoles(roles)
+      constraint.setAuthenticate(true)
+
+      val cm = new ConstraintMapping()
+      cm.setConstraint(constraint)
+      cm.setPathSpec("/*")
+
+      val csh = new ConstraintSecurityHandler()
+      csh.setAuthenticator(new BasicAuthenticator())
+      csh.setRealmName(realm)
+      csh.addConstraintMapping(cm)
+      csh.setLoginService(l)
+
+      csh
+    }
+
+    def test(password: String, livyConf: LivyConf = new LivyConf()): Unit = {
+      val username = "user name"
+
+      val server = new WebServer(livyConf, "0.0.0.0", 0)
+      server.context.setSecurityHandler(basicAuth(username, password, "realm"))
+      server.context.setResourceBase("src/main/com/cloudera/livy/server")
+      server.context.setInitParameter(ScalatraListener.LifeCycleKey,
+        classOf[HttpClientTestBootstrap].getCanonicalName)
+      server.context.addEventListener(new ScalatraListener)
+      server.start()
+
+      val utf8Name = UTF_8.name()
+      val uri = new URIBuilder()
+        .setScheme(server.protocol)
+        .setHost(server.host)
+        .setPort(server.port)
+        .setUserInfo(URLEncoder.encode(username, utf8Name), 
URLEncoder.encode(password, utf8Name))
+        .build()
+      info(uri.toString)
+      val conn = new LivyConnection(uri, new HttpConf(null))
+      try {
+        conn.get(classOf[Object], "/") should not be (null)
+
+      } finally {
+        conn.close()
+      }
+
+      server.stop()
+      server.join()
+    }
+
+    it("should support HTTP auth with password") {
+      test("pass:word")
+    }
+
+    it("should support HTTP auth with empty password") {
+      test("")
+    }
+
+    it("should be failed with large header size") {
+      val livyConf = new LivyConf()
+        .set(LivyConf.REQUEST_HEADER_SIZE, 1024)
+        .set(LivyConf.RESPONSE_HEADER_SIZE, 1024)
+      val pwd = "test-password" * 100
+      val exception = intercept[IOException](test(pwd, livyConf))
+      exception.getMessage.contains("Request Entity Too Large") should be(true)
+    }
+
+    it("should be succeeded with configured header size") {
+      val livyConf = new LivyConf()
+        .set(LivyConf.REQUEST_HEADER_SIZE, 2048)
+        .set(LivyConf.RESPONSE_HEADER_SIZE, 2048)
+      val pwd = "test-password" * 100
+      test(pwd, livyConf)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index e08e55b..0a0344b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -20,19 +20,19 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>multi-scala-project-root</artifactId>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
     <relativePath>../scala/pom.xml</relativePath>
   </parent>
 
   <artifactId>livy-core-parent</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <dependencies>
     <dependency>
-      <groupId>com.cloudera.livy</groupId>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-client-common</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/scala-2.10/pom.xml
----------------------------------------------------------------------
diff --git a/core/scala-2.10/pom.xml b/core/scala-2.10/pom.xml
index b8e7bc5..8d1aad8 100644
--- a/core/scala-2.10/pom.xml
+++ b/core/scala-2.10/pom.xml
@@ -17,15 +17,15 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
   <modelVersion>4.0.0</modelVersion>
-  <groupId>com.cloudera.livy</groupId>
+  <groupId>org.apache.livy</groupId>
   <artifactId>livy-core_2.10</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>livy-core-parent</artifactId>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/scala-2.11/pom.xml
----------------------------------------------------------------------
diff --git a/core/scala-2.11/pom.xml b/core/scala-2.11/pom.xml
index 1f817ab..bc16caf 100644
--- a/core/scala-2.11/pom.xml
+++ b/core/scala-2.11/pom.xml
@@ -17,15 +17,15 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
   <modelVersion>4.0.0</modelVersion>
-  <groupId>com.cloudera.livy</groupId>
+  <groupId>org.apache.livy</groupId>
   <artifactId>livy-core_2.11</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>livy-core-parent</artifactId>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/com/cloudera/livy/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/com/cloudera/livy/Logging.scala 
b/core/src/main/scala/com/cloudera/livy/Logging.scala
deleted file mode 100644
index 8532ea2..0000000
--- a/core/src/main/scala/com/cloudera/livy/Logging.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy
-
-import org.slf4j.LoggerFactory
-
-trait Logging {
-  lazy val logger = LoggerFactory.getLogger(this.getClass)
-
-  def trace(message: => Any): Unit = {
-    if (logger.isTraceEnabled) {
-      logger.trace(message.toString)
-    }
-  }
-
-  def debug(message: => Any): Unit = {
-    if (logger.isDebugEnabled) {
-      logger.debug(message.toString)
-    }
-  }
-
-  def info(message: => Any): Unit = {
-    if (logger.isInfoEnabled) {
-      logger.info(message.toString)
-    }
-  }
-
-  def warn(message: => Any): Unit = {
-    logger.warn(message.toString)
-  }
-
-  def error(message: => Any, t: Throwable): Unit = {
-    logger.error(message.toString, t)
-  }
-
-  def error(message: => Any): Unit = {
-    logger.error(message.toString)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/com/cloudera/livy/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/com/cloudera/livy/Utils.scala 
b/core/src/main/scala/com/cloudera/livy/Utils.scala
deleted file mode 100644
index 0d779dd..0000000
--- a/core/src/main/scala/com/cloudera/livy/Utils.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy
-
-import java.io.{Closeable, File, FileInputStream, InputStreamReader}
-import java.net.URL
-import java.nio.charset.StandardCharsets.UTF_8
-import java.util.Properties
-
-import scala.annotation.tailrec
-import scala.collection.JavaConverters._
-import scala.concurrent.TimeoutException
-import scala.concurrent.duration.Duration
-
-object Utils {
-  def getPropertiesFromFile(file: File): Map[String, String] = {
-    loadProperties(file.toURI().toURL())
-  }
-
-  def loadProperties(url: URL): Map[String, String] = {
-    val inReader = new InputStreamReader(url.openStream(), UTF_8)
-    try {
-      val properties = new Properties()
-      properties.load(inReader)
-      properties.stringPropertyNames().asScala.map { k =>
-        (k, properties.getProperty(k).trim())
-      }.toMap
-    } finally {
-      inReader.close()
-    }
-  }
-
-  /**
-   * Checks if event has occurred during some time period. This performs an 
exponential backoff
-   * to limit the poll calls.
-   *
-   * @param checkForEvent
-   * @param atMost
-   * @throws java.util.concurrent.TimeoutException
-   * @throws java.lang.InterruptedException
-   * @return
-   */
-  @throws(classOf[TimeoutException])
-  @throws(classOf[InterruptedException])
-  final def waitUntil(checkForEvent: () => Boolean, atMost: Duration): Unit = {
-    val endTime = System.currentTimeMillis() + atMost.toMillis
-
-    @tailrec
-    def aux(count: Int): Unit = {
-      if (!checkForEvent()) {
-        val now = System.currentTimeMillis()
-
-        if (now < endTime) {
-          val sleepTime = Math.max(10 * (2 << (count - 1)), 1000)
-          Thread.sleep(sleepTime)
-          aux(count + 1)
-        } else {
-          throw new TimeoutException
-        }
-      }
-    }
-
-    aux(1)
-  }
-
-  /** Returns if the process is still running */
-  def isProcessAlive(process: Process): Boolean = {
-    try {
-      process.exitValue()
-      false
-    } catch {
-      case _: IllegalThreadStateException =>
-        true
-    }
-  }
-
-  def startDaemonThread(name: String)(f: => Unit): Thread = {
-    val thread = new Thread(name) {
-      override def run(): Unit = f
-    }
-    thread.setDaemon(true)
-    thread.start()
-    thread
-  }
-
-  def usingResource[A <: Closeable, B](resource: A)(f: A => B): B = {
-    try {
-      f(resource)
-    } finally {
-      resource.close()
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/com/cloudera/livy/msgs.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/com/cloudera/livy/msgs.scala 
b/core/src/main/scala/com/cloudera/livy/msgs.scala
deleted file mode 100644
index 39ee152..0000000
--- a/core/src/main/scala/com/cloudera/livy/msgs.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy
-
-sealed trait MsgType
-
-object MsgType {
-  case object execute_request extends MsgType
-  case object execute_reply extends MsgType
-}
-
-case class Msg[T <: Content](msg_type: MsgType, content: T)
-
-sealed trait Content
-
-case class ExecuteRequest(code: String) extends Content {
-  val msg_type = MsgType.execute_request
-}
-
-sealed trait ExecutionStatus
-object ExecutionStatus {
-  case object ok extends ExecutionStatus
-  case object error extends ExecutionStatus
-  case object abort extends ExecutionStatus
-}
-
-sealed trait ExecuteReply extends Content {
-  val msg_type = MsgType.execute_reply
-
-  val status: ExecutionStatus
-  val execution_count: Int
-}
-
-case class ExecuteReplyOk(execution_count: Int,
-                          payload: Map[String, String]) extends ExecuteReply {
-  val status = ExecutionStatus.ok
-}
-
-case class ExecuteReplyError(execution_count: Int,
-                             ename: String,
-                             evalue: String,
-                             traceback: List[String]) extends ExecuteReply {
-  val status = ExecutionStatus.error
-}
-
-case class ExecuteResponse(id: Int, input: Seq[String], output: Seq[String])
-
-case class ShutdownRequest() extends Content

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/com/cloudera/livy/sessions/Kind.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/com/cloudera/livy/sessions/Kind.scala 
b/core/src/main/scala/com/cloudera/livy/sessions/Kind.scala
deleted file mode 100644
index 7a6480b..0000000
--- a/core/src/main/scala/com/cloudera/livy/sessions/Kind.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.sessions
-
-import com.fasterxml.jackson.core.{JsonGenerator, JsonParser, JsonToken}
-import com.fasterxml.jackson.databind._
-import com.fasterxml.jackson.databind.module.SimpleModule
-
-sealed trait Kind
-case class Spark() extends Kind {
-  override def toString: String = "spark"
-}
-
-case class PySpark() extends Kind {
-  override def toString: String = "pyspark"
-}
-
-case class PySpark3() extends Kind {
-  override def toString: String = "pyspark3"
-}
-
-case class SparkR() extends Kind {
-  override def toString: String = "sparkr"
-}
-
-object Kind {
-
-  def apply(kind: String): Kind = kind match {
-    case "spark" | "scala" => Spark()
-    case "pyspark" | "python" => PySpark()
-    case "pyspark3" | "python3" => PySpark3()
-    case "sparkr" | "r" => SparkR()
-    case other => throw new IllegalArgumentException(s"Invalid kind: $other")
-  }
-
-}
-
-class SessionKindModule extends SimpleModule("SessionKind") {
-
-  addSerializer(classOf[Kind], new JsonSerializer[Kind]() {
-    override def serialize(value: Kind, jgen: JsonGenerator, provider: 
SerializerProvider): Unit = {
-      jgen.writeString(value.toString)
-    }
-  })
-
-  addDeserializer(classOf[Kind], new JsonDeserializer[Kind]() {
-    override def deserialize(jp: JsonParser, ctxt: DeserializationContext): 
Kind = {
-      require(jp.getCurrentToken() == JsonToken.VALUE_STRING, "Kind should be 
a string.")
-      Kind(jp.getText())
-    }
-  })
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala 
b/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala
deleted file mode 100644
index 831b01a..0000000
--- a/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.sessions
-
-sealed trait SessionState {
-  /** Returns true if the State represents a process that can eventually 
execute commands */
-  def isActive: Boolean
-}
-
-sealed trait FinishedSessionState extends SessionState {
-  /** When session is finished. */
-  def time: Long
-}
-
-object SessionState {
-
-  def apply(s: String): SessionState = {
-    s match {
-      case "not_started" => NotStarted()
-      case "starting" => Starting()
-      case "recovering" => Recovering()
-      case "idle" => Idle()
-      case "running" => Running()
-      case "busy" => Busy()
-      case "shutting_down" => ShuttingDown()
-      case "error" => Error()
-      case "dead" => Dead()
-      case "success" => Success()
-      case _ => throw new IllegalArgumentException(s"Illegal session state: 
$s")
-    }
-  }
-
-  case class NotStarted() extends SessionState {
-    override def isActive: Boolean = true
-
-    override def toString: String = "not_started"
-  }
-
-  case class Starting() extends SessionState {
-    override def isActive: Boolean = true
-
-    override def toString: String = "starting"
-  }
-
-  case class Recovering() extends SessionState {
-    override def isActive: Boolean = true
-
-    override def toString: String = "recovering"
-  }
-
-  case class Idle() extends SessionState {
-    override def isActive: Boolean = true
-
-    override def toString: String = "idle"
-  }
-
-  case class Running() extends SessionState {
-    override def isActive: Boolean = true
-
-    override def toString: String = "running"
-  }
-
-  case class Busy() extends SessionState {
-    override def isActive: Boolean = true
-
-    override def toString: String = "busy"
-  }
-
-  case class ShuttingDown() extends SessionState {
-    override def isActive: Boolean = false
-
-    override def toString: String = "shutting_down"
-  }
-
-  case class Error(time: Long = System.nanoTime()) extends 
FinishedSessionState {
-    override def isActive: Boolean = true
-
-    override def toString: String = "error"
-  }
-
-  case class Dead(time: Long = System.nanoTime()) extends FinishedSessionState 
{
-    override def isActive: Boolean = false
-
-    override def toString: String = "dead"
-  }
-
-  case class Success(time: Long = System.nanoTime()) extends 
FinishedSessionState {
-    override def isActive: Boolean = false
-
-    override def toString: String = "success"
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/org/apache/livy/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/livy/Logging.scala 
b/core/src/main/scala/org/apache/livy/Logging.scala
new file mode 100644
index 0000000..73ff7df
--- /dev/null
+++ b/core/src/main/scala/org/apache/livy/Logging.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy
+
+import org.slf4j.LoggerFactory
+
+trait Logging {
+  lazy val logger = LoggerFactory.getLogger(this.getClass)
+
+  def trace(message: => Any): Unit = {
+    if (logger.isTraceEnabled) {
+      logger.trace(message.toString)
+    }
+  }
+
+  def debug(message: => Any): Unit = {
+    if (logger.isDebugEnabled) {
+      logger.debug(message.toString)
+    }
+  }
+
+  def info(message: => Any): Unit = {
+    if (logger.isInfoEnabled) {
+      logger.info(message.toString)
+    }
+  }
+
+  def warn(message: => Any): Unit = {
+    logger.warn(message.toString)
+  }
+
+  def error(message: => Any, t: Throwable): Unit = {
+    logger.error(message.toString, t)
+  }
+
+  def error(message: => Any): Unit = {
+    logger.error(message.toString)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/org/apache/livy/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/livy/Utils.scala 
b/core/src/main/scala/org/apache/livy/Utils.scala
new file mode 100644
index 0000000..c1cffe4
--- /dev/null
+++ b/core/src/main/scala/org/apache/livy/Utils.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy
+
+import java.io.{Closeable, File, FileInputStream, InputStreamReader}
+import java.net.URL
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.Properties
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration.Duration
+
+object Utils {
+  def getPropertiesFromFile(file: File): Map[String, String] = {
+    loadProperties(file.toURI().toURL())
+  }
+
+  def loadProperties(url: URL): Map[String, String] = {
+    val inReader = new InputStreamReader(url.openStream(), UTF_8)
+    try {
+      val properties = new Properties()
+      properties.load(inReader)
+      properties.stringPropertyNames().asScala.map { k =>
+        (k, properties.getProperty(k).trim())
+      }.toMap
+    } finally {
+      inReader.close()
+    }
+  }
+
+  /**
+   * Checks if event has occurred during some time period. This performs an 
exponential backoff
+   * to limit the poll calls.
+   *
+   * @param checkForEvent
+   * @param atMost
+   * @throws java.util.concurrent.TimeoutException
+   * @throws java.lang.InterruptedException
+   * @return
+   */
+  @throws(classOf[TimeoutException])
+  @throws(classOf[InterruptedException])
+  final def waitUntil(checkForEvent: () => Boolean, atMost: Duration): Unit = {
+    val endTime = System.currentTimeMillis() + atMost.toMillis
+
+    @tailrec
+    def aux(count: Int): Unit = {
+      if (!checkForEvent()) {
+        val now = System.currentTimeMillis()
+
+        if (now < endTime) {
+          val sleepTime = Math.max(10 * (2 << (count - 1)), 1000)
+          Thread.sleep(sleepTime)
+          aux(count + 1)
+        } else {
+          throw new TimeoutException
+        }
+      }
+    }
+
+    aux(1)
+  }
+
+  /** Returns if the process is still running */
+  def isProcessAlive(process: Process): Boolean = {
+    try {
+      process.exitValue()
+      false
+    } catch {
+      case _: IllegalThreadStateException =>
+        true
+    }
+  }
+
+  def startDaemonThread(name: String)(f: => Unit): Thread = {
+    val thread = new Thread(name) {
+      override def run(): Unit = f
+    }
+    thread.setDaemon(true)
+    thread.start()
+    thread
+  }
+
+  def usingResource[A <: Closeable, B](resource: A)(f: A => B): B = {
+    try {
+      f(resource)
+    } finally {
+      resource.close()
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/org/apache/livy/msgs.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/livy/msgs.scala 
b/core/src/main/scala/org/apache/livy/msgs.scala
new file mode 100644
index 0000000..0dd0a26
--- /dev/null
+++ b/core/src/main/scala/org/apache/livy/msgs.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy
+
+sealed trait MsgType
+
+object MsgType {
+  case object execute_request extends MsgType
+  case object execute_reply extends MsgType
+}
+
+case class Msg[T <: Content](msg_type: MsgType, content: T)
+
+sealed trait Content
+
+case class ExecuteRequest(code: String) extends Content {
+  val msg_type = MsgType.execute_request
+}
+
+sealed trait ExecutionStatus
+object ExecutionStatus {
+  case object ok extends ExecutionStatus
+  case object error extends ExecutionStatus
+  case object abort extends ExecutionStatus
+}
+
+sealed trait ExecuteReply extends Content {
+  val msg_type = MsgType.execute_reply
+
+  val status: ExecutionStatus
+  val execution_count: Int
+}
+
+case class ExecuteReplyOk(execution_count: Int,
+                          payload: Map[String, String]) extends ExecuteReply {
+  val status = ExecutionStatus.ok
+}
+
+case class ExecuteReplyError(execution_count: Int,
+                             ename: String,
+                             evalue: String,
+                             traceback: List[String]) extends ExecuteReply {
+  val status = ExecutionStatus.error
+}
+
+case class ExecuteResponse(id: Int, input: Seq[String], output: Seq[String])
+
+case class ShutdownRequest() extends Content

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/org/apache/livy/sessions/Kind.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/livy/sessions/Kind.scala 
b/core/src/main/scala/org/apache/livy/sessions/Kind.scala
new file mode 100644
index 0000000..bfb166f
--- /dev/null
+++ b/core/src/main/scala/org/apache/livy/sessions/Kind.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.sessions
+
+import com.fasterxml.jackson.core.{JsonGenerator, JsonParser, JsonToken}
+import com.fasterxml.jackson.databind._
+import com.fasterxml.jackson.databind.module.SimpleModule
+
+sealed trait Kind
+case class Spark() extends Kind {
+  override def toString: String = "spark"
+}
+
+case class PySpark() extends Kind {
+  override def toString: String = "pyspark"
+}
+
+case class PySpark3() extends Kind {
+  override def toString: String = "pyspark3"
+}
+
+case class SparkR() extends Kind {
+  override def toString: String = "sparkr"
+}
+
+object Kind {
+
+  def apply(kind: String): Kind = kind match {
+    case "spark" | "scala" => Spark()
+    case "pyspark" | "python" => PySpark()
+    case "pyspark3" | "python3" => PySpark3()
+    case "sparkr" | "r" => SparkR()
+    case other => throw new IllegalArgumentException(s"Invalid kind: $other")
+  }
+
+}
+
+class SessionKindModule extends SimpleModule("SessionKind") {
+
+  addSerializer(classOf[Kind], new JsonSerializer[Kind]() {
+    override def serialize(value: Kind, jgen: JsonGenerator, provider: 
SerializerProvider): Unit = {
+      jgen.writeString(value.toString)
+    }
+  })
+
+  addDeserializer(classOf[Kind], new JsonDeserializer[Kind]() {
+    override def deserialize(jp: JsonParser, ctxt: DeserializationContext): 
Kind = {
+      require(jp.getCurrentToken() == JsonToken.VALUE_STRING, "Kind should be 
a string.")
+      Kind(jp.getText())
+    }
+  })
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/org/apache/livy/sessions/SessionState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/livy/sessions/SessionState.scala 
b/core/src/main/scala/org/apache/livy/sessions/SessionState.scala
new file mode 100644
index 0000000..fd19321
--- /dev/null
+++ b/core/src/main/scala/org/apache/livy/sessions/SessionState.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.sessions
+
+sealed trait SessionState {
+  /** Returns true if the State represents a process that can eventually 
execute commands */
+  def isActive: Boolean
+}
+
+sealed trait FinishedSessionState extends SessionState {
+  /** When session is finished. */
+  def time: Long
+}
+
+object SessionState {
+
+  def apply(s: String): SessionState = {
+    s match {
+      case "not_started" => NotStarted()
+      case "starting" => Starting()
+      case "recovering" => Recovering()
+      case "idle" => Idle()
+      case "running" => Running()
+      case "busy" => Busy()
+      case "shutting_down" => ShuttingDown()
+      case "error" => Error()
+      case "dead" => Dead()
+      case "success" => Success()
+      case _ => throw new IllegalArgumentException(s"Illegal session state: 
$s")
+    }
+  }
+
+  case class NotStarted() extends SessionState {
+    override def isActive: Boolean = true
+
+    override def toString: String = "not_started"
+  }
+
+  case class Starting() extends SessionState {
+    override def isActive: Boolean = true
+
+    override def toString: String = "starting"
+  }
+
+  case class Recovering() extends SessionState {
+    override def isActive: Boolean = true
+
+    override def toString: String = "recovering"
+  }
+
+  case class Idle() extends SessionState {
+    override def isActive: Boolean = true
+
+    override def toString: String = "idle"
+  }
+
+  case class Running() extends SessionState {
+    override def isActive: Boolean = true
+
+    override def toString: String = "running"
+  }
+
+  case class Busy() extends SessionState {
+    override def isActive: Boolean = true
+
+    override def toString: String = "busy"
+  }
+
+  case class ShuttingDown() extends SessionState {
+    override def isActive: Boolean = false
+
+    override def toString: String = "shutting_down"
+  }
+
+  case class Error(time: Long = System.nanoTime()) extends 
FinishedSessionState {
+    override def isActive: Boolean = true
+
+    override def toString: String = "error"
+  }
+
+  case class Dead(time: Long = System.nanoTime()) extends FinishedSessionState 
{
+    override def isActive: Boolean = false
+
+    override def toString: String = "dead"
+  }
+
+  case class Success(time: Long = System.nanoTime()) extends 
FinishedSessionState {
+    override def isActive: Boolean = false
+
+    override def toString: String = "success"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/test/scala/com/cloudera/livy/LivyBaseUnitTestSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/com/cloudera/livy/LivyBaseUnitTestSuite.scala 
b/core/src/test/scala/com/cloudera/livy/LivyBaseUnitTestSuite.scala
deleted file mode 100644
index d833e39..0000000
--- a/core/src/test/scala/com/cloudera/livy/LivyBaseUnitTestSuite.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy
-
-import org.scalatest.{Outcome, Suite}
-
-trait LivyBaseUnitTestSuite extends Suite with Logging {
-
-  protected override def withFixture(test: NoArgTest): Outcome = {
-    val testName = test.name
-    val suiteName = this.getClass.getName
-    try {
-      info(s"\n\n==== TEST OUTPUT FOR $suiteName: '$testName' ====\n")
-      test()
-    } finally {
-      info(s"\n\n==== FINISHED $suiteName: '$testName' ====\n")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/test/scala/org/apache/livy/LivyBaseUnitTestSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/livy/LivyBaseUnitTestSuite.scala 
b/core/src/test/scala/org/apache/livy/LivyBaseUnitTestSuite.scala
new file mode 100644
index 0000000..908172b
--- /dev/null
+++ b/core/src/test/scala/org/apache/livy/LivyBaseUnitTestSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy
+
+import org.scalatest.{Outcome, Suite}
+
+trait LivyBaseUnitTestSuite extends Suite with Logging {
+
+  protected override def withFixture(test: NoArgTest): Outcome = {
+    val testName = test.name
+    val suiteName = this.getClass.getName
+    try {
+      info(s"\n\n==== TEST OUTPUT FOR $suiteName: '$testName' ====\n")
+      test()
+    } finally {
+      info(s"\n\n==== FINISHED $suiteName: '$testName' ====\n")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/coverage/pom.xml
----------------------------------------------------------------------
diff --git a/coverage/pom.xml b/coverage/pom.xml
index 2cf945a..de5ab2e 100644
--- a/coverage/pom.xml
+++ b/coverage/pom.xml
@@ -20,14 +20,14 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>livy-main</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
   </parent>
 
   <artifactId>livy-coverage-report</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index d1deec1..45bd11c 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,15 +21,15 @@
   <modelVersion>4.0.0</modelVersion>
 
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>livy-main</artifactId>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
-  <groupId>com.cloudera.livy</groupId>
+  <groupId>org.apache.livy</groupId>
   <artifactId>livy-examples</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <properties>
@@ -40,17 +40,17 @@
 
   <dependencies>
     <dependency>
-      <groupId>com.cloudera.livy</groupId>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-api</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.cloudera.livy</groupId>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-scala-api_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.cloudera.livy</groupId>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-client-http</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/examples/src/main/java/com/cloudera/livy/examples/PiApp.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/cloudera/livy/examples/PiApp.java 
b/examples/src/main/java/com/cloudera/livy/examples/PiApp.java
deleted file mode 100644
index 7971474..0000000
--- a/examples/src/main/java/com/cloudera/livy/examples/PiApp.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.examples;
-
-import java.io.File;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-
-import com.cloudera.livy.*;
-
-class PiJob implements
-    Job<Double>,
-    Function<Integer, Integer>,
-    Function2<Integer, Integer, Integer> {
-
-  private final int slices;
-  private final int samples;
-
-  public PiJob(int slices) {
-    this.slices = slices;
-    this.samples = (int) Math.min(100000L * slices, Integer.MAX_VALUE);
-  }
-
-  @Override
-  public Double call(JobContext ctx) throws Exception {
-    List<Integer> sampleList = new ArrayList<>();
-    for (int i = 0; i < samples; i++) {
-      sampleList.add(i);
-    }
-
-    return 4.0d * ctx.sc().parallelize(sampleList, 
slices).map(this).reduce(this) / samples;
-  }
-
-  @Override
-  public Integer call(Integer v1) {
-    double x = Math.random() * 2 - 1;
-    double y = Math.random() * 2 - 1;
-    return (x * x + y * y < 1) ? 1 : 0;
-  }
-
-  @Override
-  public Integer call(Integer v1, Integer v2) {
-    return v1 + v2;
-  }
-}
-
-/**
- * Example execution:
- * java -cp 
/pathTo/spark-core_2.10-*version*.jar:/pathTo/livy-api-*version*.jar:
- * /pathTo/livy-client-http-*version*.jar:/pathTo/livy-examples-*version*.jar
- * com.cloudera.livy.examples.PiApp http://livy-host:8998 2
- */
-public class PiApp {
-  public static void main(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: PiJob <livy url> <slices>");
-      System.exit(-1);
-    }
-
-    LivyClient client = new LivyClientBuilder()
-      .setURI(new URI(args[0]))
-      .build();
-
-    try {
-      System.out.println("Uploading livy-example jar to the SparkContext...");
-      for (String s : 
System.getProperty("java.class.path").split(File.pathSeparator)) {
-        if (new File(s).getName().startsWith("livy-examples")) {
-          client.uploadJar(new File(s)).get();
-          break;
-        }
-      }
-
-      final int slices = Integer.parseInt(args[1]);
-      double pi = client.submit(new PiJob(slices)).get();
-
-      System.out.println("Pi is roughly " + pi);
-    } finally {
-      client.stop(true);
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/examples/src/main/java/org/apache/livy/examples/PiApp.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/livy/examples/PiApp.java 
b/examples/src/main/java/org/apache/livy/examples/PiApp.java
new file mode 100644
index 0000000..638f3b2
--- /dev/null
+++ b/examples/src/main/java/org/apache/livy/examples/PiApp.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.examples;
+
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+
+import org.apache.livy.*;
+
+class PiJob implements
+    Job<Double>,
+    Function<Integer, Integer>,
+    Function2<Integer, Integer, Integer> {
+
+  private final int slices;
+  private final int samples;
+
+  public PiJob(int slices) {
+    this.slices = slices;
+    this.samples = (int) Math.min(100000L * slices, Integer.MAX_VALUE);
+  }
+
+  @Override
+  public Double call(JobContext ctx) throws Exception {
+    List<Integer> sampleList = new ArrayList<>();
+    for (int i = 0; i < samples; i++) {
+      sampleList.add(i);
+    }
+
+    return 4.0d * ctx.sc().parallelize(sampleList, 
slices).map(this).reduce(this) / samples;
+  }
+
+  @Override
+  public Integer call(Integer v1) {
+    double x = Math.random() * 2 - 1;
+    double y = Math.random() * 2 - 1;
+    return (x * x + y * y < 1) ? 1 : 0;
+  }
+
+  @Override
+  public Integer call(Integer v1, Integer v2) {
+    return v1 + v2;
+  }
+}
+
+/**
+ * Example execution:
+ * java -cp 
/pathTo/spark-core_2.10-*version*.jar:/pathTo/livy-api-*version*.jar:
+ * /pathTo/livy-client-http-*version*.jar:/pathTo/livy-examples-*version*.jar
+ * org.apache.livy.examples.PiApp http://livy-host:8998 2
+ */
+public class PiApp {
+  public static void main(String[] args) throws Exception {
+    if (args.length != 2) {
+      System.err.println("Usage: PiJob <livy url> <slices>");
+      System.exit(-1);
+    }
+
+    LivyClient client = new LivyClientBuilder()
+      .setURI(new URI(args[0]))
+      .build();
+
+    try {
+      System.out.println("Uploading livy-example jar to the SparkContext...");
+      for (String s : 
System.getProperty("java.class.path").split(File.pathSeparator)) {
+        if (new File(s).getName().startsWith("livy-examples")) {
+          client.uploadJar(new File(s)).get();
+          break;
+        }
+      }
+
+      final int slices = Integer.parseInt(args[1]);
+      double pi = client.submit(new PiJob(slices)).get();
+
+      System.out.println("Pi is roughly " + pi);
+    } finally {
+      client.stop(true);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/examples/src/main/scala/com/cloudera/livy/examples/WordCountApp.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/com/cloudera/livy/examples/WordCountApp.scala 
b/examples/src/main/scala/com/cloudera/livy/examples/WordCountApp.scala
deleted file mode 100644
index 55b9da2..0000000
--- a/examples/src/main/scala/com/cloudera/livy/examples/WordCountApp.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package com.cloudera.livy.examples
-
-import java.io.{File, FileNotFoundException}
-import java.net.URI
-
-import org.apache.spark.storage.StorageLevel
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import com.cloudera.livy.LivyClientBuilder
-import com.cloudera.livy.scalaapi._
-
-/**
- *  A WordCount example using Scala-API which reads text from a stream and 
saves
- *  it as data frames. The word with maximum count is the result.
- */
-object WordCountApp {
-
-  var scalaClient: LivyScalaClient = null
-
-  /**
-   *  Initializes the Scala client with the given url.
-   *  @param url The Livy server url.
-   */
-  def init(url: String): Unit = {
-    scalaClient = new LivyClientBuilder(false).setURI(new 
URI(url)).build().asScalaClient
-  }
-
-  /**
-   *  Uploads the Scala-API Jar and the examples Jar from the target directory.
-   *  @throws FileNotFoundException If either of Scala-API Jar or examples Jar 
is not found.
-   */
-  @throws(classOf[FileNotFoundException])
-  def uploadRelevantJarsForJobExecution(): Unit = {
-    val exampleAppJarPath = getSourcePath(this)
-    val scalaApiJarPath = getSourcePath(scalaClient)
-    uploadJar(exampleAppJarPath)
-    uploadJar(scalaApiJarPath)
-  }
-
-  @throws(classOf[FileNotFoundException])
-  private def getSourcePath(obj: Object): String = {
-    val source = obj.getClass.getProtectionDomain.getCodeSource
-    if (source != null && source.getLocation.getPath != "") {
-      source.getLocation.getPath
-    } else {
-      throw new FileNotFoundException(s"Jar containing ${obj.getClass.getName} 
not found.")
-    }
-  }
-
-  private def uploadJar(path: String) = {
-    val file = new File(path)
-    val uploadJarFuture = scalaClient.uploadJar(file)
-    Await.result(uploadJarFuture, 40 second) match {
-      case null => println("Successfully uploaded " + file.getName)
-    }
-  }
-
-  /**
-   * Submits a spark streaming job to the livy server.
-   *
-   * The streaming job reads data from the given host and port. The data read
-   * is saved in json format as data frames in the given output path file. If 
the file is present
-   * it appends to it, else creates a new file. For simplicity, the number of 
streaming batches
-   * are 2 with each batch for 20 seconds. The Timeout of the streaming job is 
set to 40 seconds.
-   * @param host Hostname that Spark Streaming context has to connect for 
receiving data.
-   * @param port Port that Spark Streaming context has to connect for 
receiving data.
-   * @param outputPath Output path to save the processed data read by the 
Spark Streaming context.
-   */
-  def processStreamingWordCount(
-      host: String,
-      port: Int,
-      outputPath: String): ScalaJobHandle[Unit] = {
-    scalaClient.submit { context =>
-      context.createStreamingContext(15000)
-      val ssc = context.streamingctx
-      val sqlctx = context.sqlctx
-      val lines = ssc.socketTextStream(host, port, 
StorageLevel.MEMORY_AND_DISK_SER)
-      val words = lines.filter(filterEmptyContent(_)).flatMap(tokenize(_))
-      words.print()
-      words.foreachRDD { rdd =>
-        import sqlctx.implicits._
-        val df = rdd.toDF("word")
-        df.write.mode("append").json(outputPath)
-      }
-      ssc.start()
-      ssc.awaitTerminationOrTimeout(12000)
-      ssc.stop(false, true)
-    }
-  }
-
-  /**
-   * Submits a spark sql job to the livy server.
-   *
-   * The sql context job reads data frames from the given json path and 
executes
-   * a sql query to get the word with max count on the temp table created with 
data frames.
-   * @param inputPath Input path to the json data containing the words.
-   */
-  def getWordWithMostCount(inputPath: String): ScalaJobHandle[String] = {
-    scalaClient.submit { context =>
-      val sqlctx = context.sqlctx
-      val rdd = sqlctx.read.json(inputPath)
-      rdd.registerTempTable("words")
-      val result = sqlctx.sql("select word, count(word) as word_count from 
words " +
-        "group by word order by word_count desc limit 1")
-      result.first().toString()
-    }
-  }
-
-  private def filterEmptyContent(text: String): Boolean = {
-    text != null && !text.isEmpty
-  }
-
-  private def tokenize(text : String) : Array[String] = {
-    text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+")
-  }
-
-  private def stopClient(): Unit = {
-    if (scalaClient != null) {
-      scalaClient.stop(true)
-      scalaClient = null;
-    }
-  }
-
-  /**
-   * Main method of the WordCount App. This method does the following
-   * - Validate the arguments.
-   * - Initializes the scala client of livy.
-   * - Uploads the required livy and app code jar files to the spark cluster 
needed during runtime.
-   * - Executes the streaming job that reads text-data from socket stream, 
tokenizes and saves
-   *   them as dataframes in JSON format in the given output path.
-   * - Executes the sql-context job which reads the data frames from the given 
output path and
-   * and returns the word with max count.
-   *
-   * @param args
-   *
-   * REQUIRED ARGUMENTS
-   * arg(0) - Livy server url.
-   * arg(1) - Output path to save the text read from the stream.
-   *
-   * Remaining arguments are treated as key=value pairs. The following keys 
are recognized:
-   * host="examplehost" where "host" is the key and "examplehost" is the value
-   * port=8080 where "port" is the key and 8080 is the value
-   *
-   * STEPS FOR EXECUTION - To get accurate results for one execution:
-   * 1) Delete if the file already exists in the given outputFilePath.
-   *
-   * 2) Spark streaming will listen to the given or defaulted host and port. 
So textdata needs to be
-   *    passed as socket stream during the run-time of the App. The streaming 
context reads 2
-   *    batches of data with an interval of 20 seconds for each batch. All the 
data has to be
-   *    fed before the streaming context completes the second batch. NOTE - 
Inorder to get accurate
-   *    results for one execution, pass the textdata before the execution of 
the app so that all the
-   *    data is read by the socket stream.
-   *    To pass data to localhost and port 8086 provide the following command
-   *    nc -kl 8086
-   *
-   * 3) The text can be provided as paragraphs as the app will tokenize the 
data and filter spaces.
-   *
-   * 4) Execute the application jar file with the required and optional 
arguments either using
-   * mvn or scala.
-   *
-   * Example execution:
-   * scala -cp 
/pathTo/livy-api-*version*.jar:/pathTo/livy-client-http-*version*.jar:
-   * /pathTo/livy-examples-*version*.jar:/pathTo/livy-scala-api-*version*.jar
-   * com.cloudera.livy.examples.WordCountApp http://livy-host:8998 
/outputFilePath
-   * host=myhost port=8080
-   */
-  def main(args: Array[String]): Unit = {
-    var socketStreamHost: String = "localhost"
-    var socketStreamPort: Int = 8086
-    var url = ""
-    var outputFilePath = ""
-    def parseOptionalArg(arg: String): Unit = {
-      val Array(argKey, argValue) = arg.split("=")
-      argKey match {
-        case "host" => socketStreamHost = argValue
-        case "port" => socketStreamPort = argValue.toInt
-        case _ => throw new IllegalArgumentException("Invalid key for optional 
arguments")
-      }
-    }
-    require(args.length >= 2 && args.length <= 4)
-    url = args(0)
-    outputFilePath = args(1)
-    args.slice(2, args.length).foreach(parseOptionalArg)
-    try {
-      init(url)
-      uploadRelevantJarsForJobExecution()
-      println("Calling processStreamingWordCount")
-      val handle1 = processStreamingWordCount(socketStreamHost, 
socketStreamPort, outputFilePath)
-      Await.result(handle1, 100 second)
-      println("Calling getWordWithMostCount")
-      val handle = getWordWithMostCount(outputFilePath)
-      println("Word with max count::" + Await.result(handle, 100 second))
-    } finally {
-      stopClient()
-    }
-  }
-}
-// scalastyle:off println

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/examples/src/main/scala/org/apache/livy/examples/WordCountApp.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/livy/examples/WordCountApp.scala 
b/examples/src/main/scala/org/apache/livy/examples/WordCountApp.scala
new file mode 100644
index 0000000..34c0da9
--- /dev/null
+++ b/examples/src/main/scala/org/apache/livy/examples/WordCountApp.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.livy.examples
+
+import java.io.{File, FileNotFoundException}
+import java.net.URI
+
+import org.apache.spark.storage.StorageLevel
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.apache.livy.LivyClientBuilder
+import org.apache.livy.scalaapi._
+
+/**
+ *  A WordCount example using Scala-API which reads text from a stream and 
saves
+ *  it as data frames. The word with maximum count is the result.
+ */
+object WordCountApp {
+
+  var scalaClient: LivyScalaClient = null
+
+  /**
+   *  Initializes the Scala client with the given url.
+   *  @param url The Livy server url.
+   */
+  def init(url: String): Unit = {
+    scalaClient = new LivyClientBuilder(false).setURI(new 
URI(url)).build().asScalaClient
+  }
+
+  /**
+   *  Uploads the Scala-API Jar and the examples Jar from the target directory.
+   *  @throws FileNotFoundException If either of Scala-API Jar or examples Jar 
is not found.
+   */
+  @throws(classOf[FileNotFoundException])
+  def uploadRelevantJarsForJobExecution(): Unit = {
+    val exampleAppJarPath = getSourcePath(this)
+    val scalaApiJarPath = getSourcePath(scalaClient)
+    uploadJar(exampleAppJarPath)
+    uploadJar(scalaApiJarPath)
+  }
+
+  @throws(classOf[FileNotFoundException])
+  private def getSourcePath(obj: Object): String = {
+    val source = obj.getClass.getProtectionDomain.getCodeSource
+    if (source != null && source.getLocation.getPath != "") {
+      source.getLocation.getPath
+    } else {
+      throw new FileNotFoundException(s"Jar containing ${obj.getClass.getName} 
not found.")
+    }
+  }
+
+  private def uploadJar(path: String) = {
+    val file = new File(path)
+    val uploadJarFuture = scalaClient.uploadJar(file)
+    Await.result(uploadJarFuture, 40 second) match {
+      case null => println("Successfully uploaded " + file.getName)
+    }
+  }
+
+  /**
+   * Submits a spark streaming job to the livy server.
+   *
+   * The streaming job reads data from the given host and port. The data read
+   * is saved in json format as data frames in the given output path file. If 
the file is present
+   * it appends to it, else creates a new file. For simplicity, the number of 
streaming batches
+   * are 2 with each batch for 20 seconds. The Timeout of the streaming job is 
set to 40 seconds.
+   * @param host Hostname that Spark Streaming context has to connect for 
receiving data.
+   * @param port Port that Spark Streaming context has to connect for 
receiving data.
+   * @param outputPath Output path to save the processed data read by the 
Spark Streaming context.
+   */
+  def processStreamingWordCount(
+      host: String,
+      port: Int,
+      outputPath: String): ScalaJobHandle[Unit] = {
+    scalaClient.submit { context =>
+      context.createStreamingContext(15000)
+      val ssc = context.streamingctx
+      val sqlctx = context.sqlctx
+      val lines = ssc.socketTextStream(host, port, 
StorageLevel.MEMORY_AND_DISK_SER)
+      val words = lines.filter(filterEmptyContent(_)).flatMap(tokenize(_))
+      words.print()
+      words.foreachRDD { rdd =>
+        import sqlctx.implicits._
+        val df = rdd.toDF("word")
+        df.write.mode("append").json(outputPath)
+      }
+      ssc.start()
+      ssc.awaitTerminationOrTimeout(12000)
+      ssc.stop(false, true)
+    }
+  }
+
+  /**
+   * Submits a spark sql job to the livy server.
+   *
+   * The sql context job reads data frames from the given json path and 
executes
+   * a sql query to get the word with max count on the temp table created with 
data frames.
+   * @param inputPath Input path to the json data containing the words.
+   */
+  def getWordWithMostCount(inputPath: String): ScalaJobHandle[String] = {
+    scalaClient.submit { context =>
+      val sqlctx = context.sqlctx
+      val rdd = sqlctx.read.json(inputPath)
+      rdd.registerTempTable("words")
+      val result = sqlctx.sql("select word, count(word) as word_count from 
words " +
+        "group by word order by word_count desc limit 1")
+      result.first().toString()
+    }
+  }
+
+  private def filterEmptyContent(text: String): Boolean = {
+    text != null && !text.isEmpty
+  }
+
+  private def tokenize(text : String) : Array[String] = {
+    text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+")
+  }
+
+  private def stopClient(): Unit = {
+    if (scalaClient != null) {
+      scalaClient.stop(true)
+      scalaClient = null;
+    }
+  }
+
+  /**
+   * Main method of the WordCount App. This method does the following
+   * - Validate the arguments.
+   * - Initializes the scala client of livy.
+   * - Uploads the required livy and app code jar files to the spark cluster 
needed during runtime.
+   * - Executes the streaming job that reads text-data from socket stream, 
tokenizes and saves
+   *   them as dataframes in JSON format in the given output path.
+   * - Executes the sql-context job which reads the data frames from the given 
output path and
+   * and returns the word with max count.
+   *
+   * @param args
+   *
+   * REQUIRED ARGUMENTS
+   * arg(0) - Livy server url.
+   * arg(1) - Output path to save the text read from the stream.
+   *
+   * Remaining arguments are treated as key=value pairs. The following keys 
are recognized:
+   * host="examplehost" where "host" is the key and "examplehost" is the value
+   * port=8080 where "port" is the key and 8080 is the value
+   *
+   * STEPS FOR EXECUTION - To get accurate results for one execution:
+   * 1) Delete if the file already exists in the given outputFilePath.
+   *
+   * 2) Spark streaming will listen to the given or defaulted host and port. 
So textdata needs to be
+   *    passed as socket stream during the run-time of the App. The streaming 
context reads 2
+   *    batches of data with an interval of 20 seconds for each batch. All the 
data has to be
+   *    fed before the streaming context completes the second batch. NOTE - 
Inorder to get accurate
+   *    results for one execution, pass the textdata before the execution of 
the app so that all the
+   *    data is read by the socket stream.
+   *    To pass data to localhost and port 8086 provide the following command
+   *    nc -kl 8086
+   *
+   * 3) The text can be provided as paragraphs as the app will tokenize the 
data and filter spaces.
+   *
+   * 4) Execute the application jar file with the required and optional 
arguments either using
+   * mvn or scala.
+   *
+   * Example execution:
+   * scala -cp 
/pathTo/livy-api-*version*.jar:/pathTo/livy-client-http-*version*.jar:
+   * /pathTo/livy-examples-*version*.jar:/pathTo/livy-scala-api-*version*.jar
+   * com.cloudera.livy.examples.WordCountApp http://livy-host:8998 
/outputFilePath
+   * host=myhost port=8080
+   */
+  def main(args: Array[String]): Unit = {
+    var socketStreamHost: String = "localhost"
+    var socketStreamPort: Int = 8086
+    var url = ""
+    var outputFilePath = ""
+    def parseOptionalArg(arg: String): Unit = {
+      val Array(argKey, argValue) = arg.split("=")
+      argKey match {
+        case "host" => socketStreamHost = argValue
+        case "port" => socketStreamPort = argValue.toInt
+        case _ => throw new IllegalArgumentException("Invalid key for optional 
arguments")
+      }
+    }
+    require(args.length >= 2 && args.length <= 4)
+    url = args(0)
+    outputFilePath = args(1)
+    args.slice(2, args.length).foreach(parseOptionalArg)
+    try {
+      init(url)
+      uploadRelevantJarsForJobExecution()
+      println("Calling processStreamingWordCount")
+      val handle1 = processStreamingWordCount(socketStreamHost, 
socketStreamPort, outputFilePath)
+      Await.result(handle1, 100 second)
+      println("Calling getWordWithMostCount")
+      val handle = getWordWithMostCount(outputFilePath)
+      println("Word with max count::" + Await.result(handle, 100 second))
+    } finally {
+      stopClient()
+    }
+  }
+}
+// scalastyle:off println

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/minicluster-dependencies/pom.xml
----------------------------------------------------------------------
diff --git a/integration-test/minicluster-dependencies/pom.xml 
b/integration-test/minicluster-dependencies/pom.xml
index d6a400c..809a3fc 100644
--- a/integration-test/minicluster-dependencies/pom.xml
+++ b/integration-test/minicluster-dependencies/pom.xml
@@ -23,13 +23,13 @@
 
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>multi-scala-project-root</artifactId>
     <relativePath>../../scala/pom.xml</relativePath>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
   </parent>
   <artifactId>minicluster-dependencies-parent</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>pom</packaging>
   <properties>
     <skipDeploy>true</skipDeploy>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/minicluster-dependencies/scala-2.10/pom.xml
----------------------------------------------------------------------
diff --git a/integration-test/minicluster-dependencies/scala-2.10/pom.xml 
b/integration-test/minicluster-dependencies/scala-2.10/pom.xml
index b3493e9..12f6666 100644
--- a/integration-test/minicluster-dependencies/scala-2.10/pom.xml
+++ b/integration-test/minicluster-dependencies/scala-2.10/pom.xml
@@ -17,15 +17,15 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
   <modelVersion>4.0.0</modelVersion>
-  <groupId>com.cloudera.livy</groupId>
+  <groupId>org.apache.livy</groupId>
   <artifactId>minicluster-dependencies_2.10</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>minicluster-dependencies-parent</artifactId>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/minicluster-dependencies/scala-2.11/pom.xml
----------------------------------------------------------------------
diff --git a/integration-test/minicluster-dependencies/scala-2.11/pom.xml 
b/integration-test/minicluster-dependencies/scala-2.11/pom.xml
index d0e3fc7..d754593 100644
--- a/integration-test/minicluster-dependencies/scala-2.11/pom.xml
+++ b/integration-test/minicluster-dependencies/scala-2.11/pom.xml
@@ -17,15 +17,15 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
   <modelVersion>4.0.0</modelVersion>
-  <groupId>com.cloudera.livy</groupId>
+  <groupId>org.apache.livy</groupId>
   <artifactId>minicluster-dependencies_2.11</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>minicluster-dependencies-parent</artifactId>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 


Reply via email to