Re: Out of memory with twitter spark streaming

2015-08-09 Thread Akhil Das
I'm not sure what you are up to, but if you can explain what you are trying
to achieve, then maybe we can restructure your code. At a quick glance I
could see:

 tweetsRDD.collect().map(tweet => DBQuery.saveTweets(tweet))


This brings the whole dataset into your driver machine, which can make it
run out of memory. You can avoid that by saving the records from the
executors instead (for example with foreachPartition).
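A minimal sketch of that restructuring (assuming your DBQuery.saveTweets helper can safely be called from executor code, i.e. it is serializable or obtains its database connection on the executor):

```scala
// Sketch: write tweets from the executors instead of collecting to the driver.
// Nothing is pulled back to the driver, so the driver heap stays small.
tweets.foreachRDD { tweetsRDD =>
  tweetsRDD.foreachPartition { partition =>
    // One partition is processed per task on an executor; open any DB
    // connection here (once per partition, not once per record).
    partition.foreach(tweet => DBQuery.saveTweets(tweet))
  }
}
```

The key difference from collect() is that the iteration happens where the data already lives, so memory use on the driver no longer grows with the batch size.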

Thanks
Best Regards

On Fri, Aug 7, 2015 at 11:23 AM, Pankaj Narang 
wrote:

> Hi
>
> I am running an application using Activator in which I retrieve tweets
> and store them in a MySQL database using the code below.
>
> I get an OOM error after 5-6 hours with -Xmx1048M. If I increase the
> memory, the OOM is only delayed.
>
> Can anybody give me a clue? Here is the code:
>
> var tweetStream = TwitterUtils.createStream(ssc, None, keywords)
> var tweets = tweetStream.map(tweet => {
>   var user = tweet.getUser
>   var replyStatusId = tweet.getInReplyToStatusId
>   var reTweetStatus = tweet.getRetweetedStatus
>   var pTweetId = -1L
>   var pcreatedAt = 0L
>   if (reTweetStatus != null) {
>     pTweetId = reTweetStatus.getId
>     pcreatedAt = reTweetStatus.getCreatedAt.getTime
>   }
>   tweet.getCreatedAt.getTime + "|$" + tweet.getId + "|$" + user.getId + "|$" +
>     user.getName + "|$" + user.getScreenName + "|$" + user.getDescription + "|$" +
>     tweet.getText.trim + "|$" + user.getFollowersCount + "|$" +
>     user.getFriendsCount + "|$" + tweet.getGeoLocation + "|$" +
>     user.getLocation + "|$" + user.getBiggerProfileImageURL + "|$" +
>     replyStatusId + "|$" + pTweetId + "|$" + pcreatedAt
> })
>
> tweets.foreachRDD(tweetsRDD => {
>   tweetsRDD.distinct()
>   val count = tweetsRDD.count
>   println("*" + "%s tweets found on this RDD".format(count))
>   if (count > 0) {
>     var timeMs = System.currentTimeMillis
>     var counter = DBQuery.getProcessedCount()
>     var location = "tweets/" + counter + "/"
>     tweetsRDD.collect().map(tweet => DBQuery.saveTweets(tweet))
>     // tweetsRDD.saveAsTextFile(location + timeMs + ".txt")
>     DBQuery.addTweetRDD(counter)
>   }
> })
>
> // Checkpoint directory to recover from failures
> println("tweets for the last stream are saved which can be processed later")
> val checkpointDir = "f:/svn1/checkpoint/"
> ssc.checkpoint(checkpointDir)
> ssc.start()
> ssc.awaitTermination()
>
>
> regards
> Pankaj
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Out-of-memory-with-twitter-spark-streaming-tp24162.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Out of memory with twitter spark streaming

2015-08-06 Thread Pankaj Narang
Hi 

I am running an application using Activator in which I retrieve tweets
and store them in a MySQL database using the code below.

I get an OOM error after 5-6 hours with -Xmx1048M. If I increase the
memory, the OOM is only delayed.

Can anybody give me a clue? Here is the code:

var tweetStream = TwitterUtils.createStream(ssc, None, keywords)
var tweets = tweetStream.map(tweet => {
  var user = tweet.getUser
  var replyStatusId = tweet.getInReplyToStatusId
  var reTweetStatus = tweet.getRetweetedStatus
  var pTweetId = -1L
  var pcreatedAt = 0L
  if (reTweetStatus != null) {
    pTweetId = reTweetStatus.getId
    pcreatedAt = reTweetStatus.getCreatedAt.getTime
  }
  tweet.getCreatedAt.getTime + "|$" + tweet.getId + "|$" + user.getId + "|$" +
    user.getName + "|$" + user.getScreenName + "|$" + user.getDescription + "|$" +
    tweet.getText.trim + "|$" + user.getFollowersCount + "|$" +
    user.getFriendsCount + "|$" + tweet.getGeoLocation + "|$" +
    user.getLocation + "|$" + user.getBiggerProfileImageURL + "|$" +
    replyStatusId + "|$" + pTweetId + "|$" + pcreatedAt
})

tweets.foreachRDD(tweetsRDD => {
  tweetsRDD.distinct()
  val count = tweetsRDD.count
  println("*" + "%s tweets found on this RDD".format(count))
  if (count > 0) {
    var timeMs = System.currentTimeMillis
    var counter = DBQuery.getProcessedCount()
    var location = "tweets/" + counter + "/"
    tweetsRDD.collect().map(tweet => DBQuery.saveTweets(tweet))
    // tweetsRDD.saveAsTextFile(location + timeMs + ".txt")
    DBQuery.addTweetRDD(counter)
  }
})

// Checkpoint directory to recover from failures
println("tweets for the last stream are saved which can be processed later")
val checkpointDir = "f:/svn1/checkpoint/"
ssc.checkpoint(checkpointDir)
ssc.start()
ssc.awaitTermination()


regards
Pankaj



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Out-of-memory-with-twitter-spark-streaming-tp24162.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org