Hello,

My task is to update a DataFrame in a while loop until there is no more data
to update.
The Spark SQL I used is shown below:

================================================================

val hc = sqlContext
hc.sql("use person")

var temp_pair = hc.sql("""
select ROW_NUMBER() OVER (ORDER BY PID) AS pair
, pid
, actionchanneluserid
from fdt_pid_channel_info
where dt = '2016-09-27'
and actionchanneltype = 2
""").repartition(200)
temp_pair.persist.registerTempTable("temp_pair")

var result = 1L  // number of rows changed in the previous iteration (seeded so the loop starts)

while (result > 0) {
  // Pass 1: for each pid, find the minimum pair and pick out rows above it
  val temp1 = hc.sql("""
    SELECT B.pair AS minpair, A.*
    FROM temp_pair A
    INNER JOIN (
        SELECT pid, MIN(pair) AS pair
        FROM temp_pair
        GROUP BY pid) B
    ON A.pid = B.pid
    WHERE A.pair > B.pair
  """)
  temp1.persist.registerTempTable("temp1")

  result = temp1.count

  if (result > 0) {
    // Rewrite the affected rows, replacing pair with the group minimum
    val temp = temp_pair
      .except(hc.sql("select pair, pid, actionchanneluserid from temp1"))
      .unionAll(hc.sql("select minpair, pid, actionchanneluserid from temp1"))
      .coalesce(200)
    temp.persist
    temp.count
    temp_pair.unpersist()
    temp_pair = temp
    temp_pair.registerTempTable("temp_pair")
  }

  temp1.unpersist()

  // Pass 2: same propagation, this time grouped by actionchanneluserid
  val temp2 = hc.sql("""
    SELECT B.pair AS minpair, A.*
    FROM temp_pair A
    INNER JOIN (
        SELECT actionchanneluserid, MIN(pair) AS pair
        FROM temp_pair
        GROUP BY actionchanneluserid) B
    ON A.actionchanneluserid = B.actionchanneluserid
    WHERE A.pair > B.pair
  """)
  temp2.persist.registerTempTable("temp2")

  result = result + temp2.count

  if (temp2.count > 0) {
    val temp = temp_pair
      .except(hc.sql("select pair, pid, actionchanneluserid from temp2"))
      .unionAll(hc.sql("select minpair, pid, actionchanneluserid from temp2"))
      .coalesce(200)
    temp.persist
    temp.count
    temp_pair.unpersist()
    temp_pair = temp
    temp_pair.registerTempTable("temp_pair")
  }

  temp2.unpersist()
}

=============================================

This job causes the number of skipped stages to keep increasing, and it eventually fails with
"java.lang.OutOfMemoryError: Java heap space".

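My guess is that the lineage of temp_pair keeps growing, since every new temp_pair is built on
top of the previous one through except/unionAll. One thing I have been thinking about (not tested
yet) is truncating the lineage each round by checkpointing the underlying RDD and rebuilding the
DataFrame from it, along the lines of the sketch below. The checkpoint directory and the helper
name are just placeholders:

================================================================

// Untested sketch: cut a DataFrame's lineage by checkpointing its RDD.
// Assumes sqlContext is the same SQLContext as above and that a reliable
// checkpoint directory (e.g. on HDFS) is available; the path is a placeholder.
import org.apache.spark.sql.DataFrame

sqlContext.sparkContext.setCheckpointDir("/tmp/pair_checkpoints")

def truncateLineage(df: DataFrame): DataFrame = {
  val rdd = df.rdd                 // RDD[Row] backing the DataFrame
  rdd.checkpoint()                 // mark for checkpointing; lineage is cut once materialized
  rdd.count()                      // force materialization so the checkpoint is written now
  sqlContext.createDataFrame(rdd, df.schema)  // rebuild a DataFrame with a short lineage
}

// Inside the loop, after reassigning temp_pair:
// temp_pair = truncateLineage(temp_pair)
// temp_pair.registerTempTable("temp_pair")

=============================================

The idea is that after createDataFrame the plan would no longer reference the whole chain of
except/unionAll from earlier iterations, but I am not sure this is the right approach.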

Is there any way to avoid this kind of situation?
Any help would be greatly appreciated!
Thank you
