This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new eea3f55  [SPARK-27446][R] Use existing spark conf if available.
eea3f55 is described below

commit eea3f55a316f6ebab0f91f265ae101b41a187096
Author: Bago Amirbekian <b...@databricks.com>
AuthorDate: Sun Apr 14 17:09:12 2019 +0900

    [SPARK-27446][R] Use existing spark conf if available.
    
    ## What changes were proposed in this pull request?
    
    RBackend and RBackendHandler create new conf objects that do not pick up
    conf values from the existing SparkSession, so they always use the default
    conf values instead of the values specified by the user.
    
    This fix checks whether a SparkEnv already exists and, if so, reads the
    conf from it, falling back to creating a new conf otherwise. This follows
    the pattern already used in other places, including:
    https://github.com/apache/spark/blob/3725b1324f731d57dc776c256bc1a100ec9e6cd0/core/src/main/scala/org/apache/spark/api/r/BaseRRunner.scala#L261
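    
    In short, the pattern applied at all three call sites below is (a minimal
    sketch; SparkEnv.get returns null when no env has been created yet):
    
        import org.apache.spark.{SparkConf, SparkEnv}
    
        // Reuse the conf from the live SparkEnv when one exists; otherwise
        // fall back to a fresh SparkConf, which only sees JVM system properties.
        val conf: SparkConf =
          Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())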
    
    Closes #24353 from MrBago/r-backend-use-existing-conf.
    
    Authored-by: Bago Amirbekian <b...@databricks.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 core/src/main/scala/org/apache/spark/api/r/RBackend.scala        | 6 +++---
 core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala | 4 ++--
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
index 36b4132..c755dcb 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
@@ -30,7 +30,7 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder
 import io.netty.handler.codec.bytes.{ByteArrayDecoder, ByteArrayEncoder}
 import io.netty.handler.timeout.ReadTimeoutHandler
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.R._
 
@@ -47,7 +47,7 @@ private[spark] class RBackend {
   private[r] val jvmObjectTracker = new JVMObjectTracker
 
   def init(): (Int, RAuthHelper) = {
-    val conf = new SparkConf()
+    val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
     val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
     bossGroup = new NioEventLoopGroup(conf.get(R_NUM_BACKEND_THREADS))
     val workerGroup = bossGroup
@@ -124,7 +124,7 @@ private[spark] object RBackend extends Logging {
       val listenPort = serverSocket.getLocalPort()
      // Connection timeout is set by socket client. To make it configurable we will pass the
       // timeout value to client inside the temp file
-      val conf = new SparkConf()
+      val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
       val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
 
       // tell the R process via temporary file
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
index 7b74efa..aaa432d 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -26,7 +26,7 @@ import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
 import io.netty.channel.ChannelHandler.Sharable
 import io.netty.handler.timeout.ReadTimeoutException
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.api.r.SerDe._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.R._
@@ -98,7 +98,7 @@ private[r] class RBackendHandler(server: RBackend)
           ctx.write(pingBaos.toByteArray)
         }
       }
-      val conf = new SparkConf()
+      val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
       val heartBeatInterval = conf.get(R_HEARTBEAT_INTERVAL)
       val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
       val interval = Math.min(heartBeatInterval, backendConnectionTimeout - 1)
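
For anyone following along, here is a minimal sketch of why the previous
new SparkConf() calls missed user settings (illustrative only, not part of the
patch; assumes a local SparkSession and the spark.r.backendConnectionTimeout
key read above):

    import org.apache.spark.{SparkConf, SparkEnv}
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[1]")
      .config("spark.r.backendConnectionTimeout", "120")
      .getOrCreate()

    // A fresh SparkConf only loads spark.* JVM system properties, so it can
    // miss values set programmatically on the running session ...
    val fresh = new SparkConf()
    // ... while the conf held by the live SparkEnv does carry them.
    val reused = SparkEnv.get.conf
    assert(reused.get("spark.r.backendConnectionTimeout") == "120")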


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
