[ https://issues.apache.org/jira/browse/SPARK-15669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Denis Buravlev closed SPARK-15669.
----------------------------------
    Resolution: Won't Fix

> Driver and Executor have different akka configuration in YARN cluster mode
> --------------------------------------------------------------------------
>
>                 Key: SPARK-15669
>                 URL: https://issues.apache.org/jira/browse/SPARK-15669
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Streaming
>    Affects Versions: 1.6.1
>         Environment: Scala: 2.10
> Spark: 1.6.1
> Yarn Cluster
>            Reporter: Denis Buravlev
>            Priority: Critical
>
> I'm trying to run a Spark Streaming application that uses ZeroMQ on a YARN 
> cluster. The application fails with the following message: {{ERROR 
> actor.OneForOneStrategy: No configuration setting found for key 
> 'akka.zeromq'}}.
> The configuration file exists and is available to both the Driver and the Executor.
> After some investigation I found something interesting: if you execute 
> the code
> {{println(Try(SparkEnv.get.actorSystem.settings.config.getString("akka.zeromq.poll-timeout")).toOption.getOrElse("error!"))}}
>
> you get different results on the Driver and the Executor:
> Driver stdout log: the value from the configuration (e.g. 100ms)
> Executor stdout log: "error!"
> *Is this the correct behaviour for Spark?*
> Full code to reproduce:
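> {code}
> import scala.collection.mutable.Queue
> import scala.util.Try
>
> import org.apache.spark.{SparkConf, SparkEnv}
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> object Application {
>   def main(args: Array[String]) {
>     // ConfigurationManager is the reporter's own helper; its source is not shown.
>     val config = ConfigurationManager.getConfig(args)
>     val sparkConf = new SparkConf().setAppName("TestApp")
>     val ssc = new StreamingContext(sparkConf, Seconds(1))
>     // Probe the actor system's configuration on the Driver.
>     println("Driver:")
>     println(Try(SparkEnv.get.actorSystem.settings.config.getString("akka.zeromq.poll-timeout")).toOption.getOrElse("error1"))
>     val rddQueue = new Queue[RDD[Int]]()
>     val inputStream = ssc.queueStream(rddQueue)
>     val mappedStream = inputStream.map{x =>
>       // The same probe, now running inside a task on an Executor.
>       println("Executor:")
>       println(Try(SparkEnv.get.actorSystem.settings.config.getString("akka.zeromq.poll-timeout")).toOption.getOrElse("error2"))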
>       (x % 10, 1)
>     }
>     val reducedStream = mappedStream.reduceByKey(_ + _)
>     reducedStream.print()
>     ssc.start()
>     for (i <- 1 to 30) {
>       rddQueue.synchronized {
>         rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
>       }
>       Thread.sleep(1000)
>     }
>     ssc.awaitTermination()
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
