Hi, I am trying to implement simple sentiment analysis of Twitter streams in Spark/Scala. I am getting an exception, and it appears when I combine a SparkContext with a StreamingContext in the same program. When I read the positive and negative words using only SparkContext.textFile (without creating a StreamingContext) and analyze static text files, the program works. Likewise, when I just create the Twitter stream using StreamingContext (and don't create a SparkContext to build the vocabulary), the program works. The exception seems to appear only when I combine both SparkContext and StreamingContext in the same program, and I am not sure whether we are allowed to have both simultaneously. All the examples in the streaming module contain only a StreamingContext. The error transcript and my code appear below. I would appreciate your guidance in fixing this error and on the right way to read static files and streams in the same program, or any pointers to relevant examples. Thanks.
---------------------- Error transcript -----------------------------
Lost task 0.0 in stage 2.0 (TID 70, mesos4-dev.sccps.net): java.io.IOException: unexpected exception type
    java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
    java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:745)

------------------------------ My code below ------------------------------
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object TweetSentiment {
  def main(args: Array[String]) {
    val filters = args
    val sparkConf = new SparkConf().setAppName("TweetSentiment")
    val sc = new SparkContext(sparkConf)

    // get the list of positive words
    val pos_list = sc.textFile("positive-words.txt")
      .filter(line => !line.isEmpty())
      .collect()
      .toSet

    // get the list of negative words
    val neg_list = sc.textFile("negative-words.txt")
      .filter(line => !line.isEmpty())
      .collect()
      .toSet

    // create twitter stream
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    val stream = TwitterUtils.createStream(ssc, None, filters)
    val tweets = stream.map(r => r.getText)
    tweets.print() // print tweet text

    ssc.start()
    ssc.awaitTermination()
    sc.stop() // I tried commenting this out, but the exception still appeared.
  }
}

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Sentiment-Analysis-of-Twitter-streams-tp16410.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
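One thing I am considering trying, based on my reading of the StreamingContext API: instead of constructing a second context from the same SparkConf, pass the existing SparkContext to the StreamingContext constructor so that both the static-file reads and the stream share one underlying context. Below is an untested sketch of that idea (file names and the 60-second batch interval are just carried over from my code above); is something like this the intended pattern?

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object TweetSentimentSingleContext {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("TweetSentiment")
    val sc = new SparkContext(sparkConf)

    // Build the vocabulary from static files using the one SparkContext
    val pos_list = sc.textFile("positive-words.txt")
      .filter(line => !line.isEmpty())
      .collect()
      .toSet

    // Reuse the existing SparkContext for streaming rather than
    // creating a second context from the same SparkConf
    val ssc = new StreamingContext(sc, Seconds(60))
    val stream = TwitterUtils.createStream(ssc, None, args)
    stream.map(r => r.getText).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```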