Hello, my task is to update a DataFrame in a while loop until there is no more data to update. The Spark SQL I use is shown below:
================================================================

val hc = sqlContext
hc.sql("use person")

var temp_pair = hc.sql("""
  SELECT ROW_NUMBER() OVER (ORDER BY pid) AS pair,
         pid,
         actionchanneluserid
  FROM fdt_pid_channel_info
  WHERE dt = '2016-09-27' AND actionchanneltype = 2
""").repartition(200)
temp_pair.persist.registerTempTable("temp_pair")

var result = 1L
while (result > 0) {
  val temp1 = hc.sql("""
    SELECT B.pair AS minpair, A.*
    FROM temp_pair A
    INNER JOIN (SELECT pid, MIN(pair) AS pair
                FROM temp_pair GROUP BY pid) B
      ON A.pid = B.pid
    WHERE A.pair > B.pair
  """)
  temp1.persist.registerTempTable("temp1")
  result = temp1.count
  if (result > 0) {
    val temp = temp_pair
      .except(hc.sql("select pair, pid, actionchanneluserid from temp1"))
      .unionAll(hc.sql("select minpair, pid, actionchanneluserid from temp1"))
      .coalesce(200)
    temp.persist
    temp.count
    temp_pair.unpersist
    temp_pair = temp
    temp_pair.registerTempTable("temp_pair")
  }
  temp1.unpersist

  val temp2 = hc.sql("""
    SELECT B.pair AS minpair, A.*
    FROM temp_pair A
    INNER JOIN (SELECT actionchanneluserid, MIN(pair) AS pair
                FROM temp_pair GROUP BY actionchanneluserid) B
      ON A.actionchanneluserid = B.actionchanneluserid
    WHERE A.pair > B.pair
  """)
  temp2.persist.registerTempTable("temp2")
  val count2 = temp2.count
  result = result + count2
  if (count2 > 0) {
    val temp = temp_pair
      .except(hc.sql("select pair, pid, actionchanneluserid from temp2"))
      .unionAll(hc.sql("select minpair, pid, actionchanneluserid from temp2"))
      .coalesce(200)
    temp.persist
    temp.count
    temp_pair.unpersist
    temp_pair = temp
    temp_pair.registerTempTable("temp_pair")
  }
  temp2.unpersist
}

=============================================

This job causes the number of skipped stages to keep increasing, and it eventually fails with "java.lang.OutOfMemoryError: Java heap space". Is there any way to avoid this kind of situation? Any help would be great. Thank you!
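I suspect the growing lineage from repeatedly reassigning temp_pair is the cause, since each iteration's plan is built on top of the previous one. A minimal sketch of the workaround I'm considering (this is an assumption, not my actual job): periodically round-trip the DataFrame through a checkpointed RDD to cut the lineage; the checkpoint directory path and the toy filter step are hypothetical stand-ins.

```scala
// Sketch: truncating the lineage of an iteratively reassigned DataFrame
// by round-tripping it through an RDD checkpoint (Spark 1.x style API).
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointSketch {
  def run(): Long = {
    val sc = new SparkContext(
      new SparkConf().setAppName("lineage-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    sc.setCheckpointDir("/tmp/ckpt") // hypothetical checkpoint directory

    var df = sc.parallelize(1 to 10).toDF("n")
    for (i <- 1 to 20) {
      // Stand-in for the real per-iteration update of the DataFrame.
      df = df.filter($"n" > 0)
      if (i % 5 == 0) {
        // Every few iterations, rebuild the DataFrame from a checkpointed
        // RDD so the query plan does not grow without bound.
        val rdd = df.rdd
        rdd.checkpoint()
        rdd.count() // action to materialize the checkpoint
        df = sqlContext.createDataFrame(rdd, df.schema)
      }
    }
    val finalCount = df.count()
    sc.stop()
    finalCount
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Would something like this break the lineage in my loop as well, or is there a better pattern for iterative updates?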