Hi Yang! I don't know exactly why this happens, but I think either the GC can't keep up, or the size of the data plus the additional objects created during the computation is too big for the executor. And I found that this problem only appears if you do some data manipulation before the write. You can cache your data first, and after that write it into one partition. For example:
val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
dropDF.cache()

or

dropDF.write.mode(SaveMode.ErrorIfExists).parquet(temppath)
val dropDF = spark.read.parquet(temppath)

and then

dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)

Best,

On Sun, Jan 22, 2017 at 12:31 PM Yang Cao <cybea...@gmail.com> wrote:

> Also, do you know why this happens?
>
> On Jan 20, 2017, at 18:23, Pavel Plotnikov <pavel.plotni...@team.wrike.com> wrote:
>
> Hi Yang,
> I have faced the same problem on Mesos, and to circumvent this issue I
> usually increase the partition number. In the last step of your code you
> reduce the number of partitions to 1; try to set a bigger value, maybe
> that will solve the problem.
>
> Cheers,
> Pavel
>
> On Fri, Jan 20, 2017 at 12:35 PM Yang Cao <cybea...@gmail.com> wrote:
>
> Hi all,
>
> I am running a Spark application in YARN-client mode with 6 executors
> (4 cores each, executor memory = 6G, overhead = 4G, Spark version:
> 1.6.3 / 2.1.0). I find that my executor memory keeps increasing until the
> executor gets killed by the node manager, with a message telling me to
> boost spark.yarn.executor.memoryOverhead.
> I know that this parameter mainly controls the amount of memory allocated
> off-heap, but I don't know when and how the Spark engine uses this part of
> memory. Also, increasing that memory does not always solve my problem:
> sometimes it works, sometimes not. It tends to be useless when the input
> data is large.
>
> FYI, my app's logic is quite simple: it combines the small files generated
> in one single day (one directory per day) into a single file and writes it
> back to HDFS. Here is the core code:
>
> val df = spark.read.parquet(originpath)
>   .filter(s"m = ${ts.month} AND d = ${ts.day}")
>   .coalesce(400)
>
> val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
>
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
>
> The source may have hundreds to thousands of partitions, and the total
> parquet data is around 1 to 5 GB. I also find that in the step that
> shuffle-reads data from different machines, the size of the shuffle read
> is about 4 times larger than the input size, which seems weird, or follows
> some principle I don't know.
>
> Anyway, I have done some searching myself on this problem. Some articles
> say it is about the direct buffer memory (which I don't set myself); some
> say that people solved it with more frequent full GC. I also found someone
> on SO with a very similar situation:
> http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
> He claimed that it's a bug with parquet, but a comment questioned him.
> People on this mailing list may also have received an email a few hours
> ago from blondowski, who described this problem while writing JSON:
> http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none
>
> So it looks like a common question across different output formats. I hope
> someone with experience of this problem could explain why it happens and
> what a reliable way to solve it is.
>
> Best,
>
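
P.S. For completeness, here is a self-contained sketch of the second variant above (materialize to a temporary path, re-read, then write one partition). The SparkSession setup, the placeholder paths and the java.time date are my assumptions about how your job is wired up; the filter, drop and write calls are taken from your snippet:

import java.time.LocalDate
import org.apache.spark.sql.{SaveMode, SparkSession}

// Assumed setup: in your job these presumably come from spark-submit and your own config.
val spark = SparkSession.builder().appName("daily-parquet-compaction").getOrCreate()
val originpath = "hdfs:///data/events"          // placeholder path
val temppath   = "hdfs:///tmp/events_compacted" // placeholder path
val targetpath = "hdfs:///data/events_daily"    // placeholder path
val ts = LocalDate.now().minusDays(1)           // stand-in for your ts object

val df = spark.read.parquet(originpath)
  .filter(s"m = ${ts.getMonthValue} AND d = ${ts.getDayOfMonth}")
  .coalesce(400)

val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")

// Materialize the intermediate result first, so the final single-partition
// write only has to read already-compacted parquet instead of re-running the
// whole filter/drop pipeline inside one task.
dropDF.write.mode(SaveMode.ErrorIfExists).parquet(temppath)

// Re-read the materialized data and only then shrink to a single output file.
spark.read.parquet(temppath)
  .repartition(1)
  .write.mode(SaveMode.ErrorIfExists).parquet(targetpath)

And if a single output file is not a hard requirement, writing a handful of partitions instead of 1 (e.g. repartition(8)) keeps any single task's memory footprint smaller, which is what I meant in my earlier mail about increasing the partition number.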