I'm not sure what you are up to, but if you can explain what you are trying
to achieve, then maybe we can restructure your code. At a quick glance I
can see:
tweetsRDD.collect().map(tweet => DBQuery.saveTweets(tweet))
This brings the whole RDD into your driver's memory, so the driver can
eventually run out of memory. You can avoid that.
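Here is a minimal sketch of how to avoid it, assuming the inserts can be batched. Spark's RDD.foreachPartition runs a function on the executors with an Iterator over each partition, so the records never travel to the driver. The block below is plain Scala with no Spark dependency and models only the per-partition part. PartitionWriter, writeInBatches and DBQuery.saveTweetsBatch are hypothetical names and are not part of Spark or the original code.

```scala
// Minimal, stdlib-only sketch of a per-partition batched writer.
// Assumption: in Spark, `partition` is the Iterator that RDD.foreachPartition
// hands to your function on each executor, so nothing is collected on the driver.
object PartitionWriter {

  /** Pulls records from `partition` lazily, `batchSize` at a time, and passes each
    * batch to `writeBatch` (hypothetical, e.g. a JDBC batch insert).
    * Only the current batch is held in memory. Returns the number of records written.
    */
  def writeInBatches[T](partition: Iterator[T], batchSize: Int)(
      writeBatch: Seq[T] => Unit): Long = {
    require(batchSize > 0, "batchSize must be positive")
    var written = 0L
    // grouped() is lazy: it fills one batch, yields it, then reads the next one
    partition.grouped(batchSize).foreach { batch =>
      writeBatch(batch)
      written += batch.size
    }
    written
  }
}

// Hypothetical Spark usage inside the existing foreachRDD
// (DBQuery.saveTweetsBatch is an assumed helper, not part of the original code):
//   tweetsRDD.foreachPartition { partition =>
//     PartitionWriter.writeInBatches(partition, 500)(DBQuery.saveTweetsBatch)
//   }
```

Because the function passed to foreachPartition runs on the executors, any database connection DBQuery needs would have to be opened inside that function rather than on the driver. One connection per partition, or a pooled one, is the usual pattern.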
Thanks
Best Regards
On Fri, Aug 7, 2015 at 11:23 AM, Pankaj Narang <pankajnaran...@gmail.com>
wrote:
Hi,
I am running an application using Activator that retrieves tweets and
stores them in a MySQL database using the code below.
I get an OOM (out of memory) error after 5-6 hours with -Xmx1048M. If I
increase the memory, the OOM is only delayed.
Can anybody give me a clue? Here is the code:
var tweetStream = TwitterUtils.createStream(ssc, None, keywords)
var tweets = tweetStream.map(tweet => {
  var user = tweet.getUser
  var replyStatusId = tweet.getInReplyToStatusId
  var reTweetStatus = tweet.getRetweetedStatus
  var pTweetId = -1L
  var pcreatedAt = 0L
  if (reTweetStatus != null) {
    pTweetId = reTweetStatus.getId
    pcreatedAt = reTweetStatus.getCreatedAt.getTime
  }
  tweet.getCreatedAt.getTime + "|$" + tweet.getId +
    "|$" + user.getId + "|$" + user.getName + "|$" + user.getScreenName + "|$" +
    user.getDescription +
    "|$" + tweet.getText.trim + "|$" + user.getFollowersCount +
    "|$" + user.getFriendsCount + "|$" + tweet.getGeoLocation + "|$" +
    user.getLocation + "|$" + user.getBiggerProfileImageURL +
    "|$" + replyStatusId + "|$" + pTweetId + "|$" + pcreatedAt
})
tweets.foreachRDD(tweetsRDD => {
  tweetsRDD.distinct()
  val count = tweetsRDD.count
  println("*" + "%s tweets found on this RDD".format(count))
  if (count > 0) {
    var timeMs = System.currentTimeMillis
    var counter = DBQuery.getProcessedCount()
    var location = "tweets/" + counter + "/"
    tweetsRDD.collect().map(tweet => DBQuery.saveTweets(tweet))
    //tweetsRDD.saveAsTextFile(location + timeMs + ".txt")
    DBQuery.addTweetRDD(counter)
  }
})
// Checkpoint directory to recover from failures
println("tweets for the last stream are saved which can be processed later")
val checkpointDir = "f:/svn1/checkpoint/"
ssc.checkpoint(checkpointDir)
ssc.start()
ssc.awaitTermination()
regards
Pankaj
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Out-of-memory-with-twitter-spark-streaming-tp24162.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.