I'm not sure what you are upto, but if you can explain what you are trying
to achieve then may be we can restructure your code. On a quick glance i
could see :

 tweetsRDD*.collect()*.map(tweet=>
DBQuery.saveTweets(tweet))


Which will bring the whole data into your driver machine and it would
possibly run out of memory, You can avoid that.

Thanks
Best Regards

On Fri, Aug 7, 2015 at 11:23 AM, Pankaj Narang <pankajnaran...@gmail.com>
wrote:

> Hi
>
> I am running one application using activator where I am retrieving tweets
> and storing them to mysql database using below code.
>
> I get OOM error after 5-6 hour with xmx1048M. If I increase the memory the
> OOM get delayed only.
>
> Can anybody give me clue. Here is the code
>
>  var tweetStream  = TwitterUtils.createStream(ssc, None,keywords)
>         var tweets = tweetStream.map(tweet => {
>               var user = tweet.getUser
>               var replyStatusId = tweet.getInReplyToStatusId
>               var reTweetStatus = tweet.getRetweetedStatus
>               var pTweetId = -1L
>               var pcreatedAt = 0L
>               if(reTweetStatus != null){
>                 pTweetId = reTweetStatus.getId
>                 pcreatedAt = reTweetStatus.getCreatedAt.getTime
>               }
>               tweet.getCreatedAt.getTime + "|$" + tweet.getId +
> "|$"+user.getId + "|$" + user.getName+ "|$" + user.getScreenName + "|$" +
> user.getDescription +
>               "|$" + tweet.getText.trim + "|$" + user.getFollowersCount +
> "|$" + user.getFriendsCount + "|$" + tweet.getGeoLocation + "|$" +
>               user.getLocation + "|$" + user.getBiggerProfileImageURL +
> "|$"
> + replyStatusId + "|$" + pTweetId + "|$" + pcreatedAt
>         } )
>       tweets.foreachRDD(tweetsRDD => {tweetsRDD.distinct()
>                                      val count = tweetsRDD.count
>                                      println("*****" +"%s tweets found on
> this RDD".format(count))
>                                  if (count >  0){
>                                     var timeMs = System.currentTimeMillis
>                                     var counter =
> DBQuery.getProcessedCount()
>                                    var location="tweets/"+ counter +"/"
>                                     tweetsRDD.collect().map(tweet=>
> DBQuery.saveTweets(tweet))
>                                     //tweetsRDD.saveAsTextFile(location+
> timeMs)+ ".txt"
>                                     DBQuery.addTweetRDD(counter)
>                                                 }
>                                     })
>
>    // Checkpoint directory to recover from failures
>    println("tweets for the last stream are saved which can be processed
> later")
>    val    = "f:/svn1/checkpoint/"
>     ssc.checkpoint(checkpointDir)
>     ssc.start()
>     ssc.awaitTermination()
>
>
> regards
> Pankaj
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Out-of-memory-with-twitter-spark-streaming-tp24162.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

Reply via email to