This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new eea3f55 [SPARK-27446][R] Use existing spark conf if available. eea3f55 is described below commit eea3f55a316f6ebab0f91f265ae101b41a187096 Author: Bago Amirbekian <b...@databricks.com> AuthorDate: Sun Apr 14 17:09:12 2019 +0900 [SPARK-27446][R] Use existing spark conf if available. ## What changes were proposed in this pull request? RBackend and RBackendHandler each create a new SparkConf object, which does not pick up conf values from the existing SparkSession, so they always use the default conf values instead of the values specified by the user. This fix checks whether a SparkEnv already exists and, if so, takes the conf from it; otherwise it falls back to creating a new SparkConf. This follows the pattern used in other places, for example: https://github.com/apache/spark/blob/3725b1324f731d57dc776c256bc1a100ec9e6cd0/core/src/main/scala/org/apache/spark/api/r/BaseRRunner.scala#L261 ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Closes #24353 from MrBago/r-backend-use-existing-conf. 
Authored-by: Bago Amirbekian <b...@databricks.com> Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- core/src/main/scala/org/apache/spark/api/r/RBackend.scala | 6 +++--- core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala index 36b4132..c755dcb 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala @@ -30,7 +30,7 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder import io.netty.handler.codec.bytes.{ByteArrayDecoder, ByteArrayEncoder} import io.netty.handler.timeout.ReadTimeoutHandler -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.R._ @@ -47,7 +47,7 @@ private[spark] class RBackend { private[r] val jvmObjectTracker = new JVMObjectTracker def init(): (Int, RAuthHelper) = { - val conf = new SparkConf() + val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT) bossGroup = new NioEventLoopGroup(conf.get(R_NUM_BACKEND_THREADS)) val workerGroup = bossGroup @@ -124,7 +124,7 @@ private[spark] object RBackend extends Logging { val listenPort = serverSocket.getLocalPort() // Connection timeout is set by socket client. 
To make it configurable we will pass the // timeout value to client inside the temp file - val conf = new SparkConf() + val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT) // tell the R process via temporary file diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala index 7b74efa..aaa432d 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala @@ -26,7 +26,7 @@ import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler} import io.netty.channel.ChannelHandler.Sharable import io.netty.handler.timeout.ReadTimeoutException -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.api.r.SerDe._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.R._ @@ -98,7 +98,7 @@ private[r] class RBackendHandler(server: RBackend) ctx.write(pingBaos.toByteArray) } } - val conf = new SparkConf() + val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) val heartBeatInterval = conf.get(R_HEARTBEAT_INTERVAL) val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT) val interval = Math.min(heartBeatInterval, backendConnectionTimeout - 1) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org