Hi Yang!

I don't know exactly why this happens, but I think either the GC can't keep up,
or the size of the data plus the additional objects created during the computation
is too big for the executor.
I also found that this problem only appears if you do some data manipulation before
the write. You can cache your data first, and only after that write it into one
partition. For example:

import org.apache.spark.sql.SaveMode  // for SaveMode.ErrorIfExists below

val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")

dropDF.cache()  // keep the intermediate result in executor memory

or

dropDF.write.mode(SaveMode.ErrorIfExists).parquet(temppath)

val dropDF = spark.read.parquet(temppath)

and then

dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
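
Note that cache() is lazy, so the cached copy is only materialized when an action
runs. A small sketch (reusing the names above) that forces the cache before the
final write and releases the memory afterwards:

dropDF.count()        // any action here materializes the cache
dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
dropDF.unpersist()    // free the cached blocks once the single file is written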

Best,

On Sun, Jan 22, 2017 at 12:31 PM Yang Cao <cybea...@gmail.com> wrote:

> Also, do you know why this happens?
>
> On 2017年1月20日, at 18:23, Pavel Plotnikov <pavel.plotni...@team.wrike.com>
> wrote:
>
> Hi Yang,
> I have faced the same problem on Mesos, and to circumvent this issue I
> usually increase the partition number. In the last step of your code you reduce
> the number of partitions to 1; try setting a bigger value, maybe that will solve
> the problem.
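>
> For example, something like this for the last step instead of the single-partition
> write (the 50 here is only an illustration, the right number depends on your data):
>
> dropDF.repartition(50).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)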
>
> Cheers,
> Pavel
>
> On Fri, Jan 20, 2017 at 12:35 PM Yang Cao <cybea...@gmail.com> wrote:
>
> Hi all,
>
> I am running a Spark application in YARN-client mode with 6 executors
> (4 cores each, executor memory = 6G, overhead = 4G, Spark version:
> 1.6.3 / 2.1.0). I find that my executor memory keeps increasing until the executor
> gets killed by the node manager, with a message telling me to boost
> spark.yarn.executor.memoryOverhead.
> I know that this parameter mainly controls the size of memory allocated
> off-heap, but I don't know when and how the Spark engine uses this part
> of memory. Also, increasing that part of memory does not always solve my
> problem; sometimes it works, sometimes it doesn't, and it tends to be useless
> when the input data is large.
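>
> For reference, this is roughly how I submit it (the flags just repeat the setup
> described above, and <application jar> is a placeholder):
>
> spark-submit \
>   --master yarn \
>   --deploy-mode client \
>   --num-executors 6 \
>   --executor-cores 4 \
>   --executor-memory 6G \
>   --conf spark.yarn.executor.memoryOverhead=4096 \
>   <application jar>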
>
> FYI, my app's logic is quite simple: it combines the small files
> generated in one single day (one directory per day) into a single file and
> writes it back to HDFS. Here is the core code:
>
> val df = spark.read.parquet(originpath)
>   .filter(s"m = ${ts.month} AND d = ${ts.day}")
>   .coalesce(400)
>
> val dropDF =
>   df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
>
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
>
> The source data may have hundreds to thousands of partitions, and the
> total Parquet size is around 1 to 5 GB. I also find that in the stage that
> shuffles data between machines, the size of the shuffle read is
> about 4 times larger than the input size, which is weird, or follows some
> principle I don't know about.
>
> Anyway, I have done some searching on this problem myself. Some articles say
> that it is about the direct buffer memory (which I don't set myself). Others
> say that people solve it with more frequent full GC. I also found one
> person on SO with a very similar situation:
> http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
> He claimed that it is a bug in Parquet, but a commenter questioned him.
> People on this mailing list may also have received an email a few hours ago from
> blondowski, who described this problem while writing JSON:
> http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none
>
> So it looks like a common problem across different output formats. I hope
> someone with experience of this issue could explain it:
> why does this happen, and what is a reliable way to solve it?
>
> Best,
>
>
>
