This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 5abc043  [LIVY-563] Propagate RSC configuration when creating sessions.
5abc043 is described below

commit 5abc043708b443ba36612a4f4e2f5137bd63621a
Author: Marcelo Vanzin <van...@cloudera.com>
AuthorDate: Thu Apr 11 09:29:39 2019 -0700

    [LIVY-563] Propagate RSC configuration when creating sessions.
    
    Even though not all RSC configs apply to the remote driver, a few do,
    so propagate all of them when starting a new session.
    
    Includes new unit test.
    
    Author: Marcelo Vanzin <van...@cloudera.com>
    
    Closes #168 from vanzin/LIVY-563.
---
 rsc/src/main/java/org/apache/livy/rsc/RSCConf.java         |  2 +-
 .../livy/server/interactive/InteractiveSession.scala       |  9 +++++++++
 .../livy/server/interactive/InteractiveSessionSpec.scala   | 14 +++++++++++++-
 3 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java 
b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
index 7c76164..d2496b5 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
@@ -35,7 +35,7 @@ public class RSCConf extends ClientConf<RSCConf> {
   public static final String SPARK_CONF_PREFIX = "spark.";
   public static final String LIVY_SPARK_PREFIX = SPARK_CONF_PREFIX + 
"__livy__.";
 
-  private static final String RSC_CONF_PREFIX = "livy.rsc.";
+  public static final String RSC_CONF_PREFIX = "livy.rsc.";
 
   public static enum Entry implements ConfEntry {
     CLIENT_ID("client.auth.id", null),
diff --git 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index 6ec2d75..9529ed3 100644
--- 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -346,6 +346,15 @@ object InteractiveSession extends Logging {
       mergeHiveSiteAndHiveDeps(sparkMajorVersion)
     }
 
+    // Pick all the RSC-specific configs that have not been explicitly set 
otherwise, and
+    // put them in the resulting properties, so that the remote driver can use 
them.
+    livyConf.iterator().asScala.foreach { e =>
+      val (key, value) = (e.getKey(), e.getValue())
+      if (key.startsWith(RSCConf.RSC_CONF_PREFIX) && 
!builderProperties.contains(key)) {
+        builderProperties(key) = value
+      }
+    }
+
     builderProperties
   }
 }
diff --git 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
index 2a99abb..2e21483 100644
--- 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
+++ 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
@@ -130,7 +130,6 @@ class InteractiveSessionSpec extends FunSpec
         "dummy.jar"))
     }
 
-
     it("should set rsc jars through livy conf") {
       val rscJars = Set(
         "dummy.jar",
@@ -177,6 +176,19 @@ class InteractiveSessionSpec extends FunSpec
       session.state should (be(SessionState.Starting) or be(SessionState.Idle))
     }
 
+    it("should propagate RSC configuration properties") {
+      val livyConf = new LivyConf(false)
+        .set(LivyConf.REPL_JARS, "dummy.jar")
+        .set(RSCConf.Entry.SASL_QOP.key(), "foo")
+        .set(RSCConf.Entry.RPC_CHANNEL_LOG_LEVEL.key(), "TRACE")
+        .set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION"))
+        .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10")
+
+      val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark, 
livyConf)
+      assert(properties(RSCConf.Entry.SASL_QOP.key()) === "foo")
+      assert(properties(RSCConf.Entry.RPC_CHANNEL_LOG_LEVEL.key()) === "TRACE")
+    }
+
     withSession("should execute `1 + 2` == 3") { session =>
       val pyResult = executeStatement("1 + 2", Some("pyspark"))
       pyResult should equal (Extraction.decompose(Map(

Reply via email to