Re: Out of memory with twitter spark streaming

2015-08-09 Thread Akhil Das
I'm not sure what you are up to, but if you can explain what you are trying
to achieve, then maybe we can restructure your code. At a quick glance I
could see:

 tweetsRDD.collect().map(tweet => DBQuery.saveTweets(tweet))


This brings the whole dataset into your driver machine, which can then run
out of memory. You can avoid that by writing to the database from the
executors instead of collecting everything on the driver.
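
A minimal, untested sketch of saving from the executors instead of calling
collect(). It assumes DBQuery.saveTweets can run on the executors and obtains
its own MySQL connection there; the DBQuery stub and the SaveTweets wrapper
below are hypothetical placeholders, since the real DBQuery is not shown:

```scala
import org.apache.spark.streaming.dstream.DStream

// Hypothetical stand-in for the poster's DBQuery object (the real one is not
// shown in this thread). Assumed to be callable from executor JVMs.
object DBQuery {
  def saveTweets(tweet: String): Unit = println(tweet) // placeholder for the MySQL insert
}

// Hypothetical wrapper, only here to keep the sketch self-contained.
object SaveTweets {
  // Saves every batch from the executors, one partition at a time,
  // so no batch is ever materialised on the driver.
  def saveWithoutCollect(tweets: DStream[String]): Unit = {
    tweets.foreachRDD { tweetsRDD =>
      // distinct() returns a new RDD, so its result has to be used.
      val unique = tweetsRDD.distinct()
      unique.foreachPartition { partition =>
        // Runs on an executor; `partition` is an Iterator[String].
        partition.foreach(tweet => DBQuery.saveTweets(tweet))
      }
    }
  }
}
```

If saveTweets opens a connection on every call, opening one connection per
partition inside foreachPartition would probably be cheaper.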

Thanks
Best Regards

On Fri, Aug 7, 2015 at 11:23 AM, Pankaj Narang pankajnaran...@gmail.com
wrote:

 Hi

 I am running an application using Activator that retrieves tweets and
 stores them in a MySQL database using the code below.

 I get an OOM error after 5-6 hours with -Xmx1048M. If I increase the memory,
 the OOM is only delayed.

 Can anybody give me a clue? Here is the code:

 var tweetStream = TwitterUtils.createStream(ssc, None, keywords)
 var tweets = tweetStream.map(tweet => {
   var user = tweet.getUser
   var replyStatusId = tweet.getInReplyToStatusId
   var reTweetStatus = tweet.getRetweetedStatus
   var pTweetId = -1L
   var pcreatedAt = 0L
   if (reTweetStatus != null) {
     pTweetId = reTweetStatus.getId
     pcreatedAt = reTweetStatus.getCreatedAt.getTime
   }
   tweet.getCreatedAt.getTime + "|$" + tweet.getId + "|$" + user.getId +
     "|$" + user.getName + "|$" + user.getScreenName +
     "|$" + user.getDescription + "|$" + tweet.getText.trim +
     "|$" + user.getFollowersCount + "|$" + user.getFriendsCount +
     "|$" + tweet.getGeoLocation + "|$" + user.getLocation +
     "|$" + user.getBiggerProfileImageURL + "|$" + replyStatusId +
     "|$" + pTweetId + "|$" + pcreatedAt
 })
 tweets.foreachRDD(tweetsRDD => {
   tweetsRDD.distinct()
   val count = tweetsRDD.count
   println("*" + "%s tweets found on this RDD".format(count))
   if (count > 0) {
     var timeMs = System.currentTimeMillis
     var counter = DBQuery.getProcessedCount()
     var location = "tweets/" + counter + "/"
     tweetsRDD.collect().map(tweet => DBQuery.saveTweets(tweet))
     // tweetsRDD.saveAsTextFile(location + timeMs) + ".txt"
     DBQuery.addTweetRDD(counter)
   }
 })

 // Checkpoint directory to recover from failures
 println("tweets for the last stream are saved which can be processed later")
 val checkpointDir = "f:/svn1/checkpoint/"
 ssc.checkpoint(checkpointDir)
 ssc.start()
 ssc.awaitTermination()


 regards
 Pankaj



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Out-of-memory-with-twitter-spark-streaming-tp24162.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Out of memory with twitter spark streaming

2015-08-06 Thread Pankaj Narang
Hi 

I am running an application using Activator that retrieves tweets and
stores them in a MySQL database using the code below.

I get an OOM error after 5-6 hours with -Xmx1048M. If I increase the memory,
the OOM is only delayed.

Can anybody give me a clue? Here is the code:

var tweetStream = TwitterUtils.createStream(ssc, None, keywords)
var tweets = tweetStream.map(tweet => {
  var user = tweet.getUser
  var replyStatusId = tweet.getInReplyToStatusId
  var reTweetStatus = tweet.getRetweetedStatus
  var pTweetId = -1L
  var pcreatedAt = 0L
  if (reTweetStatus != null) {
    pTweetId = reTweetStatus.getId
    pcreatedAt = reTweetStatus.getCreatedAt.getTime
  }
  tweet.getCreatedAt.getTime + "|$" + tweet.getId + "|$" + user.getId +
    "|$" + user.getName + "|$" + user.getScreenName +
    "|$" + user.getDescription + "|$" + tweet.getText.trim +
    "|$" + user.getFollowersCount + "|$" + user.getFriendsCount +
    "|$" + tweet.getGeoLocation + "|$" + user.getLocation +
    "|$" + user.getBiggerProfileImageURL + "|$" + replyStatusId +
    "|$" + pTweetId + "|$" + pcreatedAt
})
tweets.foreachRDD(tweetsRDD => {
  tweetsRDD.distinct()
  val count = tweetsRDD.count
  println("*" + "%s tweets found on this RDD".format(count))
  if (count > 0) {
    var timeMs = System.currentTimeMillis
    var counter = DBQuery.getProcessedCount()
    var location = "tweets/" + counter + "/"
    tweetsRDD.collect().map(tweet => DBQuery.saveTweets(tweet))
    // tweetsRDD.saveAsTextFile(location + timeMs) + ".txt"
    DBQuery.addTweetRDD(counter)
  }
})

// Checkpoint directory to recover from failures
println("tweets for the last stream are saved which can be processed later")
val checkpointDir = "f:/svn1/checkpoint/"
ssc.checkpoint(checkpointDir)
ssc.start()
ssc.awaitTermination()


regards
Pankaj


