I just ran the same code and it is running perfectly fine on my machine. These are the things on my end:
- Spark version: 1.1.0
- Gave the full path to the negative and positive word files
- Set the Twitter auth credentials (as twitter4j.oauth.* system properties, see the code)

And here's the code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Sentimenter {
  def main(args: Array[String]) {
    System.setProperty("twitter4j.oauth.consumerKey", "XXXXXXXXXXXXXXXXX")
    System.setProperty("twitter4j.oauth.consumerSecret", "XXXXXXXXXXXXXXXXXXXXXXXXX")
    System.setProperty("twitter4j.oauth.accessToken", "XXXXXXXXXXXXXXXXXXXXXXXXXXXX")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "XXXXXXXXXXXXXXXXXXXXXXX")

    val filters = new Array[String](2)
    filters(0) = "ebola"
    filters(1) = "isis"

    val sparkConf = new SparkConf().setAppName("TweetSentiment").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    // get the list of positive words
    val pos_list = sc.textFile("file:///home/akhld/positive-words.txt") // Random
      .filter(line => !line.isEmpty())
      .collect()
      .toSet

    // get the list of negative words
    val neg_list = sc.textFile("file:///home/akhld/negative-words.txt") // Random
      .filter(line => !line.isEmpty())
      .collect()
      .toSet

    // create twitter stream
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val stream = TwitterUtils.createStream(ssc, None, filters)
    val tweets = stream.map(r => r.getText)
    tweets.print() // print tweet text

    ssc.start()
    ssc.awaitTermination()
  }
}

Thanks
Best Regards

On Wed, Oct 15, 2014 at 1:43 AM, SK <skrishna...@gmail.com> wrote:

> Hi,
>
> I am trying to implement simple sentiment analysis of Twitter streams in
> Spark/Scala. I am getting an exception, and it appears when I combine
> SparkContext with StreamingContext in the same program.
> When I read the positive and negative words using only SparkContext.textFile
> (without creating a StreamingContext) and analyze static text files, the
> program works. Likewise, when I just create the Twitter stream using
> StreamingContext (and don't create a SparkContext to build the vocabulary),
> the program works. The exception seems to appear only when I combine
> SparkContext and StreamingContext in the same program, and I am not sure
> whether we are allowed to have both simultaneously. All the examples in the
> streaming module contain only the StreamingContext. The error transcript and
> my code appear below. I would appreciate your guidance in fixing this error,
> the right way to read static files and streams in the same program, or any
> pointers to relevant examples.
> Thanks.
>
> ----------------------Error transcript -----------------------------
> Lost task 0.0 in stage 2.0 (TID 70, mesos4-dev.sccps.net):
> java.io.IOException: unexpected exception type
>     java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
>     java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
>     java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>     org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     java.lang.Thread.run(Thread.java:745)
>
> ------------------------------ My code below ------------------------------
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.twitter.TwitterUtils
>
> object TweetSentiment {
>   def main(args: Array[String]) {
>
>     val filters = args
>     val sparkConf = new SparkConf().setAppName("TweetSentiment")
>     val sc = new SparkContext(sparkConf)
>
>     // get the list of positive words
>     val pos_list = sc.textFile("positive-words.txt")
>       .filter(line => !line.isEmpty())
>       .collect()
>       .toSet
>
>     // get the list of negative words
>     val neg_list = sc.textFile("negative-words.txt")
>       .filter(line => !line.isEmpty())
>       .collect()
>       .toSet
>
>     // create twitter stream
>     val ssc = new StreamingContext(sparkConf, Seconds(60))
>     val stream = TwitterUtils.createStream(ssc, None, filters)
>     val tweets = stream.map(r => r.getText)
>     tweets.print() // print tweet text
>
>     ssc.start()
>     ssc.awaitTermination()
>     sc.stop() // I tried commenting this out, but the exception still appeared.
>   }
> }
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Sentiment-Analysis-of-Twitter-streams-tp16410.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
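One more thing you could try on the Mesos cluster: both your program and mine pass the same SparkConf to new SparkContext(...) and to new StreamingContext(...), and that StreamingContext constructor creates a second SparkContext of its own. Running two SparkContexts in one JVM is not supported, so this may be what breaks task deserialization on your cluster even though it happens to work in local mode here (I haven't verified that). The untested sketch below creates one SparkContext and builds the StreamingContext on top of it with new StreamingContext(sc, batchDuration). It assumes Spark 1.1.x with spark-streaming-twitter on the classpath; the object name TweetSentimentSketch, the helpers createContexts and loadWords, and reading the word-list paths and keywords from args are my own hypothetical choices.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

// Untested sketch for Spark 1.1.x; assumes spark-core, spark-streaming and
// spark-streaming-twitter are on the classpath. All names here are hypothetical.
object TweetSentimentSketch {

  // Create a single SparkContext and wrap it in the StreamingContext,
  // instead of handing the same SparkConf to both constructors
  // (new StreamingContext(conf, ...) creates its own SparkContext).
  def createContexts(conf: SparkConf, batch: Duration): (SparkContext, StreamingContext) = {
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, batch)
    (sc, ssc)
  }

  // Batch read of a word list, collected to the driver as a Set.
  def loadWords(sc: SparkContext, path: String): Set[String] =
    sc.textFile(path).filter(line => !line.isEmpty).collect().toSet

  def main(args: Array[String]): Unit = {
    // Hypothetical argument layout: <positive-words> <negative-words> <keywords...>
    val posPath = args(0)
    val negPath = args(1)
    val keywords = args.drop(2)

    val conf = new SparkConf().setAppName("TweetSentiment")
    val (sc, ssc) = createContexts(conf, Seconds(60))

    // Use paths every node can resolve (full local paths or HDFS URIs).
    val posList = loadWords(sc, posPath)
    val negList = loadWords(sc, negPath)
    println(s"Loaded ${posList.size} positive and ${negList.size} negative words")

    // Credentials still come from the twitter4j.oauth.* system properties.
    val tweets = TwitterUtils.createStream(ssc, None, keywords).map(_.getText)
    tweets.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that ssc.stop() also stops the underlying SparkContext by default (stopSparkContext = true), so a separate sc.stop() is not needed with this layout.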
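Also, the posted code loads pos_list and neg_list but only prints the tweet text. For the scoring step itself, a naive lexicon score could look like the sketch below. WordListSentiment, tokenize and score are hypothetical names, and counting word-list hits is just one simple approach (no negation handling, stemming or weighting).

```scala
// Hypothetical helper (not part of Spark or twitter4j): a naive lexicon score
// that counts how many words of a tweet appear in the positive and negative
// word sets.
object WordListSentiment {

  // Lower-case the text and split on runs of characters that are neither
  // letters nor apostrophes, so "Great!" yields the token "great".
  def tokenize(text: String): Seq[String] =
    text.toLowerCase.split("[^\\p{L}']+").toSeq.filter(_.nonEmpty)

  // > 0: more positive than negative hits; < 0: more negative; 0: balanced or no hits.
  def score(text: String, pos: Set[String], neg: Set[String]): Int = {
    val words = tokenize(text)
    words.count(pos.contains) - words.count(neg.contains)
  }
}
```

In the streaming part it would be applied per tweet, for example tweets.map(t => (WordListSentiment.score(t, pos_list, neg_list), t)).print(). pos_list and neg_list are ordinary Scala Sets collected on the driver, so they are serialized with the closure; for large lists, sc.broadcast would avoid re-sending them with every task.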