Hi all, 

Here is a simplified example that shows my concern. It consists of three
files with three objects, built against Spark 1.6.1.

// file globalObject.scala
import org.apache.spark.broadcast.Broadcast

object globalObject {
  // Shared slot for the broadcast handle; set by the driver in bootStrap().
  var br_value: Broadcast[Map[Int, Double]] = null
}


// file someFunc.scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object someFunc {
  def go(rdd: RDD[Int])(implicit sc: SparkContext): Array[Int] = {
    rdd.map(i => {
      // Read the broadcast value through the global object inside the closure.
      val acc = globalObject.br_value.value
      if (acc.contains(i)) {
        i + 1
      } else {
        i
      }
    }).take(100)
  }
}

// file testMain.scala
import org.apache.spark.{SparkConf, SparkContext}

object testMain {
  def bootStrap()(implicit sc: SparkContext): Unit = {
    // Driver-side initialization: create the broadcast and publish the handle.
    globalObject.br_value = sc.broadcast(Map(1 -> 2, 2 -> 3, 4 -> 5))
  }

  def main(args: Array[String]): Unit = {
    lazy val appName = getClass.getSimpleName.split("\\$").last
    implicit val sc = new SparkContext(new SparkConf().setAppName(appName))
    val datardd = sc.parallelize(Range(0, 200), 200)
      .flatMap(i => Range(0, 1000))

    bootStrap()
    someFunc.go(datardd).foreach(println)

  }
}


When I run this code on a cluster, it fails with the following error:
ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
        at someFunc$$anonfun$go$1.apply$mcII$sp(someFunc.scala:7)
        at someFunc$$anonfun$go$1.apply(someFunc.scala:6)
        at someFunc$$anonfun$go$1.apply(someFunc.scala:6)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

Apparently the broadcast value never reaches the executors. I ran into this
problem while refactoring my code recently: I want several Scala objects to
share the same broadcast variable. It is pretty confusing, because as I
understand it the driver only hands out a small handle (the Broadcast object)
that points at the broadcast data, so reading a broadcast variable shouldn't
be restricted to the code scope where it was created.

Correct me if I am wrong. What is the proper way to share a broadcast
variable among Scala objects? For instance, is passing the handle around
explicitly, as in the sketch below, the intended pattern? Thanks in advance.
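
Here is a minimal, untested sketch of that alternative: the Broadcast handle
is passed as an ordinary parameter, so the task closure captures the handle
itself instead of reaching through the global object (someFuncExplicit is a
hypothetical name, not part of my real code):

// file someFuncExplicit.scala (hypothetical variant for illustration)
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

object someFuncExplicit {
  // The Broadcast handle is a method parameter, so Spark serializes it into
  // the task closure and br.value can be resolved on the executors.
  def go(rdd: RDD[Int], br: Broadcast[Map[Int, Double]]): Array[Int] = {
    rdd.map { i =>
      val acc = br.value
      if (acc.contains(i)) i + 1 else i
    }.take(100)
  }
}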