[ https://issues.apache.org/jira/browse/SPARK-15669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15312126#comment-15312126 ]
Denis Buravlev edited comment on SPARK-15669 at 6/2/16 10:48 AM:
-----------------------------------------------------------------

[~zsxwing] thank you for the help. I've tried to set the "akka.zeromq.*" options via the "application.conf" file, and your approach works. I see two ways to solve my problem:
* Set options via the "SparkConf" object (e.g. {{sparkConf.set("akka.zeromq.new-socket-timeout", "5")}})
* Set options via CLI args (e.g. {{--conf "spark.executor.extraJavaOptions=-Dakka.zeromq.poll-timeout=100ms -Dakka.zeromq.new-socket-timeout=5s"}})

> Driver and Executor have different akka configuration in YARN cluster mode
> --------------------------------------------------------------------------
>
>                 Key: SPARK-15669
>                 URL: https://issues.apache.org/jira/browse/SPARK-15669
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Streaming
>    Affects Versions: 1.6.1
>         Environment: Scala: 2.10
> Spark: 1.6.1
> Yarn Cluster
>            Reporter: Denis Buravlev
>            Priority: Critical
>
> I'm trying to run a Spark Streaming application that uses ZeroMQ on a YARN cluster. The application fails with the following message: {{ERROR actor.OneForOneStrategy: No configuration setting found for key 'akka.zeromq'}}.
> The configuration file exists and is available from both the Driver and the Executor.
> After a small investigation I found one interesting thing: if you execute the code
> {{println(Try(SparkEnv.get.actorSystem.settings.config.getString("akka.zeromq.poll-timeout")).toOption.getOrElse("error!"))}}
> you'll get different results for the Driver and the Executor:
> Driver stdout log: the value from the configuration (e.g. 100ms)
> Executor stdout log: "error!"
> *Is this correct behaviour for Spark?*
> Full code for reproducing:
> {code}
> import scala.collection.mutable.Queue
> import scala.util.Try
>
> import org.apache.spark.{SparkConf, SparkEnv}
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> object Application {
>   def main(args: Array[String]) {
>     val config = ConfigurationManager.getConfig(args)
>     val sparkConf = new SparkConf().setAppName("TestApp")
>     val ssc = new StreamingContext(sparkConf, Seconds(1))
>     println("Driver:")
>     println(Try(SparkEnv.get.actorSystem.settings.config.getString("akka.zeromq.poll-timeout")).toOption.getOrElse("error1"))
>     val rddQueue = new Queue[RDD[Int]]()
>     val inputStream = ssc.queueStream(rddQueue)
>     val mappedStream = inputStream.map { x =>
>       println("Executor:")
>       println(Try(SparkEnv.get.actorSystem.settings.config.getString("akka.zeromq.poll-timeout")).toOption.getOrElse("error2"))
>       (x % 10, 1)
>     }
>     val reducedStream = mappedStream.reduceByKey(_ + _)
>     reducedStream.print()
>     ssc.start()
>     for (i <- 1 to 30) {
>       rddQueue.synchronized {
>         rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
>       }
>       Thread.sleep(1000)
>     }
>     ssc.awaitTermination()
>   }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
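
The two workarounds listed in the comment can be combined into a single submit invocation. This is only a sketch, not from the original report: since the issue is specifically about YARN *cluster* mode, where the driver also runs on a cluster node, the same {{-Dakka.zeromq.*}} system properties presumably need to be passed via {{spark.driver.extraJavaOptions}} as well as {{spark.executor.extraJavaOptions}}; the class name, jar name, and timeout values below are illustrative assumptions.

{code}
spark-submit \
  --master yarn --deploy-mode cluster \
  --class Application \
  --conf "spark.driver.extraJavaOptions=-Dakka.zeromq.poll-timeout=100ms -Dakka.zeromq.new-socket-timeout=5s" \
  --conf "spark.executor.extraJavaOptions=-Dakka.zeromq.poll-timeout=100ms -Dakka.zeromq.new-socket-timeout=5s" \
  test-app.jar
{code}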