Re: Spark Streaming: Sentiment Analysis of Twitter streams

2014-10-15 Thread Akhil Das
 StreamingContext(sparkConf, Seconds(60))
val stream = TwitterUtils.createStream(ssc, None, filters)
val tweets = stream.map(r = r.getText)
tweets.print() // print tweet text

   ssc.start()
   ssc.awaitTermination()
   sc.stop()   // I tried commenting this, but the exception still appeared.
 }






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Sentiment-Analysis-of-Twitter-streams-tp16410.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Streaming: Sentiment Analysis of Twitter streams

2014-10-15 Thread S Krishna
())
   .collect()
   .toSet

 // get the list of negative words
 val neg_list =  sc.textFile(negative-words.txt)
   .filter(line = !line.isEmpty())
   .collect()
   .toSet

// create twitter stream
val ssc = new StreamingContext(sparkConf, Seconds(60))
val stream = TwitterUtils.createStream(ssc, None, filters)
val tweets = stream.map(r = r.getText)
tweets.print() // print tweet text

   ssc.start()
   ssc.awaitTermination()
   sc.stop()   // I tried commenting this, but the exception still
 appeared.
 }






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Sentiment-Analysis-of-Twitter-streams-tp16410.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Spark Streaming: Sentiment Analysis of Twitter streams

2014-10-15 Thread Akhil Das
 SparkConf().setAppName(TweetSentiment)
 val sc = new SparkContext(sparkConf)

 // get the list of positive words
 val pos_list =  sc.textFile(positive-words.txt)
   .filter(line = !line.isEmpty())
   .collect()
   .toSet

 // get the list of negative words
 val neg_list =  sc.textFile(negative-words.txt)
   .filter(line = !line.isEmpty())
   .collect()
   .toSet

// create twitter stream
val ssc = new StreamingContext(sparkConf, Seconds(60))
val stream = TwitterUtils.createStream(ssc, None, filters)
val tweets = stream.map(r = r.getText)
tweets.print() // print tweet text

   ssc.start()
   ssc.awaitTermination()
   sc.stop()   // I tried commenting this, but the exception still
 appeared.
 }






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Sentiment-Analysis-of-Twitter-streams-tp16410.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Spark Streaming: Sentiment Analysis of Twitter streams

2014-10-15 Thread Sean Owen
It looks like you're making the StreamingContext and SparkContext
separately from the same conf. Instead, how about passing the
SparkContext to the StreamingContext constructor? it seems like better
practice and is a guess at the problem cause.

On Tue, Oct 14, 2014 at 9:13 PM, SK skrishna...@gmail.com wrote:
 Hi,

 I am trying to implement simple sentiment analysis of Twitter streams in
 Spark/Scala.  I am getting an exception and it appears when I combine
 SparkContext with StreamingContext in the same program. When I read the
 positive and negative words using only SparkContext.textFile (without
 creating a StreamingContext) and analyze static text files, the program
 works. Likewise, when I just create the twitter stream using
 StreamingContext (and dont create a SparkContext to create the vocabulary),
 the program works. The exception seems to be appearing when I combine both
 SparkContext and StreamingContext in the same program and I am not sure if
 we are not allowed to  have both simultaneously. All the examples in the
 streaming module contain only the StreamingContext. The error transcript and
 my code appear below. I would appreciate your guidance  in fixing this error
 and the right way to  read static files and streams in the same program or
 any pointers to relevant examples.
 Thanks.


 --Error transcript -
 Lost task 0.0 in stage 2.0 (TID 70, mesos4-dev.sccps.net):
 java.io.IOException: unexpected exception type

 java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)

 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)

 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)

 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)

 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
 -- My code below --
 object TweetSentiment {
   def main(args: Array[String]) {


 val filters = args
 val sparkConf = new SparkConf().setAppName(TweetSentiment)
 val sc = new SparkContext(sparkConf)

 // get the list of positive words
 val pos_list =  sc.textFile(positive-words.txt)
   .filter(line = !line.isEmpty())
   .collect()
   .toSet

 // get the list of negative words
 val neg_list =  sc.textFile(negative-words.txt)
   .filter(line = !line.isEmpty())
   .collect()
   .toSet

// create twitter stream
val ssc = new StreamingContext(sparkConf, Seconds(60))
val stream = TwitterUtils.createStream(ssc, None, filters)
val tweets = stream.map(r = r.getText)
tweets.print() // print tweet text

   ssc.start()
   ssc.awaitTermination()
   sc.stop()   // I tried commenting this, but the exception still appeared.
 }






 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Sentiment-Analysis-of-Twitter-streams-tp16410.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming: Sentiment Analysis of Twitter streams

2014-10-15 Thread SK
You are right. Creating the StreamingContext from the SparkContext instead of
SparkConf helped. Thanks for the help.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Sentiment-Analysis-of-Twitter-streams-tp16410p16520.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Streaming: Sentiment Analysis of Twitter streams

2014-10-14 Thread SK
Hi,

I am trying to implement simple sentiment analysis of Twitter streams in
Spark/Scala.  I am getting an exception and it appears when I combine
SparkContext with StreamingContext in the same program. When I read the
positive and negative words using only SparkContext.textFile (without
creating a StreamingContext) and analyze static text files, the program
works. Likewise, when I just create the twitter stream using
StreamingContext (and dont create a SparkContext to create the vocabulary),
the program works. The exception seems to be appearing when I combine both
SparkContext and StreamingContext in the same program and I am not sure if
we are not allowed to  have both simultaneously. All the examples in the
streaming module contain only the StreamingContext. The error transcript and
my code appear below. I would appreciate your guidance  in fixing this error
and the right way to  read static files and streams in the same program or
any pointers to relevant examples.
Thanks.


--Error transcript -
Lost task 0.0 in stage 2.0 (TID 70, mesos4-dev.sccps.net):
java.io.IOException: unexpected exception type
   
java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
   
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
   
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
   
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
   
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
   
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
   
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
-- My code below --
object TweetSentiment {
  def main(args: Array[String]) {

 
val filters = args
val sparkConf = new SparkConf().setAppName(TweetSentiment)
val sc = new SparkContext(sparkConf)

// get the list of positive words
val pos_list =  sc.textFile(positive-words.txt)
  .filter(line = !line.isEmpty())
  .collect()
  .toSet

// get the list of negative words
val neg_list =  sc.textFile(negative-words.txt)
  .filter(line = !line.isEmpty())
  .collect()
  .toSet

   // create twitter stream
   val ssc = new StreamingContext(sparkConf, Seconds(60))
   val stream = TwitterUtils.createStream(ssc, None, filters)
   val tweets = stream.map(r = r.getText)
   tweets.print() // print tweet text

  ssc.start()
  ssc.awaitTermination()
  sc.stop()   // I tried commenting this, but the exception still appeared.
}






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Sentiment-Analysis-of-Twitter-streams-tp16410.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org