http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-http/src/test/scala/com/cloudera/livy/client/http/LivyConnectionSpec.scala ---------------------------------------------------------------------- diff --git a/client-http/src/test/scala/com/cloudera/livy/client/http/LivyConnectionSpec.scala b/client-http/src/test/scala/com/cloudera/livy/client/http/LivyConnectionSpec.scala deleted file mode 100644 index e1d29ca..0000000 --- a/client-http/src/test/scala/com/cloudera/livy/client/http/LivyConnectionSpec.scala +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.cloudera.livy.client.http - -import java.io.IOException -import java.net.URLEncoder -import java.nio.charset.StandardCharsets.UTF_8 - -import org.apache.http.client.utils.URIBuilder -import org.eclipse.jetty.security._ -import org.eclipse.jetty.security.authentication.BasicAuthenticator -import org.eclipse.jetty.util.security._ -import org.scalatest.{BeforeAndAfterAll, FunSpecLike} -import org.scalatest.Matchers._ -import org.scalatra.servlet.ScalatraListener - -import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf} -import com.cloudera.livy.server.WebServer - -class LivyConnectionSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUnitTestSuite { - describe("LivyConnection") { - def basicAuth(username: String, password: String, realm: String): SecurityHandler = { - val roles = Array("user") - - val l = new HashLoginService() - l.putUser(username, Credential.getCredential(password), roles) - l.setName(realm) - - val constraint = new Constraint() - constraint.setName(Constraint.__BASIC_AUTH) - constraint.setRoles(roles) - constraint.setAuthenticate(true) - - val cm = new ConstraintMapping() - cm.setConstraint(constraint) - cm.setPathSpec("/*") - - val csh = new ConstraintSecurityHandler() - csh.setAuthenticator(new BasicAuthenticator()) - csh.setRealmName(realm) - csh.addConstraintMapping(cm) - csh.setLoginService(l) - - csh - } - - def test(password: String, livyConf: LivyConf = new LivyConf()): Unit = { - val username = "user name" - - val server = new WebServer(livyConf, "0.0.0.0", 0) - server.context.setSecurityHandler(basicAuth(username, password, "realm")) - server.context.setResourceBase("src/main/com/cloudera/livy/server") - server.context.setInitParameter(ScalatraListener.LifeCycleKey, - classOf[HttpClientTestBootstrap].getCanonicalName) - server.context.addEventListener(new ScalatraListener) - server.start() - - val utf8Name = UTF_8.name() - val uri = new URIBuilder() - .setScheme(server.protocol) - .setHost(server.host) - 
.setPort(server.port) - .setUserInfo(URLEncoder.encode(username, utf8Name), URLEncoder.encode(password, utf8Name)) - .build() - info(uri.toString) - val conn = new LivyConnection(uri, new HttpConf(null)) - try { - conn.get(classOf[Object], "/") should not be (null) - - } finally { - conn.close() - } - - server.stop() - server.join() - } - - it("should support HTTP auth with password") { - test("pass:word") - } - - it("should support HTTP auth with empty password") { - test("") - } - - it("should be failed with large header size") { - val livyConf = new LivyConf() - .set(LivyConf.REQUEST_HEADER_SIZE, 1024) - .set(LivyConf.RESPONSE_HEADER_SIZE, 1024) - val pwd = "test-password" * 100 - val exception = intercept[IOException](test(pwd, livyConf)) - exception.getMessage.contains("Request Entity Too Large") should be(true) - } - - it("should be succeeded with configured header size") { - val livyConf = new LivyConf() - .set(LivyConf.REQUEST_HEADER_SIZE, 2048) - .set(LivyConf.RESPONSE_HEADER_SIZE, 2048) - val pwd = "test-password" * 100 - test(pwd, livyConf) - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala ---------------------------------------------------------------------- diff --git a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala new file mode 100644 index 0000000..801c09b --- /dev/null +++ b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy.client.http + +import java.io.{File, InputStream} +import java.net.{InetAddress, URI} +import java.nio.file.{Files, Paths} +import java.util.concurrent.{Future => JFuture, _} +import java.util.concurrent.atomic.AtomicLong +import javax.servlet.ServletContext +import javax.servlet.http.HttpServletRequest + +import scala.concurrent.{ExecutionContext, Future} + +import org.mockito.ArgumentCaptor +import org.mockito.Matchers.{eq => meq, _} +import org.mockito.Mockito._ +import org.scalatest.{BeforeAndAfterAll, FunSpecLike} +import org.scalatra.LifeCycle +import org.scalatra.servlet.ScalatraListener + +import org.apache.livy._ +import org.apache.livy.client.common.{BufferUtils, Serializer} +import org.apache.livy.client.common.HttpMessages._ +import org.apache.livy.server.WebServer +import org.apache.livy.server.interactive.{InteractiveSession, InteractiveSessionServlet} +import org.apache.livy.server.recovery.SessionStore +import org.apache.livy.sessions.{InteractiveSessionManager, SessionState, Spark} +import org.apache.livy.test.jobs.Echo +import org.apache.livy.utils.AppInfo + +/** + * The test for the HTTP client is written in Scala so we can reuse the code in the livy-server + * module, which implements the client session backend. The client servlet has some functionality + * overridden to avoid creating sub-processes for each session.
+ */ +class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUnitTestSuite { + + import HttpClientSpec._ + + private val TIMEOUT_S = 10 + private val ID_GENERATOR = new AtomicLong() + private val serializer = new Serializer() + + private var server: WebServer = _ + private var client: LivyClient = _ + + override def beforeAll(): Unit = { + super.beforeAll() + server = new WebServer(new LivyConf(), "0.0.0.0", 0) + + server.context.setResourceBase("src/main/org/apache/livy/server") + server.context.setInitParameter(ScalatraListener.LifeCycleKey, + classOf[HttpClientTestBootstrap].getCanonicalName) + server.context.addEventListener(new ScalatraListener) + + server.start() + } + + override def afterAll(): Unit = { + // Stop the client before the server: stop(true) ends the remote session through it. + try { + if (client != null) client.stop(true) + } finally { + client = null + if (server != null) server.stop() + server = null + session = null + super.afterAll() + } + } + + describe("HTTP client library") { + + it("should create clients") { + // WebServer does this internally instead of respecting "0.0.0.0", so try to use the same + // address. + val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}/" + client = new LivyClientBuilder(false).setURI(new URI(uri)).build() + } + + withClient("should run and monitor asynchronous jobs") { + testJob(false) + } + + withClient("should propagate errors from jobs") { + val errorMessage = "This job throws an error."
+ val (jobId, handle) = runJob(false, { id => Seq( + new JobStatus(id, JobHandle.State.FAILED, null, errorMessage)) + }) + + val error = intercept[ExecutionException] { + handle.get(TIMEOUT_S, TimeUnit.SECONDS) + } + assert(error.getCause() != null) + assert(error.getCause().getMessage().indexOf(errorMessage) >= 0) + verify(session, times(1)).jobStatus(meq(jobId)) + } + + withClient("should run and monitor synchronous jobs") { + testJob(true) + } + + withClient("should add files and jars") { + val furi = new URI("hdfs:file") + val juri = new URI("hdfs:jar") + + client.addFile(furi).get(TIMEOUT_S, TimeUnit.SECONDS) + client.addJar(juri).get(TIMEOUT_S, TimeUnit.SECONDS) + + verify(session, times(1)).addFile(meq(furi)) + verify(session, times(1)).addJar(meq(juri)) + } + + withClient("should upload files and jars") { + uploadAndVerify("file") + uploadAndVerify("jar") + } + + withClient("should cancel jobs") { + val (jobId, handle) = runJob(false, { id => Seq( + new JobStatus(id, JobHandle.State.STARTED, null, null), + new JobStatus(id, JobHandle.State.CANCELLED, null, null)) + }) + handle.cancel(true) + + intercept[CancellationException] { + handle.get(TIMEOUT_S, TimeUnit.SECONDS) + } + + verify(session, times(1)).cancelJob(meq(jobId)) + } + + withClient("should notify listeners of job completion") { + val (jobId, handle) = runJob(false, { id => Seq( + new JobStatus(id, JobHandle.State.STARTED, null, null), + new JobStatus(id, JobHandle.State.SUCCEEDED, serialize(id), null)) + }) + + val listener = mock(classOf[JobHandle.Listener[Long]]) + handle.asInstanceOf[JobHandle[Long]].addListener(listener) + + assert(handle.get(TIMEOUT_S, TimeUnit.SECONDS) === jobId) + verify(listener, times(1)).onJobSucceeded(any(), any()) + } + + withClient("should time out handle get() call") { + // JobHandleImpl does exponential backoff checking the result of a job.
Given an initial + // wait of 100ms, 4 iterations should result in a wait of 800ms, so the handle should at that + // point timeout a wait of 100ms. + val (jobId, handle) = runJob(false, { id => Seq( + new JobStatus(id, JobHandle.State.STARTED, null, null), + new JobStatus(id, JobHandle.State.STARTED, null, null), + new JobStatus(id, JobHandle.State.STARTED, null, null), + new JobStatus(id, JobHandle.State.SUCCEEDED, serialize(id), null)) + }) + + intercept[TimeoutException] { + handle.get(100, TimeUnit.MILLISECONDS) + } + + assert(handle.get(TIMEOUT_S, TimeUnit.SECONDS) === jobId) + } + + withClient("should handle null responses") { + testJob(false, response = Some(null)) + } + + withClient("should connect to existing sessions") { + val sid = client.asInstanceOf[HttpClient].getSessionId() + val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" + + s"${LivyConnection.SESSIONS_URI}/$sid" + val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).build() + newClient.stop(false) + verify(session, never()).stop() + } + + withClient("should tear down clients") { + client.stop(true) + verify(session, times(1)).stop() + } + + } + + private def uploadAndVerify(cmd: String): Unit = { + val f = File.createTempFile("uploadTestFile", cmd) + val expectedStr = "Test data" + val expectedData = expectedStr.getBytes() + Files.write(Paths.get(f.getAbsolutePath), expectedData) + val b = new Array[Byte](expectedData.length) + val captor = ArgumentCaptor.forClass(classOf[InputStream]) + if (cmd == "file") { + client.uploadFile(f).get(TIMEOUT_S, TimeUnit.SECONDS) + verify(session, times(1)).addFile(captor.capture(), meq(f.getName)) + } else { + client.uploadJar(f).get(TIMEOUT_S, TimeUnit.SECONDS) + verify(session, times(1)).addJar(captor.capture(), meq(f.getName)) + } + captor.getValue.read(b) + assert(expectedStr === new String(b)) + } + + private def runJob(sync: Boolean, genStatusFn: Long => Seq[JobStatus]): (Long, JFuture[Int]) = { + val jobId =
java.lang.Long.valueOf(ID_GENERATOR.incrementAndGet()) + when(session.submitJob(any(classOf[Array[Byte]]))).thenReturn(jobId) + when(session.runJob(any(classOf[Array[Byte]]))).thenReturn(jobId) // for client.run() + val statuses = genStatusFn(jobId) + val first = statuses.head + val remaining = statuses.drop(1) + when(session.jobStatus(meq(jobId))).thenReturn(first, remaining: _*) + + val job = new Echo(42) + val handle = if (sync) client.run(job) else client.submit(job) + (jobId, handle) + } + + private def testJob(sync: Boolean, response: Option[Any] = None): Unit = { + val (jobId, handle) = runJob(sync, { id => Seq( + new JobStatus(id, JobHandle.State.STARTED, null, null), + new JobStatus(id, JobHandle.State.SUCCEEDED, serialize(response.getOrElse(id)), null)) + }) + assert(handle.get(TIMEOUT_S, TimeUnit.SECONDS) === response.getOrElse(jobId)) + verify(session, times(2)).jobStatus(meq(jobId)) + } + + private def withClient(desc: String)(fn: => Unit): Unit = { + it(desc) { + assume(client != null, "No active client.") + fn + } + } + + def serialize(value: Any): Array[Byte] = { + BufferUtils.toByteArray(serializer.serialize(value)) + } + +} + +private object HttpClientSpec { + + // Hack warning: keep the session object available so that individual tests can mock + // the desired behavior before making requests to the server.
+ var session: InteractiveSession = _ + +} + +private class HttpClientTestBootstrap extends LifeCycle { + + private implicit def executor: ExecutionContext = ExecutionContext.global + + override def init(context: ServletContext): Unit = { + val conf = new LivyConf() + val stateStore = mock(classOf[SessionStore]) + val sessionManager = new InteractiveSessionManager(conf, stateStore, Some(Seq.empty)) + val servlet = new InteractiveSessionServlet(sessionManager, stateStore, conf) { + override protected def createSession(req: HttpServletRequest): InteractiveSession = { + val session = mock(classOf[InteractiveSession]) + val id = sessionManager.nextId() + when(session.id).thenReturn(id) + when(session.appId).thenReturn(None) + when(session.appInfo).thenReturn(AppInfo()) + when(session.state).thenReturn(SessionState.Idle()) + when(session.proxyUser).thenReturn(None) + when(session.kind).thenReturn(Spark()) + when(session.stop()).thenReturn(Future.successful(())) + require(HttpClientSpec.session == null, "Session already created?") + HttpClientSpec.session = session + session + } + } + + context.mount(servlet, s"${LivyConnection.SESSIONS_URI}/*") + } + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-http/src/test/scala/org/apache/livy/client/http/LivyConnectionSpec.scala ---------------------------------------------------------------------- diff --git a/client-http/src/test/scala/org/apache/livy/client/http/LivyConnectionSpec.scala b/client-http/src/test/scala/org/apache/livy/client/http/LivyConnectionSpec.scala new file mode 100644 index 0000000..41edd66 --- /dev/null +++ b/client-http/src/test/scala/org/apache/livy/client/http/LivyConnectionSpec.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.client.http + +import java.io.IOException +import java.net.URLEncoder +import java.nio.charset.StandardCharsets.UTF_8 + +import org.apache.http.client.utils.URIBuilder +import org.eclipse.jetty.security._ +import org.eclipse.jetty.security.authentication.BasicAuthenticator +import org.eclipse.jetty.util.security._ +import org.scalatest.{BeforeAndAfterAll, FunSpecLike} +import org.scalatest.Matchers._ +import org.scalatra.servlet.ScalatraListener + +import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} +import org.apache.livy.server.WebServer + +class LivyConnectionSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUnitTestSuite { + describe("LivyConnection") { + def basicAuth(username: String, password: String, realm: String): SecurityHandler = { + val roles = Array("user") + + val l = new HashLoginService() + l.putUser(username, Credential.getCredential(password), roles) + l.setName(realm) + + val constraint = new Constraint() + constraint.setName(Constraint.__BASIC_AUTH) + constraint.setRoles(roles) + constraint.setAuthenticate(true) + + val cm = new ConstraintMapping() + cm.setConstraint(constraint) + cm.setPathSpec("/*") + + val csh = new ConstraintSecurityHandler() + csh.setAuthenticator(new BasicAuthenticator()) + csh.setRealmName(realm) + csh.addConstraintMapping(cm) + csh.setLoginService(l) + + csh + } + + def test(password: String, 
livyConf: LivyConf = new LivyConf()): Unit = { + val username = "user name" + + val server = new WebServer(livyConf, "0.0.0.0", 0) + server.context.setSecurityHandler(basicAuth(username, password, "realm")) + server.context.setResourceBase("src/main/org/apache/livy/server") + server.context.setInitParameter(ScalatraListener.LifeCycleKey, + classOf[HttpClientTestBootstrap].getCanonicalName) + server.context.addEventListener(new ScalatraListener) + server.start() + + val utf8Name = UTF_8.name() + val uri = new URIBuilder() + .setScheme(server.protocol) + .setHost(server.host) + .setPort(server.port) + .setUserInfo(URLEncoder.encode(username, utf8Name), URLEncoder.encode(password, utf8Name)) + .build() + info(uri.toString) + val conn = new LivyConnection(uri, new HttpConf(null)) + try { + conn.get(classOf[Object], "/") should not be (null) + } finally { + // Stop the server even if the request throws, as the oversized-header test expects. + try conn.close() finally { + server.stop() + server.join() + } + } + } + + it("should support HTTP auth with password") { + test("pass:word") + } + + it("should support HTTP auth with empty password") { + test("") + } + + it("should be failed with large header size") { + val livyConf = new LivyConf() + .set(LivyConf.REQUEST_HEADER_SIZE, 1024) + .set(LivyConf.RESPONSE_HEADER_SIZE, 1024) + val pwd = "test-password" * 100 + val exception = intercept[IOException](test(pwd, livyConf)) + exception.getMessage.contains("Request Entity Too Large") should be(true) + } + + it("should be succeeded with configured header size") { + val livyConf = new LivyConf() + .set(LivyConf.REQUEST_HEADER_SIZE, 2048) + .set(LivyConf.RESPONSE_HEADER_SIZE, 2048) + val pwd = "test-password" * 100 + test(pwd, livyConf) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/pom.xml ---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml index e08e55b..0a0344b 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -20,19 +20,19 @@ 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>multi-scala-project-root</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <relativePath>../scala/pom.xml</relativePath> </parent> <artifactId>livy-core-parent</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <packaging>pom</packaging> <dependencies> <dependency> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>livy-client-common</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/scala-2.10/pom.xml ---------------------------------------------------------------------- diff --git a/core/scala-2.10/pom.xml b/core/scala-2.10/pom.xml index b8e7bc5..8d1aad8 100644 --- a/core/scala-2.10/pom.xml +++ b/core/scala-2.10/pom.xml @@ -17,15 +17,15 @@ --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>livy-core_2.10</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <packaging>jar</packaging> <parent> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>livy-core-parent</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/scala-2.11/pom.xml ---------------------------------------------------------------------- diff --git a/core/scala-2.11/pom.xml b/core/scala-2.11/pom.xml 
index 1f817ab..bc16caf 100644 --- a/core/scala-2.11/pom.xml +++ b/core/scala-2.11/pom.xml @@ -17,15 +17,15 @@ --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>livy-core_2.11</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <packaging>jar</packaging> <parent> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>livy-core-parent</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/com/cloudera/livy/Logging.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/com/cloudera/livy/Logging.scala b/core/src/main/scala/com/cloudera/livy/Logging.scala deleted file mode 100644 index 8532ea2..0000000 --- a/core/src/main/scala/com/cloudera/livy/Logging.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy - -import org.slf4j.LoggerFactory - -trait Logging { - lazy val logger = LoggerFactory.getLogger(this.getClass) - - def trace(message: => Any): Unit = { - if (logger.isTraceEnabled) { - logger.trace(message.toString) - } - } - - def debug(message: => Any): Unit = { - if (logger.isDebugEnabled) { - logger.debug(message.toString) - } - } - - def info(message: => Any): Unit = { - if (logger.isInfoEnabled) { - logger.info(message.toString) - } - } - - def warn(message: => Any): Unit = { - logger.warn(message.toString) - } - - def error(message: => Any, t: Throwable): Unit = { - logger.error(message.toString, t) - } - - def error(message: => Any): Unit = { - logger.error(message.toString) - } -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/com/cloudera/livy/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/com/cloudera/livy/Utils.scala b/core/src/main/scala/com/cloudera/livy/Utils.scala deleted file mode 100644 index 0d779dd..0000000 --- a/core/src/main/scala/com/cloudera/livy/Utils.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy - -import java.io.{Closeable, File, FileInputStream, InputStreamReader} -import java.net.URL -import java.nio.charset.StandardCharsets.UTF_8 -import java.util.Properties - -import scala.annotation.tailrec -import scala.collection.JavaConverters._ -import scala.concurrent.TimeoutException -import scala.concurrent.duration.Duration - -object Utils { - def getPropertiesFromFile(file: File): Map[String, String] = { - loadProperties(file.toURI().toURL()) - } - - def loadProperties(url: URL): Map[String, String] = { - val inReader = new InputStreamReader(url.openStream(), UTF_8) - try { - val properties = new Properties() - properties.load(inReader) - properties.stringPropertyNames().asScala.map { k => - (k, properties.getProperty(k).trim()) - }.toMap - } finally { - inReader.close() - } - } - - /** - * Checks if event has occurred during some time period. This performs an exponential backoff - * to limit the poll calls. 
- * - * @param checkForEvent - * @param atMost - * @throws java.util.concurrent.TimeoutException - * @throws java.lang.InterruptedException - * @return - */ - @throws(classOf[TimeoutException]) - @throws(classOf[InterruptedException]) - final def waitUntil(checkForEvent: () => Boolean, atMost: Duration): Unit = { - val endTime = System.currentTimeMillis() + atMost.toMillis - - @tailrec - def aux(count: Int): Unit = { - if (!checkForEvent()) { - val now = System.currentTimeMillis() - - if (now < endTime) { - val sleepTime = Math.max(10 * (2 << (count - 1)), 1000) - Thread.sleep(sleepTime) - aux(count + 1) - } else { - throw new TimeoutException - } - } - } - - aux(1) - } - - /** Returns if the process is still running */ - def isProcessAlive(process: Process): Boolean = { - try { - process.exitValue() - false - } catch { - case _: IllegalThreadStateException => - true - } - } - - def startDaemonThread(name: String)(f: => Unit): Thread = { - val thread = new Thread(name) { - override def run(): Unit = f - } - thread.setDaemon(true) - thread.start() - thread - } - - def usingResource[A <: Closeable, B](resource: A)(f: A => B): B = { - try { - f(resource) - } finally { - resource.close() - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/com/cloudera/livy/msgs.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/com/cloudera/livy/msgs.scala b/core/src/main/scala/com/cloudera/livy/msgs.scala deleted file mode 100644 index 39ee152..0000000 --- a/core/src/main/scala/com/cloudera/livy/msgs.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy - -sealed trait MsgType - -object MsgType { - case object execute_request extends MsgType - case object execute_reply extends MsgType -} - -case class Msg[T <: Content](msg_type: MsgType, content: T) - -sealed trait Content - -case class ExecuteRequest(code: String) extends Content { - val msg_type = MsgType.execute_request -} - -sealed trait ExecutionStatus -object ExecutionStatus { - case object ok extends ExecutionStatus - case object error extends ExecutionStatus - case object abort extends ExecutionStatus -} - -sealed trait ExecuteReply extends Content { - val msg_type = MsgType.execute_reply - - val status: ExecutionStatus - val execution_count: Int -} - -case class ExecuteReplyOk(execution_count: Int, - payload: Map[String, String]) extends ExecuteReply { - val status = ExecutionStatus.ok -} - -case class ExecuteReplyError(execution_count: Int, - ename: String, - evalue: String, - traceback: List[String]) extends ExecuteReply { - val status = ExecutionStatus.error -} - -case class ExecuteResponse(id: Int, input: Seq[String], output: Seq[String]) - -case class ShutdownRequest() extends Content http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/com/cloudera/livy/sessions/Kind.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/com/cloudera/livy/sessions/Kind.scala b/core/src/main/scala/com/cloudera/livy/sessions/Kind.scala deleted file mode 100644 index 7a6480b..0000000 --- a/core/src/main/scala/com/cloudera/livy/sessions/Kind.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.cloudera.livy.sessions - -import com.fasterxml.jackson.core.{JsonGenerator, JsonParser, JsonToken} -import com.fasterxml.jackson.databind._ -import com.fasterxml.jackson.databind.module.SimpleModule - -sealed trait Kind -case class Spark() extends Kind { - override def toString: String = "spark" -} - -case class PySpark() extends Kind { - override def toString: String = "pyspark" -} - -case class PySpark3() extends Kind { - override def toString: String = "pyspark3" -} - -case class SparkR() extends Kind { - override def toString: String = "sparkr" -} - -object Kind { - - def apply(kind: String): Kind = kind match { - case "spark" | "scala" => Spark() - case "pyspark" | "python" => PySpark() - case "pyspark3" | "python3" => PySpark3() - case "sparkr" | "r" => SparkR() - case other => throw new IllegalArgumentException(s"Invalid kind: $other") - } - -} - -class SessionKindModule extends SimpleModule("SessionKind") { - - addSerializer(classOf[Kind], new JsonSerializer[Kind]() { - override def serialize(value: Kind, jgen: JsonGenerator, provider: SerializerProvider): Unit = { - jgen.writeString(value.toString) - } - }) - - addDeserializer(classOf[Kind], new JsonDeserializer[Kind]() { - override def deserialize(jp: JsonParser, ctxt: DeserializationContext): Kind = { - require(jp.getCurrentToken() == JsonToken.VALUE_STRING, "Kind should be a string.") - Kind(jp.getText()) - } - }) - -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala b/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala deleted file mode 100644 index 831b01a..0000000 --- a/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or 
more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.sessions - -sealed trait SessionState { - /** Returns true if the State represents a process that can eventually execute commands */ - def isActive: Boolean -} - -sealed trait FinishedSessionState extends SessionState { - /** When session is finished. 
*/ - def time: Long -} - -object SessionState { - - def apply(s: String): SessionState = { - s match { - case "not_started" => NotStarted() - case "starting" => Starting() - case "recovering" => Recovering() - case "idle" => Idle() - case "running" => Running() - case "busy" => Busy() - case "shutting_down" => ShuttingDown() - case "error" => Error() - case "dead" => Dead() - case "success" => Success() - case _ => throw new IllegalArgumentException(s"Illegal session state: $s") - } - } - - case class NotStarted() extends SessionState { - override def isActive: Boolean = true - - override def toString: String = "not_started" - } - - case class Starting() extends SessionState { - override def isActive: Boolean = true - - override def toString: String = "starting" - } - - case class Recovering() extends SessionState { - override def isActive: Boolean = true - - override def toString: String = "recovering" - } - - case class Idle() extends SessionState { - override def isActive: Boolean = true - - override def toString: String = "idle" - } - - case class Running() extends SessionState { - override def isActive: Boolean = true - - override def toString: String = "running" - } - - case class Busy() extends SessionState { - override def isActive: Boolean = true - - override def toString: String = "busy" - } - - case class ShuttingDown() extends SessionState { - override def isActive: Boolean = false - - override def toString: String = "shutting_down" - } - - case class Error(time: Long = System.nanoTime()) extends FinishedSessionState { - override def isActive: Boolean = true - - override def toString: String = "error" - } - - case class Dead(time: Long = System.nanoTime()) extends FinishedSessionState { - override def isActive: Boolean = false - - override def toString: String = "dead" - } - - case class Success(time: Long = System.nanoTime()) extends FinishedSessionState { - override def isActive: Boolean = false - - override def toString: String = "success" - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/org/apache/livy/Logging.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/livy/Logging.scala b/core/src/main/scala/org/apache/livy/Logging.scala new file mode 100644 index 0000000..73ff7df --- /dev/null +++ b/core/src/main/scala/org/apache/livy/Logging.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy + +import org.slf4j.LoggerFactory + +trait Logging { + lazy val logger = LoggerFactory.getLogger(this.getClass) + + def trace(message: => Any): Unit = { + if (logger.isTraceEnabled) { + logger.trace(message.toString) + } + } + + def debug(message: => Any): Unit = { + if (logger.isDebugEnabled) { + logger.debug(message.toString) + } + } + + def info(message: => Any): Unit = { + if (logger.isInfoEnabled) { + logger.info(message.toString) + } + } + + def warn(message: => Any): Unit = { + logger.warn(message.toString) + } + + def error(message: => Any, t: Throwable): Unit = { + logger.error(message.toString, t) + } + + def error(message: => Any): Unit = { + logger.error(message.toString) + } +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/org/apache/livy/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/livy/Utils.scala b/core/src/main/scala/org/apache/livy/Utils.scala new file mode 100644 index 0000000..c1cffe4 --- /dev/null +++ b/core/src/main/scala/org/apache/livy/Utils.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy + +import java.io.{Closeable, File, FileInputStream, InputStreamReader} +import java.net.URL +import java.nio.charset.StandardCharsets.UTF_8 +import java.util.Properties + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.concurrent.TimeoutException +import scala.concurrent.duration.Duration + +object Utils { + def getPropertiesFromFile(file: File): Map[String, String] = { + loadProperties(file.toURI().toURL()) + } + + def loadProperties(url: URL): Map[String, String] = { + val inReader = new InputStreamReader(url.openStream(), UTF_8) + try { + val properties = new Properties() + properties.load(inReader) + properties.stringPropertyNames().asScala.map { k => + (k, properties.getProperty(k).trim()) + }.toMap + } finally { + inReader.close() + } + } + + /** + * Checks if event has occurred during some time period. This performs an exponential backoff + * to limit the poll calls. + * + * @param checkForEvent + * @param atMost + * @throws java.util.concurrent.TimeoutException + * @throws java.lang.InterruptedException + * @return + */ + @throws(classOf[TimeoutException]) + @throws(classOf[InterruptedException]) + final def waitUntil(checkForEvent: () => Boolean, atMost: Duration): Unit = { + val endTime = System.currentTimeMillis() + atMost.toMillis + + @tailrec + def aux(count: Int): Unit = { + if (!checkForEvent()) { + val now = System.currentTimeMillis() + + if (now < endTime) { + val sleepTime = Math.max(10 * (2 << (count - 1)), 1000) + Thread.sleep(sleepTime) + aux(count + 1) + } else { + throw new TimeoutException + } + } + } + + aux(1) + } + + /** Returns if the process is still running */ + def isProcessAlive(process: Process): Boolean = { + try { + process.exitValue() + false + } catch { + case _: IllegalThreadStateException => + true + } + } + + def startDaemonThread(name: String)(f: => Unit): Thread = { + val thread = new Thread(name) { + override def run(): Unit = f + } + 
thread.setDaemon(true) + thread.start() + thread + } + + def usingResource[A <: Closeable, B](resource: A)(f: A => B): B = { + try { + f(resource) + } finally { + resource.close() + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/org/apache/livy/msgs.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/livy/msgs.scala b/core/src/main/scala/org/apache/livy/msgs.scala new file mode 100644 index 0000000..0dd0a26 --- /dev/null +++ b/core/src/main/scala/org/apache/livy/msgs.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy + +sealed trait MsgType + +object MsgType { + case object execute_request extends MsgType + case object execute_reply extends MsgType +} + +case class Msg[T <: Content](msg_type: MsgType, content: T) + +sealed trait Content + +case class ExecuteRequest(code: String) extends Content { + val msg_type = MsgType.execute_request +} + +sealed trait ExecutionStatus +object ExecutionStatus { + case object ok extends ExecutionStatus + case object error extends ExecutionStatus + case object abort extends ExecutionStatus +} + +sealed trait ExecuteReply extends Content { + val msg_type = MsgType.execute_reply + + val status: ExecutionStatus + val execution_count: Int +} + +case class ExecuteReplyOk(execution_count: Int, + payload: Map[String, String]) extends ExecuteReply { + val status = ExecutionStatus.ok +} + +case class ExecuteReplyError(execution_count: Int, + ename: String, + evalue: String, + traceback: List[String]) extends ExecuteReply { + val status = ExecutionStatus.error +} + +case class ExecuteResponse(id: Int, input: Seq[String], output: Seq[String]) + +case class ShutdownRequest() extends Content http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/org/apache/livy/sessions/Kind.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/livy/sessions/Kind.scala b/core/src/main/scala/org/apache/livy/sessions/Kind.scala new file mode 100644 index 0000000..bfb166f --- /dev/null +++ b/core/src/main/scala/org/apache/livy/sessions/Kind.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.sessions + +import com.fasterxml.jackson.core.{JsonGenerator, JsonParser, JsonToken} +import com.fasterxml.jackson.databind._ +import com.fasterxml.jackson.databind.module.SimpleModule + +sealed trait Kind +case class Spark() extends Kind { + override def toString: String = "spark" +} + +case class PySpark() extends Kind { + override def toString: String = "pyspark" +} + +case class PySpark3() extends Kind { + override def toString: String = "pyspark3" +} + +case class SparkR() extends Kind { + override def toString: String = "sparkr" +} + +object Kind { + + def apply(kind: String): Kind = kind match { + case "spark" | "scala" => Spark() + case "pyspark" | "python" => PySpark() + case "pyspark3" | "python3" => PySpark3() + case "sparkr" | "r" => SparkR() + case other => throw new IllegalArgumentException(s"Invalid kind: $other") + } + +} + +class SessionKindModule extends SimpleModule("SessionKind") { + + addSerializer(classOf[Kind], new JsonSerializer[Kind]() { + override def serialize(value: Kind, jgen: JsonGenerator, provider: SerializerProvider): Unit = { + jgen.writeString(value.toString) + } + }) + + addDeserializer(classOf[Kind], new JsonDeserializer[Kind]() { + override def deserialize(jp: JsonParser, ctxt: DeserializationContext): Kind = { + require(jp.getCurrentToken() == JsonToken.VALUE_STRING, "Kind should be a string.") + Kind(jp.getText()) + } + }) + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/main/scala/org/apache/livy/sessions/SessionState.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/livy/sessions/SessionState.scala b/core/src/main/scala/org/apache/livy/sessions/SessionState.scala new file mode 100644 index 0000000..fd19321 --- /dev/null +++ b/core/src/main/scala/org/apache/livy/sessions/SessionState.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.sessions + +sealed trait SessionState { + /** Returns true if the State represents a process that can eventually execute commands */ + def isActive: Boolean +} + +sealed trait FinishedSessionState extends SessionState { + /** When session is finished. 
*/ + def time: Long +} + +object SessionState { + + def apply(s: String): SessionState = { + s match { + case "not_started" => NotStarted() + case "starting" => Starting() + case "recovering" => Recovering() + case "idle" => Idle() + case "running" => Running() + case "busy" => Busy() + case "shutting_down" => ShuttingDown() + case "error" => Error() + case "dead" => Dead() + case "success" => Success() + case _ => throw new IllegalArgumentException(s"Illegal session state: $s") + } + } + + case class NotStarted() extends SessionState { + override def isActive: Boolean = true + + override def toString: String = "not_started" + } + + case class Starting() extends SessionState { + override def isActive: Boolean = true + + override def toString: String = "starting" + } + + case class Recovering() extends SessionState { + override def isActive: Boolean = true + + override def toString: String = "recovering" + } + + case class Idle() extends SessionState { + override def isActive: Boolean = true + + override def toString: String = "idle" + } + + case class Running() extends SessionState { + override def isActive: Boolean = true + + override def toString: String = "running" + } + + case class Busy() extends SessionState { + override def isActive: Boolean = true + + override def toString: String = "busy" + } + + case class ShuttingDown() extends SessionState { + override def isActive: Boolean = false + + override def toString: String = "shutting_down" + } + + case class Error(time: Long = System.nanoTime()) extends FinishedSessionState { + override def isActive: Boolean = true + + override def toString: String = "error" + } + + case class Dead(time: Long = System.nanoTime()) extends FinishedSessionState { + override def isActive: Boolean = false + + override def toString: String = "dead" + } + + case class Success(time: Long = System.nanoTime()) extends FinishedSessionState { + override def isActive: Boolean = false + + override def toString: String = "success" + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/test/scala/com/cloudera/livy/LivyBaseUnitTestSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/com/cloudera/livy/LivyBaseUnitTestSuite.scala b/core/src/test/scala/com/cloudera/livy/LivyBaseUnitTestSuite.scala deleted file mode 100644 index d833e39..0000000 --- a/core/src/test/scala/com/cloudera/livy/LivyBaseUnitTestSuite.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.cloudera.livy - -import org.scalatest.{Outcome, Suite} - -trait LivyBaseUnitTestSuite extends Suite with Logging { - - protected override def withFixture(test: NoArgTest): Outcome = { - val testName = test.name - val suiteName = this.getClass.getName - try { - info(s"\n\n==== TEST OUTPUT FOR $suiteName: '$testName' ====\n") - test() - } finally { - info(s"\n\n==== FINISHED $suiteName: '$testName' ====\n") - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/core/src/test/scala/org/apache/livy/LivyBaseUnitTestSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/livy/LivyBaseUnitTestSuite.scala b/core/src/test/scala/org/apache/livy/LivyBaseUnitTestSuite.scala new file mode 100644 index 0000000..908172b --- /dev/null +++ b/core/src/test/scala/org/apache/livy/LivyBaseUnitTestSuite.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy + +import org.scalatest.{Outcome, Suite} + +trait LivyBaseUnitTestSuite extends Suite with Logging { + + protected override def withFixture(test: NoArgTest): Outcome = { + val testName = test.name + val suiteName = this.getClass.getName + try { + info(s"\n\n==== TEST OUTPUT FOR $suiteName: '$testName' ====\n") + test() + } finally { + info(s"\n\n==== FINISHED $suiteName: '$testName' ====\n") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/coverage/pom.xml ---------------------------------------------------------------------- diff --git a/coverage/pom.xml b/coverage/pom.xml index 2cf945a..de5ab2e 100644 --- a/coverage/pom.xml +++ b/coverage/pom.xml @@ -20,14 +20,14 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>livy-main</artifactId> <relativePath>../pom.xml</relativePath> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> </parent> <artifactId>livy-coverage-report</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <packaging>pom</packaging> <dependencies> http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index d1deec1..45bd11c 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,15 +21,15 @@ <modelVersion>4.0.0</modelVersion> <parent> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>livy-main</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> 
<artifactId>livy-examples</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <packaging>jar</packaging> <properties> @@ -40,17 +40,17 @@ <dependencies> <dependency> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>livy-api</artifactId> <version>${project.version}</version> </dependency> <dependency> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>livy-scala-api_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>livy-client-http</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/examples/src/main/java/com/cloudera/livy/examples/PiApp.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/cloudera/livy/examples/PiApp.java b/examples/src/main/java/com/cloudera/livy/examples/PiApp.java deleted file mode 100644 index 7971474..0000000 --- a/examples/src/main/java/com/cloudera/livy/examples/PiApp.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.examples; - -import java.io.File; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; - -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; - -import com.cloudera.livy.*; - -class PiJob implements - Job<Double>, - Function<Integer, Integer>, - Function2<Integer, Integer, Integer> { - - private final int slices; - private final int samples; - - public PiJob(int slices) { - this.slices = slices; - this.samples = (int) Math.min(100000L * slices, Integer.MAX_VALUE); - } - - @Override - public Double call(JobContext ctx) throws Exception { - List<Integer> sampleList = new ArrayList<>(); - for (int i = 0; i < samples; i++) { - sampleList.add(i); - } - - return 4.0d * ctx.sc().parallelize(sampleList, slices).map(this).reduce(this) / samples; - } - - @Override - public Integer call(Integer v1) { - double x = Math.random() * 2 - 1; - double y = Math.random() * 2 - 1; - return (x * x + y * y < 1) ? 
1 : 0; - } - - @Override - public Integer call(Integer v1, Integer v2) { - return v1 + v2; - } -} - -/** - * Example execution: - * java -cp /pathTo/spark-core_2.10-*version*.jar:/pathTo/livy-api-*version*.jar: - * /pathTo/livy-client-http-*version*.jar:/pathTo/livy-examples-*version*.jar - * com.cloudera.livy.examples.PiApp http://livy-host:8998 2 - */ -public class PiApp { - public static void main(String[] args) throws Exception { - if (args.length != 2) { - System.err.println("Usage: PiJob <livy url> <slices>"); - System.exit(-1); - } - - LivyClient client = new LivyClientBuilder() - .setURI(new URI(args[0])) - .build(); - - try { - System.out.println("Uploading livy-example jar to the SparkContext..."); - for (String s : System.getProperty("java.class.path").split(File.pathSeparator)) { - if (new File(s).getName().startsWith("livy-examples")) { - client.uploadJar(new File(s)).get(); - break; - } - } - - final int slices = Integer.parseInt(args[1]); - double pi = client.submit(new PiJob(slices)).get(); - - System.out.println("Pi is roughly " + pi); - } finally { - client.stop(true); - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/examples/src/main/java/org/apache/livy/examples/PiApp.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/livy/examples/PiApp.java b/examples/src/main/java/org/apache/livy/examples/PiApp.java new file mode 100644 index 0000000..638f3b2 --- /dev/null +++ b/examples/src/main/java/org/apache/livy/examples/PiApp.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.examples; + +import java.io.File; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; + +import org.apache.livy.*; + +class PiJob implements + Job<Double>, + Function<Integer, Integer>, + Function2<Integer, Integer, Integer> { + + private final int slices; + private final int samples; + + public PiJob(int slices) { + this.slices = slices; + this.samples = (int) Math.min(100000L * slices, Integer.MAX_VALUE); + } + + @Override + public Double call(JobContext ctx) throws Exception { + List<Integer> sampleList = new ArrayList<>(); + for (int i = 0; i < samples; i++) { + sampleList.add(i); + } + + return 4.0d * ctx.sc().parallelize(sampleList, slices).map(this).reduce(this) / samples; + } + + @Override + public Integer call(Integer v1) { + double x = Math.random() * 2 - 1; + double y = Math.random() * 2 - 1; + return (x * x + y * y < 1) ? 
1 : 0; + } + + @Override + public Integer call(Integer v1, Integer v2) { + return v1 + v2; + } +} + +/** + * Example execution: + * java -cp /pathTo/spark-core_2.10-*version*.jar:/pathTo/livy-api-*version*.jar: + * /pathTo/livy-client-http-*version*.jar:/pathTo/livy-examples-*version*.jar + * org.apache.livy.examples.PiApp http://livy-host:8998 2 + */ +public class PiApp { + public static void main(String[] args) throws Exception { + if (args.length != 2) { + System.err.println("Usage: PiJob <livy url> <slices>"); + System.exit(-1); + } + + LivyClient client = new LivyClientBuilder() + .setURI(new URI(args[0])) + .build(); + + try { + System.out.println("Uploading livy-example jar to the SparkContext..."); + for (String s : System.getProperty("java.class.path").split(File.pathSeparator)) { + if (new File(s).getName().startsWith("livy-examples")) { + client.uploadJar(new File(s)).get(); + break; + } + } + + final int slices = Integer.parseInt(args[1]); + double pi = client.submit(new PiJob(slices)).get(); + + System.out.println("Pi is roughly " + pi); + } finally { + client.stop(true); + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/examples/src/main/scala/com/cloudera/livy/examples/WordCountApp.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/com/cloudera/livy/examples/WordCountApp.scala b/examples/src/main/scala/com/cloudera/livy/examples/WordCountApp.scala deleted file mode 100644 index 55b9da2..0000000 --- a/examples/src/main/scala/com/cloudera/livy/examples/WordCountApp.scala +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// scalastyle:off println -package com.cloudera.livy.examples - -import java.io.{File, FileNotFoundException} -import java.net.URI - -import org.apache.spark.storage.StorageLevel -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.language.postfixOps - -import com.cloudera.livy.LivyClientBuilder -import com.cloudera.livy.scalaapi._ - -/** - * A WordCount example using Scala-API which reads text from a stream and saves - * it as data frames. The word with maximum count is the result. - */ -object WordCountApp { - - var scalaClient: LivyScalaClient = null - - /** - * Initializes the Scala client with the given url. - * @param url The Livy server url. - */ - def init(url: String): Unit = { - scalaClient = new LivyClientBuilder(false).setURI(new URI(url)).build().asScalaClient - } - - /** - * Uploads the Scala-API Jar and the examples Jar from the target directory. - * @throws FileNotFoundException If either of Scala-API Jar or examples Jar is not found. 
- */ - @throws(classOf[FileNotFoundException]) - def uploadRelevantJarsForJobExecution(): Unit = { - val exampleAppJarPath = getSourcePath(this) - val scalaApiJarPath = getSourcePath(scalaClient) - uploadJar(exampleAppJarPath) - uploadJar(scalaApiJarPath) - } - - @throws(classOf[FileNotFoundException]) - private def getSourcePath(obj: Object): String = { - val source = obj.getClass.getProtectionDomain.getCodeSource - if (source != null && source.getLocation.getPath != "") { - source.getLocation.getPath - } else { - throw new FileNotFoundException(s"Jar containing ${obj.getClass.getName} not found.") - } - } - - private def uploadJar(path: String) = { - val file = new File(path) - val uploadJarFuture = scalaClient.uploadJar(file) - Await.result(uploadJarFuture, 40 second) match { - case null => println("Successfully uploaded " + file.getName) - } - } - - /** - * Submits a spark streaming job to the livy server. - * - * The streaming job reads data from the given host and port. The data read - * is saved in json format as data frames in the given output path file. If the file is present - * it appends to it, else creates a new file. For simplicity, the number of streaming batches - * are 2 with each batch for 20 seconds. The Timeout of the streaming job is set to 40 seconds. - * @param host Hostname that Spark Streaming context has to connect for receiving data. - * @param port Port that Spark Streaming context has to connect for receiving data. - * @param outputPath Output path to save the processed data read by the Spark Streaming context. 
- */ - def processStreamingWordCount( - host: String, - port: Int, - outputPath: String): ScalaJobHandle[Unit] = { - scalaClient.submit { context => - context.createStreamingContext(15000) - val ssc = context.streamingctx - val sqlctx = context.sqlctx - val lines = ssc.socketTextStream(host, port, StorageLevel.MEMORY_AND_DISK_SER) - val words = lines.filter(filterEmptyContent(_)).flatMap(tokenize(_)) - words.print() - words.foreachRDD { rdd => - import sqlctx.implicits._ - val df = rdd.toDF("word") - df.write.mode("append").json(outputPath) - } - ssc.start() - ssc.awaitTerminationOrTimeout(12000) - ssc.stop(false, true) - } - } - - /** - * Submits a spark sql job to the livy server. - * - * The sql context job reads data frames from the given json path and executes - * a sql query to get the word with max count on the temp table created with data frames. - * @param inputPath Input path to the json data containing the words. - */ - def getWordWithMostCount(inputPath: String): ScalaJobHandle[String] = { - scalaClient.submit { context => - val sqlctx = context.sqlctx - val rdd = sqlctx.read.json(inputPath) - rdd.registerTempTable("words") - val result = sqlctx.sql("select word, count(word) as word_count from words " + - "group by word order by word_count desc limit 1") - result.first().toString() - } - } - - private def filterEmptyContent(text: String): Boolean = { - text != null && !text.isEmpty - } - - private def tokenize(text : String) : Array[String] = { - text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+") - } - - private def stopClient(): Unit = { - if (scalaClient != null) { - scalaClient.stop(true) - scalaClient = null; - } - } - - /** - * Main method of the WordCount App. This method does the following - * - Validate the arguments. - * - Initializes the scala client of livy. - * - Uploads the required livy and app code jar files to the spark cluster needed during runtime. 
- * - Executes the streaming job that reads text-data from socket stream, tokenizes and saves - * them as dataframes in JSON format in the given output path. - * - Executes the sql-context job which reads the data frames from the given output path and - * and returns the word with max count. - * - * @param args - * - * REQUIRED ARGUMENTS - * arg(0) - Livy server url. - * arg(1) - Output path to save the text read from the stream. - * - * Remaining arguments are treated as key=value pairs. The following keys are recognized: - * host="examplehost" where "host" is the key and "examplehost" is the value - * port=8080 where "port" is the key and 8080 is the value - * - * STEPS FOR EXECUTION - To get accurate results for one execution: - * 1) Delete if the file already exists in the given outputFilePath. - * - * 2) Spark streaming will listen to the given or defaulted host and port. So textdata needs to be - * passed as socket stream during the run-time of the App. The streaming context reads 2 - * batches of data with an interval of 20 seconds for each batch. All the data has to be - * fed before the streaming context completes the second batch. NOTE - Inorder to get accurate - * results for one execution, pass the textdata before the execution of the app so that all the - * data is read by the socket stream. - * To pass data to localhost and port 8086 provide the following command - * nc -kl 8086 - * - * 3) The text can be provided as paragraphs as the app will tokenize the data and filter spaces. - * - * 4) Execute the application jar file with the required and optional arguments either using - * mvn or scala. 
- * - * Example execution: - * scala -cp /pathTo/livy-api-*version*.jar:/pathTo/livy-client-http-*version*.jar: - * /pathTo/livy-examples-*version*.jar:/pathTo/livy-scala-api-*version*.jar - * com.cloudera.livy.examples.WordCountApp http://livy-host:8998 /outputFilePath - * host=myhost port=8080 - */ - def main(args: Array[String]): Unit = { - var socketStreamHost: String = "localhost" - var socketStreamPort: Int = 8086 - var url = "" - var outputFilePath = "" - def parseOptionalArg(arg: String): Unit = { - val Array(argKey, argValue) = arg.split("=") - argKey match { - case "host" => socketStreamHost = argValue - case "port" => socketStreamPort = argValue.toInt - case _ => throw new IllegalArgumentException("Invalid key for optional arguments") - } - } - require(args.length >= 2 && args.length <= 4) - url = args(0) - outputFilePath = args(1) - args.slice(2, args.length).foreach(parseOptionalArg) - try { - init(url) - uploadRelevantJarsForJobExecution() - println("Calling processStreamingWordCount") - val handle1 = processStreamingWordCount(socketStreamHost, socketStreamPort, outputFilePath) - Await.result(handle1, 100 second) - println("Calling getWordWithMostCount") - val handle = getWordWithMostCount(outputFilePath) - println("Word with max count::" + Await.result(handle, 100 second)) - } finally { - stopClient() - } - } -} -// scalastyle:off println http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/examples/src/main/scala/org/apache/livy/examples/WordCountApp.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/livy/examples/WordCountApp.scala b/examples/src/main/scala/org/apache/livy/examples/WordCountApp.scala new file mode 100644 index 0000000..34c0da9 --- /dev/null +++ b/examples/src/main/scala/org/apache/livy/examples/WordCountApp.scala @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.livy.examples + +import java.io.{File, FileNotFoundException} +import java.net.URI + +import org.apache.spark.storage.StorageLevel +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.language.postfixOps + +import org.apache.livy.LivyClientBuilder +import org.apache.livy.scalaapi._ + +/** + * A WordCount example using Scala-API which reads text from a stream and saves + * it as data frames. The word with maximum count is the result. + */ +object WordCountApp { + + var scalaClient: LivyScalaClient = null + + /** + * Initializes the Scala client with the given url. + * @param url The Livy server url. + */ + def init(url: String): Unit = { + scalaClient = new LivyClientBuilder(false).setURI(new URI(url)).build().asScalaClient + } + + /** + * Uploads the Scala-API Jar and the examples Jar from the target directory. + * @throws FileNotFoundException If either of Scala-API Jar or examples Jar is not found. 
+ */ + @throws(classOf[FileNotFoundException]) + def uploadRelevantJarsForJobExecution(): Unit = { + val exampleAppJarPath = getSourcePath(this) + val scalaApiJarPath = getSourcePath(scalaClient) + uploadJar(exampleAppJarPath) + uploadJar(scalaApiJarPath) + } + + @throws(classOf[FileNotFoundException]) + private def getSourcePath(obj: Object): String = { + val source = obj.getClass.getProtectionDomain.getCodeSource + if (source != null && source.getLocation.getPath != "") { + source.getLocation.getPath + } else { + throw new FileNotFoundException(s"Jar containing ${obj.getClass.getName} not found.") + } + } + + private def uploadJar(path: String) = { + val file = new File(path) + val uploadJarFuture = scalaClient.uploadJar(file) + Await.result(uploadJarFuture, 40 second) match { + case null => println("Successfully uploaded " + file.getName) + } + } + + /** + * Submits a spark streaming job to the livy server. + * + * The streaming job reads data from the given host and port. The data read + * is saved in json format as data frames in the given output path file. If the file is present + * it appends to it, else creates a new file. For simplicity, the number of streaming batches + * are 2 with each batch for 20 seconds. The Timeout of the streaming job is set to 40 seconds. + * @param host Hostname that Spark Streaming context has to connect for receiving data. + * @param port Port that Spark Streaming context has to connect for receiving data. + * @param outputPath Output path to save the processed data read by the Spark Streaming context. 
+ */ + def processStreamingWordCount( + host: String, + port: Int, + outputPath: String): ScalaJobHandle[Unit] = { + scalaClient.submit { context => + context.createStreamingContext(15000) + val ssc = context.streamingctx + val sqlctx = context.sqlctx + val lines = ssc.socketTextStream(host, port, StorageLevel.MEMORY_AND_DISK_SER) + val words = lines.filter(filterEmptyContent(_)).flatMap(tokenize(_)) + words.print() + words.foreachRDD { rdd => + import sqlctx.implicits._ + val df = rdd.toDF("word") + df.write.mode("append").json(outputPath) + } + ssc.start() + ssc.awaitTerminationOrTimeout(12000) + ssc.stop(false, true) + } + } + + /** + * Submits a spark sql job to the livy server. + * + * The sql context job reads data frames from the given json path and executes + * a sql query to get the word with max count on the temp table created with data frames. + * @param inputPath Input path to the json data containing the words. + */ + def getWordWithMostCount(inputPath: String): ScalaJobHandle[String] = { + scalaClient.submit { context => + val sqlctx = context.sqlctx + val rdd = sqlctx.read.json(inputPath) + rdd.registerTempTable("words") + val result = sqlctx.sql("select word, count(word) as word_count from words " + + "group by word order by word_count desc limit 1") + result.first().toString() + } + } + + private def filterEmptyContent(text: String): Boolean = { + text != null && !text.isEmpty + } + + private def tokenize(text : String) : Array[String] = { + text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+") + } + + private def stopClient(): Unit = { + if (scalaClient != null) { + scalaClient.stop(true) + scalaClient = null; + } + } + + /** + * Main method of the WordCount App. This method does the following + * - Validate the arguments. + * - Initializes the scala client of livy. + * - Uploads the required livy and app code jar files to the spark cluster needed during runtime. 
+ * - Executes the streaming job that reads text-data from socket stream, tokenizes and saves + * them as dataframes in JSON format in the given output path. + * - Executes the sql-context job which reads the data frames from the given output path and + * and returns the word with max count. + * + * @param args + * + * REQUIRED ARGUMENTS + * arg(0) - Livy server url. + * arg(1) - Output path to save the text read from the stream. + * + * Remaining arguments are treated as key=value pairs. The following keys are recognized: + * host="examplehost" where "host" is the key and "examplehost" is the value + * port=8080 where "port" is the key and 8080 is the value + * + * STEPS FOR EXECUTION - To get accurate results for one execution: + * 1) Delete if the file already exists in the given outputFilePath. + * + * 2) Spark streaming will listen to the given or defaulted host and port. So textdata needs to be + * passed as socket stream during the run-time of the App. The streaming context reads 2 + * batches of data with an interval of 20 seconds for each batch. All the data has to be + * fed before the streaming context completes the second batch. NOTE - Inorder to get accurate + * results for one execution, pass the textdata before the execution of the app so that all the + * data is read by the socket stream. + * To pass data to localhost and port 8086 provide the following command + * nc -kl 8086 + * + * 3) The text can be provided as paragraphs as the app will tokenize the data and filter spaces. + * + * 4) Execute the application jar file with the required and optional arguments either using + * mvn or scala. 
+ * + * Example execution: + * scala -cp /pathTo/livy-api-*version*.jar:/pathTo/livy-client-http-*version*.jar: + * /pathTo/livy-examples-*version*.jar:/pathTo/livy-scala-api-*version*.jar + * com.cloudera.livy.examples.WordCountApp http://livy-host:8998 /outputFilePath + * host=myhost port=8080 + */ + def main(args: Array[String]): Unit = { + var socketStreamHost: String = "localhost" + var socketStreamPort: Int = 8086 + var url = "" + var outputFilePath = "" + def parseOptionalArg(arg: String): Unit = { + val Array(argKey, argValue) = arg.split("=") + argKey match { + case "host" => socketStreamHost = argValue + case "port" => socketStreamPort = argValue.toInt + case _ => throw new IllegalArgumentException("Invalid key for optional arguments") + } + } + require(args.length >= 2 && args.length <= 4) + url = args(0) + outputFilePath = args(1) + args.slice(2, args.length).foreach(parseOptionalArg) + try { + init(url) + uploadRelevantJarsForJobExecution() + println("Calling processStreamingWordCount") + val handle1 = processStreamingWordCount(socketStreamHost, socketStreamPort, outputFilePath) + Await.result(handle1, 100 second) + println("Calling getWordWithMostCount") + val handle = getWordWithMostCount(outputFilePath) + println("Word with max count::" + Await.result(handle, 100 second)) + } finally { + stopClient() + } + } +} +// scalastyle:off println http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/minicluster-dependencies/pom.xml ---------------------------------------------------------------------- diff --git a/integration-test/minicluster-dependencies/pom.xml b/integration-test/minicluster-dependencies/pom.xml index d6a400c..809a3fc 100644 --- a/integration-test/minicluster-dependencies/pom.xml +++ b/integration-test/minicluster-dependencies/pom.xml @@ -23,13 +23,13 @@ <modelVersion>4.0.0</modelVersion> <parent> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> 
<artifactId>multi-scala-project-root</artifactId> <relativePath>../../scala/pom.xml</relativePath> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> </parent> <artifactId>minicluster-dependencies-parent</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <packaging>pom</packaging> <properties> <skipDeploy>true</skipDeploy> http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/minicluster-dependencies/scala-2.10/pom.xml ---------------------------------------------------------------------- diff --git a/integration-test/minicluster-dependencies/scala-2.10/pom.xml b/integration-test/minicluster-dependencies/scala-2.10/pom.xml index b3493e9..12f6666 100644 --- a/integration-test/minicluster-dependencies/scala-2.10/pom.xml +++ b/integration-test/minicluster-dependencies/scala-2.10/pom.xml @@ -17,15 +17,15 @@ --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>minicluster-dependencies_2.10</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <packaging>jar</packaging> <parent> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>minicluster-dependencies-parent</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/minicluster-dependencies/scala-2.11/pom.xml ---------------------------------------------------------------------- diff --git a/integration-test/minicluster-dependencies/scala-2.11/pom.xml 
b/integration-test/minicluster-dependencies/scala-2.11/pom.xml index d0e3fc7..d754593 100644 --- a/integration-test/minicluster-dependencies/scala-2.11/pom.xml +++ b/integration-test/minicluster-dependencies/scala-2.11/pom.xml @@ -17,15 +17,15 @@ --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>minicluster-dependencies_2.11</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <packaging>jar</packaging> <parent> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>minicluster-dependencies-parent</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent>