Hi,
I try to read data from a static textfile stored in HDFS, store its content
into an ArrayBuffer which in turn should be broadcasted via
sparkContext.broadcast as a BroadcastVariable. I am using cloudera's spark,
version 1.6.0-cdh5.7.0 and spark-streaming_2.10.

I start the application on yarn using spark-submit:
*spark-submit --class my.package.BroadcastStreamTest1 --master yarn
--deploy-mode client --conf spark.executor.userClassPathFirst=true
current.jar*

When I do this, I get an 
*java.lang.ClassCastException: cannot assign instance of scala.Some to field
org.apache.spark.Accumulable.name of type scala.Option in instance of
org.apache.spark.Accumulator*
The same code used with a hard coded ArrayBuffer work perfectly so I assume
it has something to do with the static file resource...
Does anyone have an idea what I am possibly doing wrong? Any help
appreciated.

Here's my code:

*This does not work:*
object BroadcastStreamTest1 {

    def main(args: Array[String]) {
        val sparkConf = new SparkConf()
        val streamingContext = new StreamingContext(sparkConf, batchDuration
= Seconds(10))

        val content = streamingContext.sparkContext
            .textFile("hdfs:///data/someTextFile.txt")
            .collect()
            .toBuffer[String]

        val broadCastVar = streamingContext.sparkContext.broadcast(content)
        broadCastVar.value.foreach(line => println(line))
    }
}

*This works:*
object BroadcastStreamTest2 {

    def main(args: Array[String]) {
        val sparkConf = new SparkConf()
        val streamingContext = new StreamingContext(sparkConf, batchDuration
= Seconds(10))

        val content = new mutable.ArrayBuffer[String]
        (1 to 50).foreach(i => content += "line" + i)

        val broadCastVar = streamingContext.sparkContext.broadcast(content)
        broadCastVar.value.foreach(line => println(line))
    }
}

*StackTrace:*
16/04/25 10:09:59 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed
4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 0.0 (TID 6, n525.hadoop.mxint.net):
java.io.IOException: java.lang.ClassCastException: cannot assign instance of
scala.Some to field org.apache.spark.Accumulable.name of type scala.Option
in instance of org.apache.spark.Accumulator
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1208)
        at org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
        at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of
scala.Some to field org.apache.spark.Accumulable.name of type scala.Option
in instance of org.apache.spark.Accumulator
        at
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
        at 
java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
        at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
        at
org.apache.spark.Accumulable$$anonfun$readObject$1.apply$mcV$sp(Accumulators.scala:152)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205)
        ... 30 more

Driver stacktrace:
        at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1940)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
        at
net.meetrics.dada.streaming.application.BroadcastStreamTest1$.main(BroadcastStreamTest1.scala:14)
        at
net.meetrics.dada.streaming.application.BroadcastStreamTest1.main(BroadcastStreamTest1.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: java.lang.ClassCastException: cannot assign
instance of scala.Some to field org.apache.spark.Accumulable.name of type
scala.Option in instance of org.apache.spark.Accumulator
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1208)
        at org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
        at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of
scala.Some to field org.apache.spark.Accumulable.name of type scala.Option
in instance of org.apache.spark.Accumulator
        at
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
        at 
java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
        at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
        at
org.apache.spark.Accumulable$$anonfun$readObject$1.apply$mcV$sp(Accumulators.scala:152)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205)
        ... 30 more










--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Broadcast-variables-java-lang-ClassCastException-tp26828.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to