This is an automated email from the ASF dual-hosted git repository. jshao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push: new 97cf2f7 [LIVY-756] Add Spark 3.0 and Scala 2.12 support 97cf2f7 is described below commit 97cf2f75929ef6c152afc468adbead269bd0758f Author: jerryshao <jerrys...@tencent.com> AuthorDate: Thu Jul 2 15:44:12 2020 +0800 [LIVY-756] Add Spark 3.0 and Scala 2.12 support ## What changes were proposed in this pull request? This PR is based on tprelle's PR #289, and addresses all the remaining issues in that PR: 1. multi-Scala-version support in one build (Scala 2.11 and 2.12 support). 2. make SparkR work. Also reverts most of the unnecessary changes. Besides, this PR removes the builds below 2.4 (2.2, 2.3), since Spark 2.2 and 2.3 only ship with Scala 2.11, making it hard to maintain multiple versions. But users can still use 2.2 and 2.3 without changes. All credits to tprelle. ## How was this patch tested? Ran UT and IT with Spark 2.4.5 and 3.0.0 locally. Author: jerryshao <jerrys...@tencent.com> Closes #300 from jerryshao/LIVY-756. --- .gitignore | 1 + .rat-excludes | 1 + .travis.yml | 24 +++--- README.md | 4 +- assembly/assembly.xml | 7 ++ assembly/pom.xml | 23 ++++++ client-common/pom.xml | 2 +- .../org/apache/livy/client/common/Serializer.java | 8 +- {client-common => core/scala-2.12}/pom.xml | 52 ++++++------- .../org/apache/livy/LivyBaseUnitTestSuite.scala | 4 +- coverage/pom.xml | 35 +++++++++ .../org/apache/livy/examples/WordCountApp.scala | 2 +- integration-test/pom.xml | 2 +- integration-test/src/test/resources/rtest.R | 9 +-- .../scala/org/apache/livy/test/InteractiveIT.scala | 6 +- .../src/test/spark2/scala/Spark2JobApiIT.scala | 26 +++++-- pom.xml | 88 +++++++++++++--------- repl/pom.xml | 3 + repl/scala-2.11/pom.xml | 1 + .../org/apache/livy/repl/SparkInterpreter.scala | 5 +- repl/{scala-2.11 => scala-2.12}/pom.xml | 11 +-- .../org/apache/livy/repl/SparkInterpreter.scala | 17 ++--- .../apache/livy/repl/SparkInterpreterSpec.scala | 68 +++++++++++++++++ .../main/scala/org/apache/livy/repl/Session.scala | 4 +- 
.../org/apache/livy/repl/SQLInterpreterSpec.scala | 4 +- rsc/pom.xml | 6 +- .../org/apache/livy/rsc/driver/SparkEntries.java | 7 +- .../org/apache/livy/rsc/rpc/KryoMessageCodec.java | 7 -- {repl/scala-2.11 => scala-api/scala-2.12}/pom.xml | 17 ++--- scala-api/src/main/resources/build.marker | 0 .../org/apache/livy/scalaapi/ScalaJobHandle.scala | 8 ++ server/pom.xml | 9 ++- .../org/apache/livy/server/SessionServlet.scala | 2 +- .../server/interactive/InteractiveSession.scala | 6 +- .../org/apache/livy/utils/LivySparkUtils.scala | 4 +- .../apache/livy/server/BaseJsonServletSpec.scala | 3 +- .../apache/livy/server/SessionServletSpec.scala | 2 +- .../livy/server/batch/BatchServletSpec.scala | 2 +- .../livy/server/batch/BatchSessionSpec.scala | 6 +- .../InteractiveSessionServletSpec.scala | 3 +- .../interactive/InteractiveSessionSpec.scala | 2 +- .../livy/server/interactive/JobApiSpec.scala | 2 +- .../server/interactive/SessionHeartbeatSpec.scala | 2 +- .../server/recovery/FileSystemStateStoreSpec.scala | 2 +- .../livy/server/recovery/SessionStoreSpec.scala | 2 +- .../livy/server/recovery/StateStoreSpec.scala | 2 - .../server/recovery/ZooKeeperStateStoreSpec.scala | 2 +- .../apache/livy/sessions/SessionManagerSpec.scala | 2 +- .../apache/livy/utils/LivySparkUtilsSuite.scala | 5 ++ .../org/apache/livy/utils/SparkYarnAppSpec.scala | 2 +- .../org/apache/livy/test/jobs/SQLGetTweets.java | 2 +- .../livy/thriftserver/types/DataTypeUtils.scala | 5 +- .../livy/thriftserver/ThriftServerSuites.scala | 3 +- thriftserver/session/pom.xml | 13 ---- .../thriftserver/session/ColumnBufferTest.java | 16 ++-- 55 files changed, 362 insertions(+), 189 deletions(-) diff --git a/.gitignore b/.gitignore index d46d49f..b1045ea 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,7 @@ metastore_db/ derby.log dependency-reduced-pom.xml release-staging/ +venv/ # For python setup.py, which pollutes the source dirs. 
python-api/dist diff --git a/.rat-excludes b/.rat-excludes index ac29fe6..1df6e9e 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -28,3 +28,4 @@ logs/* **/jquery-2.1.1.min.js docs/**/*.html docs/**/JB/** +venv/* diff --git a/.travis.yml b/.travis.yml index c2c0ffd..e463461 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,27 +16,23 @@ # sudo: required -dist: trusty +dist: xenial language: scala - 2.11.12 matrix: include: - - name: "Spark 2.2 Unit Tests" - env: MVN_FLAG='-Pthriftserver -DskipITs' - - name: "Spark 2.2 ITs" - env: MVN_FLAG='-Pthriftserver -DskipTests' - - name: "Spark 2.3 Unit Tests" - env: MVN_FLAG='-Pspark-2.3 -Pthriftserver -DskipITs' - - name: "Spark 2.3 ITs" - env: MVN_FLAG='-Pspark-2.3 -Pthriftserver -DskipTests' - name: "Spark 2.4 Unit Tests" - env: MVN_FLAG='-Pspark-2.4 -Pthriftserver -DskipITs' + env: MVN_FLAG='-Pthriftserver -DskipITs' - name: "Spark 2.4 ITs" - env: MVN_FLAG='-Pspark-2.4 -Pthriftserver -DskipTests' + env: MVN_FLAG='-Pthriftserver -DskipTests' + - name: "Spark 3.0 Unit Tests" + env: MVN_FLAG='-Pthriftserver -Pspark-3.0 -DskipITs' + - name: "Spark 3.0 ITs" + env: MVN_FLAG='-Pthriftserver -Pspark-3.0 -DskipTests' jdk: - - oraclejdk8 + - openjdk8 cache: pip: true @@ -44,7 +40,9 @@ cache: - $HOME/.m2 install: - - sudo apt-get -qq install r-base + - sudo add-apt-repository 'deb https://cloud.r-project.org/bin/linux/ubuntu xenial-cran35/' + - sudo apt-get update + - sudo apt-get -qq --allow-unauthenticated install r-base - sudo apt-get -qq install python3-pip python-dev - sudo apt-get -qq install libkrb5-dev - sudo apt-get -qq remove python-setuptools diff --git a/README.md b/README.md index d5219e5..d454cbc 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ Required python packages for building Livy: To run Livy, you will also need a Spark installation. You can get Spark releases at https://spark.apache.org/downloads.html. -Livy requires Spark 2.2+. 
You can switch to a different version of Spark by setting the +Livy requires Spark 2.4+. You can switch to a different version of Spark by setting the ``SPARK_HOME`` environment variable in the Livy server process, without needing to rebuild Livy. @@ -71,7 +71,7 @@ cd incubator-livy mvn package ``` -By default Livy is built against Apache Spark 2.2.0, but the version of Spark used when running +By default Livy is built against Apache Spark 2.4.5, but the version of Spark used when running Livy does not need to match the version used to build Livy. Livy internally handles the differences between different Spark versions. diff --git a/assembly/assembly.xml b/assembly/assembly.xml index de61aee..eaefbb5 100644 --- a/assembly/assembly.xml +++ b/assembly/assembly.xml @@ -63,6 +63,13 @@ </includes> </fileSet> <fileSet> + <directory>${project.parent.basedir}/repl/scala-2.12/target/jars</directory> + <outputDirectory>${assembly.name}/repl_2.12-jars</outputDirectory> + <includes> + <include>*</include> + </includes> + </fileSet> + <fileSet> <directory>${project.parent.basedir}/server/target/jars</directory> <outputDirectory>${assembly.name}/jars</outputDirectory> <includes> diff --git a/assembly/pom.xml b/assembly/pom.xml index a9262c5..380a6cf 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -49,6 +49,12 @@ <dependency> <groupId>${project.groupId}</groupId> + <artifactId>livy-repl_2.12</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> <artifactId>livy-server</artifactId> <version>${project.version}</version> </dependency> @@ -75,6 +81,23 @@ </execution> </executions> </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>true</skipTests> + </configuration> + </plugin> + + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + <configuration> + 
<skipTests>true</skipTests> + </configuration> + </plugin> + </plugins> </build> diff --git a/client-common/pom.xml b/client-common/pom.xml index 6511b49..6533d69 100644 --- a/client-common/pom.xml +++ b/client-common/pom.xml @@ -36,7 +36,7 @@ </dependency> <dependency> - <groupId>com.esotericsoftware.kryo</groupId> + <groupId>com.esotericsoftware</groupId> <artifactId>kryo</artifactId> </dependency> <dependency> diff --git a/client-common/src/main/java/org/apache/livy/client/common/Serializer.java b/client-common/src/main/java/org/apache/livy/client/common/Serializer.java index 3ea3f56..2f879ac 100644 --- a/client-common/src/main/java/org/apache/livy/client/common/Serializer.java +++ b/client-common/src/main/java/org/apache/livy/client/common/Serializer.java @@ -23,7 +23,8 @@ import java.nio.ByteBuffer; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy; +import com.esotericsoftware.kryo.serializers.ClosureSerializer; +import org.objenesis.strategy.StdInstantiatorStrategy; import org.apache.livy.annotations.Private; @@ -49,7 +50,10 @@ public class Serializer { kryo.register(klass, REG_ID_BASE + count); count++; } - kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); + kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy( + new StdInstantiatorStrategy())); + kryo.register(java.lang.invoke.SerializedLambda.class); + kryo.register(ClosureSerializer.Closure.class, new ClosureSerializer()); kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); return kryo; } diff --git a/client-common/pom.xml b/core/scala-2.12/pom.xml similarity index 65% copy from client-common/pom.xml copy to core/scala-2.12/pom.xml index 6511b49..f6e16bb 100644 --- a/client-common/pom.xml +++ b/core/scala-2.12/pom.xml @@ -17,37 +17,37 @@ --> <project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.livy</groupId> + <artifactId>livy-core_2.12</artifactId> + <version>0.8.0-incubating-SNAPSHOT</version> + <packaging>jar</packaging> + <parent> <groupId>org.apache.livy</groupId> - <artifactId>livy-main</artifactId> + <artifactId>livy-core-parent</artifactId> <version>0.8.0-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> </parent> - <groupId>org.apache.livy</groupId> - <artifactId>livy-client-common</artifactId> - <version>0.8.0-incubating-SNAPSHOT</version> - <packaging>jar</packaging> + <properties> + <scala.version>${scala-2.12.version}</scala.version> + <scala.binary.version>2.12</scala.binary.version> + </properties> - <dependencies> - <dependency> - <groupId>org.apache.livy</groupId> - <artifactId>livy-api</artifactId> - <version>${project.version}</version> - </dependency> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> - <dependency> - <groupId>com.esotericsoftware.kryo</groupId> - <artifactId>kryo</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> </project> diff --git a/core/src/test/scala/org/apache/livy/LivyBaseUnitTestSuite.scala b/core/src/test/scala/org/apache/livy/LivyBaseUnitTestSuite.scala index 908172b..65c5cd8 100644 --- a/core/src/test/scala/org/apache/livy/LivyBaseUnitTestSuite.scala +++ 
b/core/src/test/scala/org/apache/livy/LivyBaseUnitTestSuite.scala @@ -17,9 +17,9 @@ package org.apache.livy -import org.scalatest.{Outcome, Suite} +import org.scalatest.{Outcome, TestSuite} -trait LivyBaseUnitTestSuite extends Suite with Logging { +trait LivyBaseUnitTestSuite extends TestSuite with Logging { protected override def withFixture(test: NoArgTest): Outcome = { val testName = test.name diff --git a/coverage/pom.xml b/coverage/pom.xml index cfd7ae1..db59cc2 100644 --- a/coverage/pom.xml +++ b/coverage/pom.xml @@ -58,12 +58,24 @@ <dependency> <groupId>${project.groupId}</groupId> + <artifactId>livy-core_2.12</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> <artifactId>livy-repl_2.11</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>${project.groupId}</groupId> + <artifactId>livy-repl_2.12</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> <artifactId>livy-rsc</artifactId> <version>${project.version}</version> </dependency> @@ -82,6 +94,12 @@ <dependency> <groupId>${project.groupId}</groupId> + <artifactId>livy-scala-api_2.12</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> <artifactId>livy-integration-test</artifactId> <version>${project.version}</version> <scope>test</scope> @@ -116,6 +134,23 @@ </execution> </executions> </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>true</skipTests> + </configuration> + </plugin> + + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + <configuration> + <skipTests>true</skipTests> + </configuration> + </plugin> + </plugins> </build> diff --git a/examples/src/main/scala/org/apache/livy/examples/WordCountApp.scala 
b/examples/src/main/scala/org/apache/livy/examples/WordCountApp.scala index da30a76..2285f52 100644 --- a/examples/src/main/scala/org/apache/livy/examples/WordCountApp.scala +++ b/examples/src/main/scala/org/apache/livy/examples/WordCountApp.scala @@ -119,7 +119,7 @@ object WordCountApp { scalaClient.submit { context => val sqlctx = context.sqlctx val rdd = sqlctx.read.json(inputPath) - rdd.registerTempTable("words") + rdd.createOrReplaceTempView("words") val result = sqlctx.sql("select word, count(word) as word_count from words " + "group by word order by word_count desc limit 1") result.first().toString() diff --git a/integration-test/pom.xml b/integration-test/pom.xml index f58474f..8146385 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -87,7 +87,6 @@ <dependency> <groupId>org.asynchttpclient</groupId> <artifactId>async-http-client</artifactId> - <version>2.10.1</version> </dependency> <dependency> @@ -297,6 +296,7 @@ <LIVY_INTEGRATION_TEST>true</LIVY_INTEGRATION_TEST> <SPARK_HOME>${project.build.directory}/${spark.bin.name}</SPARK_HOME> <LIVY_TEST_THRIFT_ENABLED>${livy.test.thrift.enabled}</LIVY_TEST_THRIFT_ENABLED> + <PYSPARK_ROW_FIELD_SORTING_ENABLED>true</PYSPARK_ROW_FIELD_SORTING_ENABLED> </environmentVariables> <systemProperties> <cluster.spec>${cluster.spec}</cluster.spec> diff --git a/integration-test/src/test/resources/rtest.R b/integration-test/src/test/resources/rtest.R index a026a10..d955b7f 100644 --- a/integration-test/src/test/resources/rtest.R +++ b/integration-test/src/test/resources/rtest.R @@ -17,18 +17,17 @@ library(SparkR) -# Initialize SparkContext and SQLContext -sc <- sparkR.init(appName="SparkR-DataFrame-example") -sqlContext <- sparkRSQL.init(sc) +# Initialize SparkSession +sparkR.session(appName = "SparkR-DataFrame-example") # Create a simple local data.frame localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18)) # Convert local data frame to a SparkDataFrame -df <- 
createDataFrame(sqlContext, localDF) +df <- createDataFrame(localDF) # Print its schema printSchema(df) # Stop the SparkContext now -sparkR.stop() +sparkR.session.stop() diff --git a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala index 8ef9d7a..237ee8f 100644 --- a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala +++ b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala @@ -41,7 +41,7 @@ class InteractiveIT extends BaseIntegrationTestSuite { // with the mini cluster s.run("""sc.getConf.get("spark.executor.instances")""").verifyResult("res1: String = 1\n") - s.run("val sql = new org.apache.spark.sql.SQLContext(sc)").verifyResult( + s.run("val sql = spark.sqlContext").verifyResult( ".*" + Pattern.quote( "sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext") + ".*") s.run("abcde").verifyError(evalue = ".*?:[0-9]+: error: not found: value abcde.*") @@ -51,7 +51,7 @@ class InteractiveIT extends BaseIntegrationTestSuite { // Verify query submission s.run(s"""val df = spark.createDataFrame(Seq(("jerry", 20), ("michael", 21)))""") .verifyResult(".*" + Pattern.quote("df: org.apache.spark.sql.DataFrame") + ".*") - s.run("df.registerTempTable(\"people\")").result() + s.run("df.createOrReplaceTempView(\"people\")").result() s.run("SELECT * FROM people", Some(SQL)).verifyResult(".*\"jerry\",20.*\"michael\",21.*") // Verify Livy internal configurations are not exposed. 
@@ -108,7 +108,7 @@ class InteractiveIT extends BaseIntegrationTestSuite { s.run("1+1").verifyResult(startsWith(s"[$count] 2")) s.run("""localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))""") .verifyResult(null) - s.run("df <- createDataFrame(sqlContext, localDF)").verifyResult(null) + s.run("df <- createDataFrame(localDF)").verifyResult(null) s.run("printSchema(df)").verifyResult(literal( """|root | |-- name: string (nullable = true) diff --git a/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala b/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala index 8a8c5e3..38230f8 100644 --- a/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala +++ b/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala @@ -22,17 +22,19 @@ import java.net.URI import java.util.concurrent.{TimeUnit, Future => JFuture} import javax.servlet.http.HttpServletResponse +import scala.util.Properties + import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule -import org.scalatest.BeforeAndAfterAll - import org.apache.http.client.methods.HttpGet +import org.scalatest.BeforeAndAfterAll import org.apache.livy._ import org.apache.livy.client.common.HttpMessages._ import org.apache.livy.sessions.SessionKindModule import org.apache.livy.test.framework.BaseIntegrationTestSuite import org.apache.livy.test.jobs.spark2._ +import org.apache.livy.utils.LivySparkUtils class Spark2JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logging { @@ -52,7 +54,7 @@ class Spark2JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll wit livyClient.connectSession(sessionId).stop() } - test("create a new session and upload test jar") { + scalaTest("create a new session and upload test jar") { val prevSessionCount = sessionList().total val tempClient = createClient(livyEndpoint) @@ -81,13 +83,13 @@ class Spark2JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll wit } } - 
test("run spark2 job") { + scalaTest("run spark2 job") { assume(client != null, "Client not active.") val result = waitFor(client.submit(new SparkSessionTest())) assert(result === 3) } - test("run spark2 dataset job") { + scalaTest("run spark2 dataset job") { assume(client != null, "Client not active.") val result = waitFor(client.submit(new DatasetTest())) assert(result === 2) @@ -112,4 +114,18 @@ class Spark2JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll wit private def createClient(uri: String): LivyClient = { new LivyClientBuilder().setURI(new URI(uri)).build() } + + protected def scalaTest(desc: String)(testFn: => Unit): Unit = { + test(desc) { + val livyConf = new LivyConf() + val (sparkVersion, scalaVersion) = LivySparkUtils.sparkSubmitVersion(livyConf) + val formattedSparkVersion = LivySparkUtils.formatSparkVersion(sparkVersion) + val versionString = + LivySparkUtils.sparkScalaVersion(formattedSparkVersion, scalaVersion, livyConf) + + assume(versionString == LivySparkUtils.formatScalaVersion(Properties.versionNumberString), + s"Scala test can only be run with ${Properties.versionString}") + testFn + } + } } diff --git a/pom.xml b/pom.xml index f47c024..d2e535a 100644 --- a/pom.xml +++ b/pom.xml @@ -78,40 +78,46 @@ </mailingLists> <properties> + <asynchttpclient.version>2.10.1</asynchttpclient.version> <hadoop.version>2.7.3</hadoop.version> <hadoop.scope>compile</hadoop.scope> - <spark.scala-2.11.version>2.2.3</spark.scala-2.11.version> + <spark.scala-2.11.version>2.4.5</spark.scala-2.11.version> + <spark.scala-2.12.version>2.4.5</spark.scala-2.12.version> <spark.version>${spark.scala-2.11.version}</spark.version> <hive.version>3.0.0</hive.version> <commons-codec.version>1.9</commons-codec.version> <httpclient.version>4.5.3</httpclient.version> <httpcore.version>4.4.4</httpcore.version> - <jackson.version>2.9.9</jackson.version> + <jackson.version>2.10.1</jackson.version> <javax.servlet-api.version>3.1.0</javax.servlet-api.version> 
<jetty.version>9.3.24.v20180605</jetty.version> - <json4s.version>3.2.11</json4s.version> + <json4s.spark-2.11.version>3.5.3</json4s.spark-2.11.version> + <json4s.spark-2.12.version>3.5.3</json4s.spark-2.12.version> + <json4s.version>${json4s.spark-2.11.version}</json4s.version> <junit.version>4.11</junit.version> <libthrift.version>0.9.3</libthrift.version> - <kryo.version>2.22</kryo.version> + <kryo.version>4.0.2</kryo.version> <metrics.version>3.1.0</metrics.version> - <mockito.version>1.9.5</mockito.version> - <netty.spark-2.11.version>4.0.37.Final</netty.spark-2.11.version> + <mockito.version>1.10.19</mockito.version> + <netty.spark-2.11.version>4.1.17.Final</netty.spark-2.11.version> + <netty.spark-2.12.version>4.1.17.Final</netty.spark-2.12.version> <netty.version>${netty.spark-2.11.version}</netty.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <py4j.version>0.10.7</py4j.version> <scala-2.11.version>2.11.12</scala-2.11.version> + <scala-2.12.version>2.12.10</scala-2.12.version> <scala.binary.version>2.11</scala.binary.version> <scala.version>${scala-2.11.version}</scala.version> - <scalatest.version>2.2.4</scalatest.version> - <scalatra.version>2.3.0</scalatra.version> + <scalatest.version>3.0.8</scalatest.version> + <scalatra.version>2.6.5</scalatra.version> <java.version>1.8</java.version> <test.redirectToFile>true</test.redirectToFile> <execution.root>${user.dir}</execution.root> <spark.home>${execution.root}/dev/spark</spark.home> <spark.bin.download.url> - https://archive.apache.org/dist/spark/spark-2.2.3/spark-2.2.3-bin-hadoop2.7.tgz + https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz </spark.bin.download.url> - <spark.bin.name>spark-2.2.3-bin-hadoop2.7</spark.bin.name> + <spark.bin.name>spark-2.4.5-bin-hadoop2.7</spark.bin.name> <!-- used for testing, NCSARequestLog use it for access log --> <livy.log.dir>${basedir}/target</livy.log.dir> @@ -199,15 +205,18 @@ <module>client-http</module> 
<module>core</module> <module>core/scala-2.11</module> + <module>core/scala-2.12</module> <module>coverage</module> <module>examples</module> <module>python-api</module> <module>repl</module> <module>repl/scala-2.11</module> + <module>repl/scala-2.12</module> <module>rsc</module> <module>scala</module> <module>scala-api</module> <module>scala-api/scala-2.11</module> + <module>scala-api/scala-2.12</module> <module>server</module> <module>test-lib</module> <module>integration-test</module> @@ -240,6 +249,12 @@ <artifactId>scalatra-scalatest_${scala.binary.version}</artifactId> <version>${scalatra.version}</version> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.mockito</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> </dependency> </dependencies> @@ -254,7 +269,7 @@ </dependency> <dependency> - <groupId>com.esotericsoftware.kryo</groupId> + <groupId>com.esotericsoftware</groupId> <artifactId>kryo</artifactId> <version>${kryo.version}</version> </dependency> @@ -547,6 +562,12 @@ <dependency> <groupId>org.scalatra</groupId> + <artifactId>scalatra-metrics_${scala.binary.version}</artifactId> + <version>${scalatra.version}</version> + </dependency> + + <dependency> + <groupId>org.scalatra</groupId> <artifactId>scalatra-test_${scala.binary.version}</artifactId> <version>${scalatra.version}</version> </dependency> @@ -557,6 +578,12 @@ <version>${py4j.version}</version> </dependency> + <dependency> + <groupId>org.asynchttpclient</groupId> + <artifactId>async-http-client</artifactId> + <version>${asynchttpclient.version}</version> + </dependency> + <!-- we need a version > 1.7.13 because of SLF4J-324 --> <dependency> <groupId>org.slf4j</groupId> @@ -611,7 +638,7 @@ <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> - <version>3.2.2</version> + <version>4.2.0</version> <executions> <execution> <goals> @@ -630,7 +657,6 @@ <configuration> <scalaVersion>${scala.version}</scalaVersion> 
<recompileMode>incremental</recompileMode> - <useZincServer>true</useZincServer> <checkMultipleScalaVersions>false</checkMultipleScalaVersions> <args> <arg>-unchecked</arg> @@ -661,7 +687,7 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> - <version>2.4.2</version> + <version>3.2.1</version> </plugin> <plugin> @@ -694,7 +720,7 @@ <plugin> <groupId>org.scalatest</groupId> <artifactId>scalatest-maven-plugin</artifactId> - <version>1.0</version> + <version>2.0.0</version> <configuration> <environmentVariables> <LIVY_TEST>true</LIVY_TEST> @@ -1026,37 +1052,29 @@ </modules> </profile> - <!-- Spark version profiles --> - <profile> - <id>spark-2.3</id> - <properties> - <spark.scala-2.11.version>2.3.3</spark.scala-2.11.version> - <spark.version>${spark.scala-2.11.version}</spark.version> - <netty.spark-2.11.version>4.1.17.Final</netty.spark-2.11.version> - <spark.bin.download.url> - https://archive.apache.org/dist/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz - </spark.bin.download.url> - <spark.bin.name>spark-2.3.3-bin-hadoop2.7</spark.bin.name> - </properties> - </profile> - <profile> - <id>spark-2.4</id> + <id>spark-3.0</id> <activation> <property> - <name>spark-2.4</name> + <name>spark-3.0</name> </property> </activation> <properties> + <spark.scala-2.12.version>3.0.0</spark.scala-2.12.version> <spark.scala-2.11.version>2.4.5</spark.scala-2.11.version> <spark.version>${spark.scala-2.11.version}</spark.version> - <netty.spark-2.11.version>4.1.17.Final</netty.spark-2.11.version> + <netty.spark-2.12.version>4.1.47.Final</netty.spark-2.12.version> + <netty.spark-2.11.version>4.1.47.Final</netty.spark-2.11.version> + <netty.version>${netty.spark-2.11.version}</netty.version> <java.version>1.8</java.version> - <py4j.version>0.10.7</py4j.version> + <py4j.version>0.10.9</py4j.version> + <json4s.spark-2.11.version>3.5.3</json4s.spark-2.11.version> + <json4s.spark-2.12.version>3.6.6</json4s.spark-2.12.version> + 
<json4s.version>${json4s.spark-2.11.version}</json4s.version> <spark.bin.download.url> - https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz + https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz </spark.bin.download.url> - <spark.bin.name>spark-2.4.5-bin-hadoop2.7</spark.bin.name> + <spark.bin.name>spark-3.0.0-bin-hadoop2.7</spark.bin.name> </properties> </profile> diff --git a/repl/pom.xml b/repl/pom.xml index 4afd548..88b341f 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -175,6 +175,7 @@ <include>org.json4s:json4s-ast_${scala.binary.version}</include> <include>org.json4s:json4s-core_${scala.binary.version}</include> <include>org.json4s:json4s-jackson_${scala.binary.version}</include> + <include>org.json4s:json4s-scalap_${scala.binary.version}</include> </includes> </artifactSet> <filters> @@ -211,6 +212,8 @@ json4s-ast_${scala.binary.version}, json4s-core_${scala.binary.version}, json4s-jackson_${scala.binary.version}, + json4s-scalap_${scala.binary.version}, + scala-xml_${scala.binary.version}, paranamer, scalap </excludeArtifactIds> diff --git a/repl/scala-2.11/pom.xml b/repl/scala-2.11/pom.xml index 020b842..fb0d15f 100644 --- a/repl/scala-2.11/pom.xml +++ b/repl/scala-2.11/pom.xml @@ -36,6 +36,7 @@ <scala.binary.version>2.11</scala.binary.version> <spark.version>${spark.scala-2.11.version}</spark.version> <netty.version>${netty.spark-2.11.version}</netty.version> + <json4s.version>${json4s.spark-2.11.version}</json4s.version> </properties> </project> diff --git a/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala b/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala index 7de2859..98c478f 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala @@ -27,15 +27,12 @@ import scala.tools.nsc.interpreter.IMain import 
scala.tools.nsc.interpreter.JLineCompletion import scala.tools.nsc.interpreter.JPrintWriter import scala.tools.nsc.interpreter.Results.Result -import scala.util.control.NonFatal import org.apache.spark.SparkConf import org.apache.spark.repl.SparkILoop -import org.apache.livy.rsc.driver.SparkEntries - /** - * This represents a Spark interpreter. It is not thread safe. + * This represents a Scala 2.11 Spark interpreter. It is not thread safe. */ class SparkInterpreter(protected override val conf: SparkConf) extends AbstractSparkInterpreter { diff --git a/repl/scala-2.11/pom.xml b/repl/scala-2.12/pom.xml similarity index 81% copy from repl/scala-2.11/pom.xml copy to repl/scala-2.12/pom.xml index 020b842..b1904e8 100644 --- a/repl/scala-2.11/pom.xml +++ b/repl/scala-2.12/pom.xml @@ -20,7 +20,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.apache.livy</groupId> - <artifactId>livy-repl_2.11</artifactId> + <artifactId>livy-repl_2.12</artifactId> <version>0.8.0-incubating-SNAPSHOT</version> <packaging>jar</packaging> @@ -32,10 +32,11 @@ </parent> <properties> - <scala.version>${scala-2.11.version}</scala.version> - <scala.binary.version>2.11</scala.binary.version> - <spark.version>${spark.scala-2.11.version}</spark.version> - <netty.version>${netty.spark-2.11.version}</netty.version> + <scala.version>${scala-2.12.version}</scala.version> + <scala.binary.version>2.12</scala.binary.version> + <spark.version>${spark.scala-2.12.version}</spark.version> + <netty.version>${netty.spark-2.12.version}</netty.version> + <json4s.version>${json4s.spark-2.12.version}</json4s.version> </properties> </project> diff --git a/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala b/repl/scala-2.12/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala similarity index 91% copy from repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala copy to 
repl/scala-2.12/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala index 7de2859..bb8f7e5 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala +++ b/repl/scala-2.12/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala @@ -22,18 +22,15 @@ import java.net.URLClassLoader import java.nio.file.{Files, Paths} import scala.tools.nsc.Settings -import scala.tools.nsc.interpreter.Completion.ScalaCompleter +import scala.tools.nsc.interpreter.Completion import scala.tools.nsc.interpreter.IMain -import scala.tools.nsc.interpreter.JLineCompletion import scala.tools.nsc.interpreter.JPrintWriter +import scala.tools.nsc.interpreter.NoCompletion import scala.tools.nsc.interpreter.Results.Result -import scala.util.control.NonFatal import org.apache.spark.SparkConf import org.apache.spark.repl.SparkILoop -import org.apache.livy.rsc.driver.SparkEntries - /** * This represents a Spark interpreter. It is not thread safe. */ @@ -61,8 +58,8 @@ class SparkInterpreter(protected override val conf: SparkConf) extends AbstractS sparkILoop.initializeSynchronous() restoreContextClassLoader { - sparkILoop.setContextClassLoader() - + sparkILoop.compilerClasspath + sparkILoop.ensureClassLoader var classLoader = Thread.currentThread().getContextClassLoader while (classLoader != null) { if (classLoader.getClass.getCanonicalName == @@ -107,13 +104,13 @@ class SparkInterpreter(protected override val conf: SparkConf) extends AbstractS } override protected def completeCandidates(code: String, cursor: Int) : Array[String] = { - val completer : ScalaCompleter = { + val completer : Completion = { try { val cls = Class.forName("scala.tools.nsc.interpreter.PresentationCompilerCompleter") cls.getDeclaredConstructor(classOf[IMain]).newInstance(sparkILoop.intp) - .asInstanceOf[ScalaCompleter] + .asInstanceOf[Completion] } catch { - case e : ClassNotFoundException => new JLineCompletion(sparkILoop.intp).completer + case e : ClassNotFoundException => NoCompletion } } 
completer.complete(code, cursor).candidates.toArray diff --git a/repl/scala-2.12/src/test/scala/org/apache/livy/repl/SparkInterpreterSpec.scala b/repl/scala-2.12/src/test/scala/org/apache/livy/repl/SparkInterpreterSpec.scala new file mode 100644 index 0000000..d922034 --- /dev/null +++ b/repl/scala-2.12/src/test/scala/org/apache/livy/repl/SparkInterpreterSpec.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.repl + +import org.scalatest._ + +import org.apache.livy.LivyBaseUnitTestSuite + +class SparkInterpreterSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite { + describe("SparkInterpreter") { + val interpreter = new SparkInterpreter(null) + + it("should parse Scala compile error.") { + // Regression test for LIVY-. 
+ val error = + """<console>:27: error: type mismatch; + | found : Int + | required: String + | sc.setJobGroup(groupName, groupName, true) + | ^ + |<console>:27: error: type mismatch; + | found : Int + | required: String + | sc.setJobGroup(groupName, groupName, true) + | ^ + |""".stripMargin + + val parsedError = AbstractSparkInterpreter.KEEP_NEWLINE_REGEX.split(error) + + val expectedTraceback = parsedError.tail + + val (ename, traceback) = interpreter.parseError(error) + ename shouldBe "<console>:27: error: type mismatch;" + traceback shouldBe expectedTraceback + } + + it("should parse Scala runtime error.") { + val error = + """java.lang.RuntimeException: message + | ... 48 elided + | + |Tailing message""".stripMargin + + val parsedError = AbstractSparkInterpreter.KEEP_NEWLINE_REGEX.split(error) + + val expectedTraceback = parsedError.tail + + val (ename, traceback) = interpreter.parseError(error) + ename shouldBe "java.lang.RuntimeException: message" + traceback shouldBe expectedTraceback + } + } +} diff --git a/repl/src/main/scala/org/apache/livy/repl/Session.scala b/repl/src/main/scala/org/apache/livy/repl/Session.scala index ea8a761..262c811 100644 --- a/repl/src/main/scala/org/apache/livy/repl/Session.scala +++ b/repl/src/main/scala/org/apache/livy/repl/Session.scala @@ -348,8 +348,10 @@ class Session( case "1" => (s"""setJobGroup(sc, "$jobGroup", "Job group for statement $jobGroup", FALSE)""", SparkR) - case "2" => + case "2" | "3" => (s"""setJobGroup("$jobGroup", "Job group for statement $jobGroup", FALSE)""", SparkR) + case v => + throw new IllegalArgumentException(s"Unknown Spark major version [$v]") } } // Set the job group diff --git a/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala index 781ed72..3d9d4ac 100644 --- a/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala +++ b/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala @@ -113,7 
+113,7 @@ class SQLInterpreterSpec extends BaseInterpreterSpec { it should "execute sql queries" in withInterpreter { interpreter => val rdd = sparkEntries.sc().parallelize(Seq(People("Jerry", 20), People("Michael", 21))) val df = sparkEntries.sqlctx().createDataFrame(rdd) - df.registerTempTable("people") + df.createOrReplaceTempView("people") // Test normal behavior val resp1 = interpreter.execute( @@ -159,7 +159,7 @@ class SQLInterpreterSpec extends BaseInterpreterSpec { ("1", new java.math.BigDecimal(1.0)), ("2", new java.math.BigDecimal(2.0)))) val df = sparkEntries.sqlctx().createDataFrame(rdd).selectExpr("_1 as col1", "_2 as col2") - df.registerTempTable("test") + df.createOrReplaceTempView("test") val resp1 = interpreter.execute( """ diff --git a/rsc/pom.xml b/rsc/pom.xml index 074197f..833ad83 100644 --- a/rsc/pom.xml +++ b/rsc/pom.xml @@ -58,7 +58,7 @@ </dependency> <dependency> - <groupId>com.esotericsoftware.kryo</groupId> + <groupId>com.esotericsoftware</groupId> <artifactId>kryo</artifactId> </dependency> <dependency> @@ -140,7 +140,9 @@ <artifactSet> <includes> <include>org.apache.livy:livy-client-common</include> - <include>com.esotericsoftware.kryo:kryo</include> + <include>com.esotericsoftware:kryo</include> + <include>com.esotericsoftware:minlog</include> + <include>com.esotericsoftware:reflectasm</include> </includes> </artifactSet> <filters> diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/SparkEntries.java b/rsc/src/main/java/org/apache/livy/rsc/driver/SparkEntries.java index c64fc72..6726bb1 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/driver/SparkEntries.java +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/SparkEntries.java @@ -17,7 +17,6 @@ package org.apache.livy.rsc.driver; -import java.lang.reflect.Method; import java.util.concurrent.TimeUnit; import org.apache.spark.SparkConf; @@ -60,7 +59,7 @@ public class SparkEntries { return sc; } - public SparkSession sparkSession() throws Exception { + public SparkSession 
sparkSession() { if (sparksession == null) { synchronized (this) { if (sparksession == null) { @@ -100,7 +99,7 @@ public class SparkEntries { if (sqlctx == null) { synchronized (this) { if (sqlctx == null) { - sqlctx = new SQLContext(sc()); + sqlctx = sparkSession().sqlContext(); LOG.info("Created SQLContext."); } } @@ -112,7 +111,7 @@ public class SparkEntries { if (hivectx == null) { synchronized (this) { if (hivectx == null) { - SparkConf conf = sc.getConf(); + SparkConf conf = sc().getConf(); if (conf.getBoolean("spark.repl.enableHiveContext", false) || conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase() .equals("hive")) { diff --git a/rsc/src/main/java/org/apache/livy/rsc/rpc/KryoMessageCodec.java b/rsc/src/main/java/org/apache/livy/rsc/rpc/KryoMessageCodec.java index b860e65..191ea50 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/rpc/KryoMessageCodec.java +++ b/rsc/src/main/java/org/apache/livy/rsc/rpc/KryoMessageCodec.java @@ -17,17 +17,10 @@ package org.apache.livy.rsc.rpc; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.List; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.ByteBufferInputStream; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageCodec; diff --git a/repl/scala-2.11/pom.xml b/scala-api/scala-2.12/pom.xml similarity index 68% copy from repl/scala-2.11/pom.xml copy to scala-api/scala-2.12/pom.xml index 020b842..a4f362d 100644 --- a/repl/scala-2.11/pom.xml +++ b/scala-api/scala-2.12/pom.xml @@ -15,27 +15,24 @@ ~ See the License for the specific language governing permissions and ~ limitations under the License. 
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.apache.livy</groupId> - <artifactId>livy-repl_2.11</artifactId> + <artifactId>livy-scala-api_2.12</artifactId> <version>0.8.0-incubating-SNAPSHOT</version> <packaging>jar</packaging> <parent> <groupId>org.apache.livy</groupId> - <artifactId>livy-repl-parent</artifactId> + <artifactId>livy-scala-api-parent</artifactId> <version>0.8.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <properties> - <scala.version>${scala-2.11.version}</scala.version> - <scala.binary.version>2.11</scala.binary.version> - <spark.version>${spark.scala-2.11.version}</spark.version> - <netty.version>${netty.spark-2.11.version}</netty.version> + <scala.version>${scala-2.12.version}</scala.version> + <scala.binary.version>2.12</scala.binary.version> + <spark.version>${spark.scala-2.12.version}</spark.version> + <netty.version>${netty.spark-2.12.version}</netty.version> </properties> - </project> diff --git a/scala-api/src/main/resources/build.marker b/scala-api/src/main/resources/build.marker new file mode 100644 index 0000000..e69de29 diff --git a/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobHandle.scala b/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobHandle.scala index d1cf29d..a04cbfa 100644 --- a/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobHandle.scala +++ b/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobHandle.scala @@ -190,6 +190,14 @@ class ScalaJobHandle[T] private[livy] (jobHandle: JobHandle[T]) extends Future[T getJavaFutureResult(jobHandle, 
atMost) this } + + // These two methods must be implemented in Scala 2.12. They are implemented as unsupported + // operations here. + def transform[S](f: (Try[T]) => Try[S])(implicit executor: ExecutionContext): Future[S] = + throw new UnsupportedOperationException() + + def transformWith[S](f: (Try[T]) => Future[S])(implicit executor: ExecutionContext): Future[S] = + throw new UnsupportedOperationException() } private abstract class AbstractScalaJobHandleListener[T] extends Listener[T] { diff --git a/server/pom.xml b/server/pom.xml index bca1853..188158b 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -200,7 +200,7 @@ <dependency> <groupId>org.scalatra</groupId> <artifactId>scalatra-metrics_${scala.binary.version}</artifactId> - <version>2.4.0.M3</version> + <version>${scalatra.version}</version> <exclusions> <exclusion> <groupId>com.typesafe.akka</groupId> @@ -255,6 +255,12 @@ <groupId>org.scalatra</groupId> <artifactId>scalatra-test_${scala.binary.version}</artifactId> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.mockito</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> @@ -368,4 +374,3 @@ </build> </project> - diff --git a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala index 1fc27a5..a726e7d 100644 --- a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala +++ b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala @@ -47,7 +47,7 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata]( with ApiVersioningSupport with MethodOverride with UrlGeneratorSupport - with GZipSupport + with ContentEncodingSupport { /** * Creates a new session based on the current request. 
The implementation is responsible for diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 790bd5a..c4c273a 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -204,11 +204,13 @@ object InteractiveSession extends Logging { } else { val sparkHome = livyConf.sparkHome().get val libdir = sparkMajorVersion match { - case 2 => + case 2 | 3 => if (new File(sparkHome, "RELEASE").isFile) { new File(sparkHome, "jars") - } else { + } else if (new File(sparkHome, "assembly/target/scala-2.11/jars").isDirectory) { new File(sparkHome, "assembly/target/scala-2.11/jars") + } else { + new File(sparkHome, "assembly/target/scala-2.12/jars") } case v => throw new RuntimeException(s"Unsupported Spark major version: $sparkMajorVersion") diff --git a/server/src/main/scala/org/apache/livy/utils/LivySparkUtils.scala b/server/src/main/scala/org/apache/livy/utils/LivySparkUtils.scala index dc98b0d..c94b199 100644 --- a/server/src/main/scala/org/apache/livy/utils/LivySparkUtils.scala +++ b/server/src/main/scala/org/apache/livy/utils/LivySparkUtils.scala @@ -30,6 +30,8 @@ object LivySparkUtils extends Logging { // For each Spark version we supported, we need to add this mapping relation in case Scala // version cannot be detected from "spark-submit --version". 
private val _defaultSparkScalaVersion = SortedMap( + // Spark 3.0 + Scala 2.12 + (3, 0) -> "2.12", // Spark 2.4 + Scala 2.11 (2, 4) -> "2.11", // Spark 2.3 + Scala 2.11 @@ -40,7 +42,7 @@ object LivySparkUtils extends Logging { // Supported Spark version private val MIN_VERSION = (2, 2) - private val MAX_VERSION = (3, 0) + private val MAX_VERSION = (3, 1) private val sparkVersionRegex = """version (.*)""".r.unanchored private val scalaVersionRegex = """Scala version (.*), Java""".r.unanchored diff --git a/server/src/test/scala/org/apache/livy/server/BaseJsonServletSpec.scala b/server/src/test/scala/org/apache/livy/server/BaseJsonServletSpec.scala index 959707a..a96ae2b 100644 --- a/server/src/test/scala/org/apache/livy/server/BaseJsonServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/BaseJsonServletSpec.scala @@ -108,7 +108,8 @@ abstract class BaseJsonServletSpec extends ScalatraSuite // Only try to parse the body if response is in the "OK" range (20x). if ((status / 100) * 100 == SC_OK) { val result = - if (header("Content-Type").startsWith("application/json")) { + if (klass.runtimeClass != classOf[Unit] && + header("Content-Type").startsWith("application/json")) { // Sometimes there's an empty body with no "Content-Length" header. So read the whole // body first, and only send it to Jackson if there's content. 
val in = response.inputStream diff --git a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala index cdd1783..466c4a2 100644 --- a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala @@ -20,7 +20,7 @@ package org.apache.livy.server import javax.servlet.http.HttpServletRequest import javax.servlet.http.HttpServletResponse._ -import org.scalatest.mock.MockitoSugar.mock +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.livy.LivyConf import org.apache.livy.server.recovery.SessionStore diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala index 1999caa..5a84035 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala @@ -26,7 +26,7 @@ import javax.servlet.http.HttpServletResponse._ import scala.concurrent.duration.Duration import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar.mock +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.livy.{LivyConf, Utils} import org.apache.livy.server.{AccessManager, BaseSessionServletSpec} diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index bc9ddc4..20e6136 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -26,8 +26,8 @@ import scala.concurrent.duration.Duration import org.mockito.Matchers import org.mockito.Matchers.anyObject import org.mockito.Mockito._ -import org.scalatest.{BeforeAndAfter, FunSpec, ShouldMatchers} -import 
org.scalatest.mock.MockitoSugar.mock +import org.scalatest.{BeforeAndAfter, FunSpec} +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf, Utils} import org.apache.livy.server.AccessManager @@ -38,7 +38,7 @@ import org.apache.livy.utils.{AppInfo, Clock, SparkApp} class BatchSessionSpec extends FunSpec with BeforeAndAfter - with ShouldMatchers + with org.scalatest.Matchers with LivyBaseUnitTestSuite { val script: Path = { diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala index c97aa19..78407d5 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala @@ -23,6 +23,7 @@ import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.concurrent.Future +import scala.language.postfixOps import org.json4s.jackson.Json4sScalaModule import org.mockito.Matchers._ @@ -31,7 +32,7 @@ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.scalatest.Entry import org.scalatest.concurrent.Eventually._ -import org.scalatest.mock.MockitoSugar.mock +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.livy.{ExecuteRequest, LivyConf} import org.apache.livy.client.common.HttpMessages.SessionInfo diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala index 55f0e21..d13e682 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala @@ 
-31,7 +31,7 @@ import org.mockito.Matchers._ import org.mockito.Mockito.{atLeastOnce, verify, when} import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers} import org.scalatest.concurrent.Eventually._ -import org.scalatest.mock.MockitoSugar.mock +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.livy.{ExecuteRequest, JobHandle, LivyBaseUnitTestSuite, LivyConf} import org.apache.livy.rsc.{PingJob, RSCClient, RSCConf} diff --git a/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala index 1646492..8ec0b19 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala @@ -29,7 +29,7 @@ import scala.language.postfixOps import org.apache.hadoop.security.UserGroupInformation import org.scalatest.concurrent.Eventually._ -import org.scalatest.mock.MockitoSugar.mock +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.livy.{Job, JobHandle, LivyConf} import org.apache.livy.client.common.{BufferUtils, Serializer} diff --git a/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala index 074b9c2..c9ca9d5 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala @@ -24,7 +24,7 @@ import scala.language.postfixOps import org.mockito.Mockito.{never, verify, when} import org.scalatest.{FunSpec, Matchers} import org.scalatest.concurrent.Eventually._ -import org.scalatest.mock.MockitoSugar.mock +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.livy.LivyConf import org.apache.livy.server.recovery.SessionStore diff --git 
a/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala index 4758c85..082a80a 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala @@ -32,7 +32,7 @@ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.scalatest.FunSpec import org.scalatest.Matchers._ -import org.scalatest.mock.MockitoSugar.mock +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} diff --git a/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala index 5eeb2cf..8861051 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala @@ -22,7 +22,7 @@ import scala.util.Success import org.mockito.Mockito._ import org.scalatest.FunSpec import org.scalatest.Matchers._ -import org.scalatest.mock.MockitoSugar.mock +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} import org.apache.livy.sessions.Session.RecoveryMetadata diff --git a/server/src/test/scala/org/apache/livy/server/recovery/StateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/StateStoreSpec.scala index c7040a5..8c2d4b3 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/StateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/StateStoreSpec.scala @@ -17,8 +17,6 @@ package org.apache.livy.server.recovery -import scala.reflect.classTag - import org.scalatest.{BeforeAndAfter, FunSpec} import org.scalatest.Matchers._ diff --git 
a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala index 3ad7912..7b17b24 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala @@ -26,7 +26,7 @@ import org.apache.zookeeper.data.Stat import org.mockito.Mockito._ import org.scalatest.FunSpec import org.scalatest.Matchers._ -import org.scalatest.mock.MockitoSugar.mock +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala index 7f5e31e..8014d4a 100644 --- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala @@ -25,7 +25,7 @@ import scala.util.{Failure, Try} import org.mockito.Mockito.{doReturn, never, verify, when} import org.scalatest.{FunSpec, Matchers} import org.scalatest.concurrent.Eventually._ -import org.scalatest.mock.MockitoSugar.mock +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} import org.apache.livy.server.batch.{BatchRecoveryMetadata, BatchSession} diff --git a/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala b/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala index ab3e715..ea62d0c 100644 --- a/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala +++ b/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala @@ -46,6 +46,8 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu test("should recognize supported Spark versions") { testSparkVersion("2.2.0") testSparkVersion("2.3.0") + 
testSparkVersion("2.4.0") + testSparkVersion("3.0.0") } test("should complain about unsupported Spark versions") { @@ -85,6 +87,8 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu test("defaultSparkScalaVersion() should return default Scala version") { defaultSparkScalaVersion(formatSparkVersion("2.2.1")) shouldBe "2.11" defaultSparkScalaVersion(formatSparkVersion("2.3.0")) shouldBe "2.11" + defaultSparkScalaVersion(formatSparkVersion("2.4.0")) shouldBe "2.11" + defaultSparkScalaVersion(formatSparkVersion("3.0.0")) shouldBe "2.12" } test("sparkScalaVersion() should use spark-submit detected Scala version.") { @@ -104,5 +108,6 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu test("sparkScalaVersion() should use default Spark Scala version.") { sparkScalaVersion(formatSparkVersion("2.2.0"), None, livyConf) shouldBe "2.11" sparkScalaVersion(formatSparkVersion("2.3.1"), None, livyConf) shouldBe "2.11" + sparkScalaVersion(formatSparkVersion("3.0.0"), None, livyConf) shouldBe "2.12" } } diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala index d43125d..ddd9767 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala @@ -35,7 +35,7 @@ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.scalatest.concurrent.Eventually import org.scalatest.FunSpec -import org.scalatest.mock.MockitoSugar.mock +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf, Utils} import org.apache.livy.utils.SparkApp._ diff --git a/test-lib/src/main/java/org/apache/livy/test/jobs/SQLGetTweets.java b/test-lib/src/main/java/org/apache/livy/test/jobs/SQLGetTweets.java index a9660c4..30c87f0 100644 --- 
a/test-lib/src/main/java/org/apache/livy/test/jobs/SQLGetTweets.java +++ b/test-lib/src/main/java/org/apache/livy/test/jobs/SQLGetTweets.java @@ -61,7 +61,7 @@ public class SQLGetTweets implements Job<List<String>> { } SQLContext sqlctx = useHiveContext ? jc.hivectx() : jc.sqlctx(); - sqlctx.read().json(input.toString()).registerTempTable("tweets"); + sqlctx.read().json(input.toString()).createOrReplaceTempView("tweets"); List<String> tweetList = new ArrayList<>(); Row[] result = diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala index 8220c27..90799f7 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala @@ -17,7 +17,7 @@ package org.apache.livy.thriftserver.types -import org.json4s.{DefaultFormats, JValue} +import org.json4s.{DefaultFormats, JValue, StringInput} import org.json4s.JsonAST.{JObject, JString} import org.json4s.jackson.JsonMethods.parse @@ -52,6 +52,7 @@ object DataTypeUtils { } } + /** * Converts a JSON representing the Spark schema (the one returned by `df.schema.json`) into * a [[Schema]] instance. 
@@ -60,7 +61,7 @@ object DataTypeUtils { * @return a [[Schema]] representing the schema provided as input */ def schemaFromSparkJson(sparkJson: String): Schema = { - val schema = parse(sparkJson) \ "fields" + val schema = parse(StringInput(sparkJson), false) \ "fields" val fields = schema.children.map { field => val name = (field \ "name").extract[String] val hiveType = toFieldType(field \ "type") diff --git a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala index 44fd970..c9f91e1 100644 --- a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala +++ b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala @@ -530,7 +530,8 @@ class BinaryThriftServerSuite extends ThriftServerBaseTest with CommonThriftTest statement.close() } } - assert(caught.getMessage.contains("Table or view not found: `global_temp`.`invalid_table`")) + assert(caught.getMessage.replaceAll("`", "") + .contains("Table or view not found: global_temp.invalid_table")) } } diff --git a/thriftserver/session/pom.xml b/thriftserver/session/pom.xml index 8797d1d..70710de 100644 --- a/thriftserver/session/pom.xml +++ b/thriftserver/session/pom.xml @@ -94,17 +94,4 @@ </plugins> </build> - <profiles> - <!-- - Override the json4s version to match Spark 2.4's. This module doesn't use json4s, but the - Spark APIs called in the tests require a different version of json4s than Livy's (and Livy - doesn't really work with Spark's version yet). 
- --> - <profile> - <id>spark-2.4</id> - <properties> - <json4s.version>3.5.3</json4s.version> - </properties> - </profile> - </profiles> </project> diff --git a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ColumnBufferTest.java b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ColumnBufferTest.java index 71ed020..b8832fb 100644 --- a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ColumnBufferTest.java +++ b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ColumnBufferTest.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.launcher.SparkLauncher; import org.apache.spark.sql.Dataset; @@ -36,6 +35,7 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.StructField; import org.junit.Test; import static org.junit.Assert.*; @@ -46,14 +46,14 @@ public class ColumnBufferTest { public void testColumnBuffer() throws Exception { String warehouse = Files.createTempDirectory("spark-warehouse-").toFile().getAbsolutePath(); - SparkConf conf = new SparkConf() - .set(SparkLauncher.SPARK_MASTER, "local") - .set("spark.app.name", getClass().getName()) - .set("spark.sql.warehouse.dir", warehouse); - SparkContext sc = new SparkContext(conf); + SparkSession session = SparkSession.builder() + .master("local") + .appName(getClass().getName()) + .config("spark.sql.warehouse.dir", warehouse) + .getOrCreate(); try { - SQLContext spark = SQLContext.getOrCreate(sc); + SQLContext spark = session.sqlContext(); TestBean tb = new TestBean(); tb.setId(1); @@ -144,7 +144,7 @@ public class ColumnBufferTest { } } } finally { - sc.stop(); + session.stop(); } }