This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push: new 5abc043 [LIVY-563] Propagate RSC configuration when creating sessions. 5abc043 is described below commit 5abc043708b443ba36612a4f4e2f5137bd63621a Author: Marcelo Vanzin <van...@cloudera.com> AuthorDate: Thu Apr 11 09:29:39 2019 -0700 [LIVY-563] Propagate RSC configuration when creating sessions. Even though not all RSC configs apply to the remote driver, a few do, so propagate all of them when starting a new session. Includes new unit test. Author: Marcelo Vanzin <van...@cloudera.com> Closes #168 from vanzin/LIVY-563. --- rsc/src/main/java/org/apache/livy/rsc/RSCConf.java | 2 +- .../livy/server/interactive/InteractiveSession.scala | 9 +++++++++ .../livy/server/interactive/InteractiveSessionSpec.scala | 14 +++++++++++++- 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java index 7c76164..d2496b5 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java @@ -35,7 +35,7 @@ public class RSCConf extends ClientConf<RSCConf> { public static final String SPARK_CONF_PREFIX = "spark."; public static final String LIVY_SPARK_PREFIX = SPARK_CONF_PREFIX + "__livy__."; - private static final String RSC_CONF_PREFIX = "livy.rsc."; + public static final String RSC_CONF_PREFIX = "livy.rsc."; public static enum Entry implements ConfEntry { CLIENT_ID("client.auth.id", null), diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 6ec2d75..9529ed3 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -346,6 +346,15 @@ object InteractiveSession extends Logging { mergeHiveSiteAndHiveDeps(sparkMajorVersion) } + // Pick all the RSC-specific configs that have not been explicitly set otherwise, and + // put them in the resulting properties, so that the remote driver can use them. + livyConf.iterator().asScala.foreach { e => + val (key, value) = (e.getKey(), e.getValue()) + if (key.startsWith(RSCConf.RSC_CONF_PREFIX) && !builderProperties.contains(key)) { + builderProperties(key) = value + } + } + builderProperties } } diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala index 2a99abb..2e21483 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala @@ -130,7 +130,6 @@ class InteractiveSessionSpec extends FunSpec "dummy.jar")) } - it("should set rsc jars through livy conf") { val rscJars = Set( "dummy.jar", @@ -177,6 +176,19 @@ class InteractiveSessionSpec extends FunSpec session.state should (be(SessionState.Starting) or be(SessionState.Idle)) } + it("should propagate RSC configuration properties") { + val livyConf = new LivyConf(false) + .set(LivyConf.REPL_JARS, "dummy.jar") + .set(RSCConf.Entry.SASL_QOP.key(), "foo") + .set(RSCConf.Entry.RPC_CHANNEL_LOG_LEVEL.key(), "TRACE") + .set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION")) + .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10") + + val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark, livyConf) + assert(properties(RSCConf.Entry.SASL_QOP.key()) === "foo") + assert(properties(RSCConf.Entry.RPC_CHANNEL_LOG_LEVEL.key()) === "TRACE") + } + withSession("should execute `1 + 2` == 3") { session => val pyResult = executeStatement("1 + 2", Some("pyspark")) pyResult should equal (Extraction.decompose(Map(