I just ran the same code and it works fine on my machine.
Here is my setup:

- Spark version: 1.1.0
- Master: local[2]
- Used full paths to the positive and negative word files
- Set the Twitter auth credentials as twitter4j.oauth.* system properties

And here's the code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Sentimenter {
  def main(args: Array[String]) {
    // Twitter OAuth credentials, read by twitter4j from system properties
    System.setProperty("twitter4j.oauth.consumerKey", "XXXXXXXXXXXXXXXXX")
    System.setProperty("twitter4j.oauth.consumerSecret", "XXXXXXXXXXXXXXXXXXXXXXXXX")
    System.setProperty("twitter4j.oauth.accessToken", "XXXXXXXXXXXXXXXXXXXXXXXXXXXX")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "XXXXXXXXXXXXXXXXXXXXXXX")

    val filters = new Array[String](2)
    filters(0) = "ebola"
    filters(1) = "isis"
    val sparkConf = new SparkConf().setAppName("TweetSentiment").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    // get the list of positive words
    val pos_list = sc.textFile("file:///home/akhld/positive-words.txt")
      .filter(line => !line.isEmpty())
      .collect()
      .toSet
    // get the list of negative words
    val neg_list = sc.textFile("file:///home/akhld/negative-words.txt")
      .filter(line => !line.isEmpty())
      .collect()
      .toSet
    // create twitter stream
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val stream = TwitterUtils.createStream(ssc, None, filters)
    val tweets = stream.map(r => r.getText)
    tweets.print() // print tweet text
    ssc.start()
    ssc.awaitTermination()
  }
}
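
One variation that may be worth trying, though I have not confirmed that it is
what triggers the exception on the cluster: constructing a StreamingContext
from a SparkConf creates a second SparkContext internally, whereas passing the
existing SparkContext reuses it. The sketch below does the latter and also
shows one possible way to put the two word sets to use. The object name
SentimentSketch and the helpers score and loadWords are hypothetical names
introduced for illustration, and the scoring rule (positive word count minus
negative word count) is only an assumption about what "simple sentiment
analysis" might mean here.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object SentimentSketch {
  // Hypothetical helper: +1 for every positive word, -1 for every negative word.
  def score(text: String, pos: Set[String], neg: Set[String]): Int = {
    val words = text.toLowerCase.split("\\W+").filter(_.nonEmpty)
    words.count(pos.contains) - words.count(neg.contains)
  }

  // Hypothetical helper: read a word list and collect it on the driver as a Set.
  def loadWords(sc: SparkContext, path: String): Set[String] =
    sc.textFile(path).filter(line => !line.isEmpty).collect().toSet

  def main(args: Array[String]): Unit = {
    // Assumes the twitter4j.oauth.* system properties are already set.
    val sparkConf = new SparkConf().setAppName("TweetSentiment").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val posWords = loadWords(sc, "file:///home/akhld/positive-words.txt")
    val negWords = loadWords(sc, "file:///home/akhld/negative-words.txt")

    // Pass the existing SparkContext rather than the SparkConf, so that
    // the streaming job does not create a second SparkContext.
    val ssc = new StreamingContext(sc, Seconds(5))
    val stream = TwitterUtils.createStream(ssc, None, Array("ebola", "isis"))

    // Pair each tweet's text with its score; the word sets travel in the closure.
    val scored = stream.map { status =>
      val text = status.getText
      (score(text, posWords, negWords), text)
    }
    scored.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The word sets are small, so capturing them in the closure should be adequate
for a test like this; for larger vocabularies a broadcast variable would
probably be the better fit.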





Thanks
Best Regards

On Wed, Oct 15, 2014 at 1:43 AM, SK <skrishna...@gmail.com> wrote:

> Hi,
>
> I am trying to implement simple sentiment analysis of Twitter streams in
> Spark/Scala. I get an exception that seems to appear only when I combine a
> SparkContext with a StreamingContext in the same program. When I read the
> positive and negative words using only SparkContext.textFile (without
> creating a StreamingContext) and analyze static text files, the program
> works. Likewise, when I only create the Twitter stream using a
> StreamingContext (and don't create a SparkContext to build the vocabulary),
> the program works. I am not sure whether having both contexts at the same
> time is allowed; all the examples in the streaming module use only a
> StreamingContext. The error transcript and my code appear below. I would
> appreciate your guidance on fixing this error and on the right way to read
> static files and streams in the same program, or any pointers to relevant
> examples.
> Thanks.
>
>
> ----------------------Error transcript -----------------------------
> Lost task 0.0 in stage 2.0 (TID 70, mesos4-dev.sccps.net):
> java.io.IOException: unexpected exception type
>         java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
>         java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
>         java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>         org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:745)
> ------------------------------ My code below ------------------------------
> object TweetSentiment {
>   def main(args: Array[String]) {
>
>
>     val filters = args
>     val sparkConf = new SparkConf().setAppName("TweetSentiment")
>     val sc = new SparkContext(sparkConf)
>
>     // get the list of positive words
>     val pos_list =  sc.textFile("positive-words.txt")
>                       .filter(line => !line.isEmpty())
>                       .collect()
>                       .toSet
>
>     // get the list of negative words
>     val neg_list =  sc.textFile("negative-words.txt")
>                       .filter(line => !line.isEmpty())
>                       .collect()
>                       .toSet
>
>     // create twitter stream
>     val ssc = new StreamingContext(sparkConf, Seconds(60))
>     val stream = TwitterUtils.createStream(ssc, None, filters)
>     val tweets = stream.map(r => r.getText)
>     tweets.print() // print tweet text
>
>     ssc.start()
>     ssc.awaitTermination()
>     sc.stop()   // I tried commenting this out, but the exception still appeared.
>   }
> }
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Sentiment-Analysis-of-Twitter-streams-tp16410.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
