Hi 

I am running an application with Activator that retrieves tweets and
stores them in a MySQL database using the code below.

I get an OOM error after 5-6 hours with -Xmx1048M. If I increase the memory,
the OOM is only delayed.

Can anybody give me a clue? Here is the code:

    val tweetStream = TwitterUtils.createStream(ssc, None, keywords)
    val tweets = tweetStream.map(tweet => {
      val user = tweet.getUser
      val replyStatusId = tweet.getInReplyToStatusId
      val reTweetStatus = tweet.getRetweetedStatus
      var pTweetId = -1L
      var pcreatedAt = 0L
      if (reTweetStatus != null) {
        pTweetId = reTweetStatus.getId
        pcreatedAt = reTweetStatus.getCreatedAt.getTime
      }
      tweet.getCreatedAt.getTime + "|$" + tweet.getId + "|$" + user.getId + "|$" +
        user.getName + "|$" + user.getScreenName + "|$" + user.getDescription + "|$" +
        tweet.getText.trim + "|$" + user.getFollowersCount + "|$" +
        user.getFriendsCount + "|$" + tweet.getGeoLocation + "|$" +
        user.getLocation + "|$" + user.getBiggerProfileImageURL + "|$" +
        replyStatusId + "|$" + pTweetId + "|$" + pcreatedAt
    })
    tweets.foreachRDD(tweetsRDD => {
      tweetsRDD.distinct()
      val count = tweetsRDD.count
      println("*****" + "%s tweets found on this RDD".format(count))
      if (count > 0) {
        val timeMs = System.currentTimeMillis
        val counter = DBQuery.getProcessedCount()
        val location = "tweets/" + counter + "/"
        tweetsRDD.collect().map(tweet => DBQuery.saveTweets(tweet))
        //tweetsRDD.saveAsTextFile(location + timeMs) + ".txt"
        DBQuery.addTweetRDD(counter)
      }
    })
                                  
    println("tweets for the last stream are saved which can be processed later")
    // Checkpoint directory to recover from failures
    val checkpointDir = "f:/svn1/checkpoint/"
    ssc.checkpoint(checkpointDir)
    ssc.start()
    ssc.awaitTermination()


Regards,
Pankaj


