Re: Broadcast var is null
This advice solved the problem: "Stop having your object extend App, and instead give it a main method." https://issues.apache.org/jira/browse/SPARK-4170
Re: Broadcast var is null
Yes, see https://issues.apache.org/jira/browse/SPARK-4170

The reason was kind of complicated, and the 'fix' was just to warn you against subclassing App! Yes, use a main() method.

On Tue, Oct 6, 2015 at 3:15 PM, Nick Peterson wrote:
> This might seem silly, but...
>
> Stop having your object extend App, and instead give it a main method.
> That's worked for me recently when I've had this issue. (There was a very
> old issue in Spark related to this; it would seem like a possible
> regression, if this fixes it for you.)
>
> -- Nick
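For anyone landing on this thread, here is a minimal sketch of the restructuring Nick and Sean suggest, based on the reduced test posted below in this thread; only the entry point changes. (SPARK-4170 traces the problem to Scala's App trait, which initializes the object's fields through DelayedInit, so vals defined in the object body can still be null when a closure referencing them is shipped to executors.)

import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

// Same reduced test as elsewhere in this thread, but with an explicit
// main method instead of "extends App".
object BroadcastTest {
  def main(args: Array[String]): Unit = {
    val logger = LoggerFactory.getLogger("OinkSparkMain")
    val sc = new SparkContext(new SparkConf().setAppName("OinkSparkMain"))

    val broadcastVar = sc.broadcast(Array(1, 2, 3))

    val mapped = sc.parallelize(Array(1, 2, 3)).map { e =>
      // broadcastVar is a fully initialized local val of main when this
      // closure is serialized, so it is non-null on the executors.
      (e, broadcastVar.value.mkString(" | "))
    }

    logger.info("** Total count: " + mapped.collect().length)
    sc.stop()
  }
}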
Re: Broadcast var is null
This might seem silly, but...

Stop having your object extend App, and instead give it a main method. That's worked for me recently when I've had this issue. (There was a very old issue in Spark related to this; it would seem like a possible regression, if this fixes it for you.)

-- Nick

On Tue, Oct 6, 2015 at 5:31 AM dpristin wrote:
> I've reduced the code to the code below - no streaming, no Kafka, no
> checkpoint. Unfortunately the end result is the same - "broadcastVar is
> null" printed in the worker log. Any suggestion on what I'm missing would
> be very much appreciated!
Re: Broadcast var is null
I've reduced the code to the code below - no streaming, no Kafka, no checkpoint. Unfortunately the end result is the same - "broadcastVar is null" printed in the worker log. Any suggestion on what I'm missing would be very much appreciated!

import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object BroadcastTest extends App {
  val logger = LoggerFactory.getLogger("OinkSparkMain")
  logger.info("OinkSparkMain - Setup Logger")

  val sparkConf = new SparkConf().setAppName("OinkSparkMain")
  val sc: SparkContext = new SparkContext(sparkConf)

  val rdd = sc.parallelize(Array(1, 2, 3))

  val arr = Array(1, 2, 3)
  val broadcastVar = sc.broadcast(arr)

  val mappedEvents = rdd.map(e => {
    val l = LoggerFactory.getLogger("OinkSparkMain1")

    if (broadcastVar == null) {
      l.info("broadcastVar is null")
      (e, "empty")
    } else {
      val str = broadcastVar.value.mkString(" | ")
      l.info("broadcastVar is " + str)
      (e, str)
    }
  })

  logger.info("** Total reduced count: " + mappedEvents.collect().length)
}
Re: Broadcast var is null
Hi guys, thanks a lot for responding so quickly!

I've reduced the code to the code below - no streaming, no Kafka, no checkpoint. Unfortunately the end result is the same. Any suggestion as to where I'm messing up would be very much appreciated!

import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object BroadcastTest extends App {
  val logger = LoggerFactory.getLogger("OinkSparkMain")
  logger.info("OinkSparkMain - Setup Logger")

  val sparkConf = new SparkConf().setAppName("OinkSparkMain")
  val sc: SparkContext = new SparkContext(sparkConf)

  val rdd = sc.parallelize(Array(1, 2, 3))

  val arr = Array(1, 2, 3)
  val broadcastVar = sc.broadcast(arr)

  val mappedEvents = rdd.map(e => {
    val l = LoggerFactory.getLogger("OinkSparkMain1")

    if (broadcastVar == null) {
      l.info("broadcastVar is null")
      (e, "empty")
    } else {
      val str = broadcastVar.value.mkString(" | ")
      l.info("broadcastVar is " + str)
      (e, str)
    }
  })

  logger.info("** Total reduced count: " + mappedEvents.collect().length)
}

On Mon, Oct 5, 2015 at 4:14 PM, Adrian Tanase wrote:
> FYI the same happens with accumulators when recovering from checkpoint.
> I'd love to see this fixed somehow as the workaround (using a singleton
> factory in foreachRdd to make sure the accumulators are initialized
> instead of null) is really intrusive...
Re: Broadcast var is null
FYI the same happens with accumulators when recovering from checkpoint. I'd love to see this fixed somehow, as the workaround (using a singleton factory in foreachRdd to make sure the accumulators are initialized instead of null) is really intrusive...

On 05 Oct 2015, at 22:52, Tathagata Das wrote:

> Make sure the broadcast variable works independent of the streaming
> application. Then make sure it works without StreamingContext.getOrCreate().
> That will disambiguate whether that error is thrown when starting a new
> context, or when recovering a context from checkpoint (as getOrCreate is
> supposed to do).
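For reference, a rough sketch of the workaround Adrian describes (the holder object, accumulator name, and the isInvalid predicate in the usage comment are hypothetical, not from this thread): a lazily initialized singleton creates the accumulator from whatever SparkContext is at hand inside foreachRDD, so it also exists after the driver recovers from a checkpoint instead of coming back as null.

import org.apache.spark.{Accumulator, SparkContext}

// Hypothetical singleton holder: the accumulator is created lazily on first
// use, so it is re-initialized after checkpoint recovery.
object DroppedEventsCounter {
  @volatile private var instance: Accumulator[Long] = null

  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "DroppedEventsCounter")
        }
      }
    }
    instance
  }
}

// Usage inside the streaming job:
// events.foreachRDD { rdd =>
//   val dropped = DroppedEventsCounter.getInstance(rdd.sparkContext)
//   rdd.foreach { e => if (isInvalid(e)) dropped += 1 }
// }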
Re: Broadcast var is null
Make sure the broadcast variable works independent of the streaming application. Then make sure it works without StreamingContext.getOrCreate(). That will disambiguate whether that error is thrown when starting a new context, or when recovering a context from checkpoint (as getOrCreate is supposed to do).

On Mon, Oct 5, 2015 at 9:23 AM, dpristin wrote:
> Hi,
>
> Can anyone point out what I'm doing wrong? I've implemented a very basic
> spark streaming app that uses a single broadcast variable. When it runs
> locally it produces a proper output (the array I broadcast). But when
> deployed on the cluster I get "broadcastVar is null". We use v 1.4.1.
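To make TD's second check concrete, here is a rough sketch (not from the thread; the queueStream input is a stand-in for the Kafka stream, and the app name is reused from the original post): construct the StreamingContext directly instead of through StreamingContext.getOrCreate, so no checkpoint recovery is involved. If the broadcast works here but not with getOrCreate, the problem is specific to recovering the context from a checkpoint.

import scala.collection.mutable.Queue

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BroadcastNoRecoveryTest {
  def main(args: Array[String]): Unit = {
    // Plain constructor: no getOrCreate, no checkpoint directory.
    val ssc = new StreamingContext(new SparkConf().setAppName("OinkSparkMain"), Seconds(10))

    val broadcastVar = ssc.sparkContext.broadcast(Array(1, 2, 3))

    // Simple in-memory input stream instead of Kafka.
    val queue = Queue[RDD[Int]](ssc.sparkContext.parallelize(1 to 10))
    val events = ssc.queueStream(queue)

    events.foreachRDD { rdd =>
      rdd.foreach { _ =>
        println("broadcastVar value: " + broadcastVar.value.mkString("|"))
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}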
Broadcast var is null
Hi,

Can anyone point out what I'm doing wrong? I've implemented a very basic spark streaming app that uses a single broadcast variable. When it runs locally it produces a proper output (the array I broadcast). But when deployed on the cluster I get "broadcastVar is null". We use v 1.4.1. Here is the code:

--- imports go here

object BroadcastTest extends App {
  val logger = LoggerFactory.getLogger("OinkSparkMain")
  logger.info("OinkSparkMain - Setup Logger")

  // This is our custom context setup code; nothing fancy goes on here
  val config = Configuration(args)
  val ssc: StreamingContext =
    StreamingContext.getOrCreate(config.checkpointDirectory, () => {
      SparkStreamingContextFactory.Create(config, timeWindow = Seconds(10))
    })

  val kafkaStreamFactory = new KafkaStreamFactory(config, ssc)
  val messages = kafkaStreamFactory.Create

  // Grab the value data of the Kafka input dstream above as a string
  val events = messages.map(s => s._2)

  // Create a broadcast variable - straight from the dev guide
  val broadcastVar = ssc.sparkContext.broadcast(Array(1, 2, 3))

  // Try to print out the value of the broadcast var here
  val transformed = events.transform(rdd => {
    rdd.map(x => {
      if (broadcastVar == null) {
        println("broadcastVar is null")
      } else {
        println("broadcastVar value: " + broadcastVar.value.mkString("|"))
      }
      x
    })
  })

  transformed.foreachRDD(x => logger.info("Data: " + x.collect.mkString("|")))

  ssc.start()
  ssc.awaitTermination()
}

Any input is very much appreciated!

Regards,
Dmitry.