Hi,

Can anyone point out what I'm doing wrong? I've implemented a very
basic Spark Streaming app that uses a single broadcast variable. When it
runs locally it produces the expected output (the array I broadcast), but
when deployed on the cluster it prints "broadcastVar is null". We use
Spark v1.4.1. Here is the code:

--- imports go here

object BroadcastTest extends App {
  val logger = LoggerFactory.getLogger("OinkSparkMain")
  logger.info("OinkSparkMain - Setup Logger")

  // This is our custom context setup code; nothing fancy goes on here
  val config = Configuration(args)
  val ssc: StreamingContext =
    StreamingContext.getOrCreate(config.checkpointDirectory, () => {
      SparkStreamingContextFactory.Create(config, timeWindow = Seconds(10))
    })

  val kafkaStreamFactory = new KafkaStreamFactory(config, ssc)
  val messages = kafkaStreamFactory.Create

  // Grab the value part of each record from the Kafka input DStream as a string
  val events = messages.map( s => s._2 )

  // Create a broadcast variable - straight from the dev guide
  val broadcastVar = ssc.sparkContext.broadcast(Array(1, 2, 3))

  // Try to print out the value of the broadcast var here
  val transformed = events.transform(rdd => {
    rdd.map(x => {
      if (broadcastVar == null) {
        println("broadcastVar is null")
      } else {
        println("broadcastVar value: " + broadcastVar.value.mkString("|"))
      }
      x
    })
  })

  transformed.foreachRDD(x =>
    logger.info("Data: " + x.collect.mkString("|")))

  ssc.start()
  ssc.awaitTermination()
}
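
One possible cause worth ruling out, stated as an assumption rather than
a diagnosis: in Scala 2, the body of an object that extends App is
deferred until its main method runs, so a val declared there can still be
null when the object is reached from code that never called that main.
The sketch below reproduces only that language behaviour in plain Scala,
with no Spark involved; Holder, Probe and payload are hypothetical names
that do not appear in the app above.

```scala
// Plain Scala 2 sketch (for example 2.10 or 2.11); no Spark required.
// Holder, Probe and payload are hypothetical names for illustration only.

object Holder extends App {
  // With scala.App, this initializer is deferred until Holder.main runs.
  val payload: Array[Int] = Array(1, 2, 3)
}

object Probe {
  def main(args: Array[String]): Unit = {
    // Holder.main is never called in this JVM. That is roughly the
    // situation of a process that loads the class without running its
    // entry point (assumed here to resemble what an executor does).
    if (Holder.payload == null) {
      println("payload is null")
    } else {
      println("payload value: " + Holder.payload.mkString("|"))
    }
  }
}
```

Running Probe on Scala 2 should take the null branch. If the same
mechanism is at work in the streaming app, moving the body of
BroadcastTest into an explicit def main(args: Array[String]) instead of
extending App would be the thing to try.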

Any input is very much appreciated!

Regards,
Dmitry.

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-var-is-null-tp24927.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
