Re: Out of memory HDFS Multiple Cluster Write

2019-12-21 Thread Chris Teoh
I'm not entirely sure what the behaviour is when writing to a remote cluster.
It could be that connections are being established for every element in
your dataframe; perhaps using foreachPartition would reduce the
number of connections. You may have to look at what the executors do when
they reach out to the remote cluster.
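
Something along these lines is what I had in mind with foreachPartition -- one
connection/stream per partition rather than per row. This is only a sketch: the
namenode URI and output path are placeholders, and the actual write would be
whatever your client/format needs:

    import java.net.URI
    import java.util.UUID
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    df.foreachPartition { rows =>
      // One FileSystem handle and one output stream per partition.
      val fs = FileSystem.get(new URI("hdfs://remote-nn:8020"), new Configuration())
      val out = fs.create(new Path(s"/tmp/example-${UUID.randomUUID}")) // placeholder path
      try {
        rows.foreach(row => out.write((row.mkString(",") + "\n").getBytes("UTF-8")))
      } finally {
        out.close()
      }
    }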

On Sun, 22 Dec 2019, 8:07 am Ruijing Li,  wrote:

> I managed to make the failing stage work by increasing memoryOverhead to
> something ridiculous (over 50%). Our spark.executor.memory = 12gb and I bumped
> spark.mesos.executor.memoryOverhead=8G.
>
> *Can someone explain why this solved the issue?* As I understand it,
> memoryOverhead is for VM overhead and non-heap items, which a simple
> read and write should not use (albeit to different Hadoop clusters, but the
> network should be a non-issue since the writes come from the same machines).
>
> We use Spark defaults for everything else.
>
> We are calling df.repartition(20) in our write after the logic is done (before
> the failing stage of the multiple-cluster write) to prevent Spark's small-files
> problem. We reduce from 4000 partitions to 20.
>
> On Sat, Dec 21, 2019 at 11:28 AM Ruijing Li  wrote:
>
>> Not for the stage that fails; all it does is read and write. The number
>> of tasks is (# of cores) * (# of executor instances), which for us is 60 (3
>> cores, 20 executors).
>>
>> As for the input partition size of the failing stage: when Spark reads the 20
>> files, each 132M, it comes out to 40 partitions.
>>
>>
>>
>> On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh  wrote:
>>
>>> If you're using Spark SQL, that configuration setting causes a shuffle
>>> if the number of your input partitions to the write is larger than that
>>> configuration.
>>>
>>> Is there anything in the executor logs or the Spark UI DAG that
>>> indicates a shuffle? I don't expect a shuffle if it is a straight write.
>>> What's the input partition size?
>>>
>>> On Sat, 21 Dec 2019, 10:24 am Ruijing Li,  wrote:
>>>
 Could you explain why shuffle partitions might be a good starting point?

 Some more details: when I write the output the first time, after the logic
 is complete, I repartition the files to 20, after having
 spark.sql.shuffle.partitions = 2000, so we don't have too many small files.
 The data is small, about 130MB per file. When Spark reads it back, it reads 40
 partitions and tries to output them to the different cluster. Unfortunately,
 during that read-and-write stage, executors drop off.

 We keep the HDFS block size at 128MB.

 On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh 
 wrote:

> spark.sql.shuffle.partitions might be a start.
>
> Is there a difference between the number of partitions when the Parquet is
> read and spark.sql.shuffle.partitions? Is it much higher than
> spark.sql.shuffle.partitions?
>
> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li, 
> wrote:
>
>> Hi all,
>>
>> I have encountered a strange executor OOM error. I have a data
>> pipeline using Spark 2.3 and Scala 2.11.12. This pipeline writes the output to
>> one HDFS location as Parquet, then reads the files back in and writes them to
>> multiple Hadoop clusters (all co-located in the same datacenter). It
>> should be a very simple task, but executors are being killed off for exceeding
>> container thresholds. From the logs, they are exceeding the given memory (we
>> use Mesos as the cluster manager).
>>
>> The ETL process works perfectly fine with the given resources, doing
>> joins and adding columns. The output is written successfully the first
>> time. *Only when the pipeline at the end reads the output from HDFS
>> and writes it to different HDFS cluster paths does it fail.* (It
>> does a spark.read.parquet(source).write.parquet(dest).)
>>
>> This doesn't really make sense, and I'm wondering what configurations
>> I should start looking at.
>>
>> --
>> Cheers,
>> Ruijing Li
>>
> --
 Cheers,
 Ruijing Li

>>> --
>> Cheers,
>> Ruijing Li
>>
> --
> Cheers,
> Ruijing Li
>


Re: Out of memory HDFS Multiple Cluster Write

2019-12-21 Thread Ruijing Li
Not for the stage that fails; all it does is read and write. The number of
tasks is (# of cores) * (# of executor instances), which for us is 60 (3 cores,
20 executors).

As for the input partition size of the failing stage: when Spark reads the 20
files, each 132M, it comes out to 40 partitions.
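
In case it helps, this is how I checked that count -- just a sketch, where the
path is a placeholder rather than our real one:

    // Number of input partitions Spark uses when reading the 20 files back.
    val df = spark.read.parquet("hdfs://clusterA/path/output") // placeholder path
    println(df.rdd.getNumPartitions)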



On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh  wrote:

> If you're using Spark SQL, that configuration setting causes a shuffle if
> the number of your input partitions to the write is larger than that
> configuration.
>
> Is there anything in the executor logs or the Spark UI DAG that indicates
> a shuffle? I don't expect a shuffle if it is a straight write. What's the
> input partition size?
>
> On Sat, 21 Dec 2019, 10:24 am Ruijing Li,  wrote:
>
>> Could you explain why shuffle partitions might be a good starting point?
>>
>> Some more details: when I write the output the first time, after the logic is
>> complete, I repartition the files to 20, after having
>> spark.sql.shuffle.partitions = 2000, so we don't have too many small files.
>> The data is small, about 130MB per file. When Spark reads it back, it reads 40
>> partitions and tries to output them to the different cluster. Unfortunately,
>> during that read-and-write stage, executors drop off.
>>
>> We keep the HDFS block size at 128MB.
>>
>> On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh  wrote:
>>
>>> spark.sql.shuffle.partitions might be a start.
>>>
>>> Is there a difference between the number of partitions when the Parquet is
>>> read and spark.sql.shuffle.partitions? Is it much higher than
>>> spark.sql.shuffle.partitions?
>>>
>>> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li,  wrote:
>>>
 Hi all,

 I have encountered a strange executor OOM error. I have a data pipeline
 using Spark 2.3 and Scala 2.11.12. This pipeline writes the output to one HDFS
 location as Parquet, then reads the files back in and writes them to multiple
 Hadoop clusters (all co-located in the same datacenter). It should be a
 very simple task, but executors are being killed off for exceeding container
 thresholds. From the logs, they are exceeding the given memory (we use Mesos as
 the cluster manager).

 The ETL process works perfectly fine with the given resources, doing
 joins and adding columns. The output is written successfully the first
 time. *Only when the pipeline at the end reads the output from HDFS
 and writes it to different HDFS cluster paths does it fail.* (It does
 a spark.read.parquet(source).write.parquet(dest).)

 This doesn't really make sense, and I'm wondering what configurations I
 should start looking at.

 --
 Cheers,
 Ruijing Li

>>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Out of memory HDFS Multiple Cluster Write

2019-12-21 Thread Ruijing Li
I managed to make the failing stage work by increasing memoryOverhead to
something ridiculous (over 50%). Our spark.executor.memory = 12gb and I bumped
spark.mesos.executor.memoryOverhead=8G.

*Can someone explain why this solved the issue?* As I understand it,
memoryOverhead is for VM overhead and non-heap items, which a simple read
and write should not use (albeit to different Hadoop clusters, but the network
should be a non-issue since the writes come from the same machines).

We use Spark defaults for everything else.

We are calling df.repartition(20) in our write after the logic is done (before
the failing stage of the multiple-cluster write) to prevent Spark's small-files
problem. We reduce from 4000 partitions to 20.
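
To make that concrete, the rough shape of the code around the failing stage is
below. The variable names and paths are placeholders, not our real ones:

    // Final ETL write, compacted to 20 files to avoid the small-files problem.
    result.repartition(20).write.parquet("hdfs://clusterA/path/output")

    // The stage that fails: read the 20 files back and copy them to the other
    // clusters.
    val out = spark.read.parquet("hdfs://clusterA/path/output")
    Seq("hdfs://clusterB/path/output", "hdfs://clusterC/path/output")
      .foreach(dest => out.write.parquet(dest))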

On Sat, Dec 21, 2019 at 11:28 AM Ruijing Li  wrote:

> Not for the stage that fails; all it does is read and write. The number
> of tasks is (# of cores) * (# of executor instances), which for us is 60 (3
> cores, 20 executors).
>
> As for the input partition size of the failing stage: when Spark reads the 20
> files, each 132M, it comes out to 40 partitions.
>
>
>
> On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh  wrote:
>
>> If you're using Spark SQL, that configuration setting causes a shuffle if
>> the number of your input partitions to the write is larger than that
>> configuration.
>>
>> Is there anything in the executor logs or the Spark UI DAG that indicates
>> a shuffle? I don't expect a shuffle if it is a straight write. What's the
>> input partition size?
>>
>> On Sat, 21 Dec 2019, 10:24 am Ruijing Li,  wrote:
>>
>>> Could you explain why shuffle partitions might be a good starting point?
>>>
>>> Some more details: when I write the output the first time, after the logic is
>>> complete, I repartition the files to 20, after having
>>> spark.sql.shuffle.partitions = 2000, so we don't have too many small files.
>>> The data is small, about 130MB per file. When Spark reads it back, it reads 40
>>> partitions and tries to output them to the different cluster. Unfortunately,
>>> during that read-and-write stage, executors drop off.
>>>
>>> We keep the HDFS block size at 128MB.
>>>
>>> On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh  wrote:
>>>
 spark.sql.shuffle.partitions might be a start.

 Is there a difference between the number of partitions when the Parquet is
 read and spark.sql.shuffle.partitions? Is it much higher than
 spark.sql.shuffle.partitions?

 On Fri, 20 Dec 2019, 7:34 pm Ruijing Li,  wrote:

> Hi all,
>
> I have encountered a strange executor OOM error. I have a data
> pipeline using Spark 2.3 and Scala 2.11.12. This pipeline writes the output to
> one HDFS location as Parquet, then reads the files back in and writes them to
> multiple Hadoop clusters (all co-located in the same datacenter). It
> should be a very simple task, but executors are being killed off for exceeding
> container thresholds. From the logs, they are exceeding the given memory (we
> use Mesos as the cluster manager).
>
> The ETL process works perfectly fine with the given resources, doing
> joins and adding columns. The output is written successfully the first
> time. *Only when the pipeline at the end reads the output from HDFS
> and writes it to different HDFS cluster paths does it fail.* (It does
> a spark.read.parquet(source).write.parquet(dest).)
>
> This doesn't really make sense, and I'm wondering what configurations I
> should start looking at.
>
> --
> Cheers,
> Ruijing Li
>
 --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
> Cheers,
> Ruijing Li
>
-- 
Cheers,
Ruijing Li

