[SPARK-SQL] Writing partitioned parquet requires huge amounts of memory

2018-11-14 Thread Lienhart, Pierre (DI IZ) - AF (ext)
Hi everyone,
The team I am working with is currently plagued by its data being stored in hundreds 
of thousands of tiny parquet files. I am trying to 1) reduce the number of files and 
2) reduce the number of partitions. I wrote a very simple (Py)Spark application (Spark 
2.1.1 packaged within HDP 2.6.2.0) which aims at producing one file 
per partition:
spark.read\
.option('basePath', old_dir_path)\
.parquet(*paths)\
.repartition(*partition_cols)\
.write\
.partitionBy(*partition_cols)\
.option('compression', 'snappy')\
.mode("append")\
.parquet(new_dir_path)
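As an aside, here is a purely illustrative sanity check of what this pipeline should produce (it reuses old_dir_path, paths and partition_cols from the snippet above; df is just a local name): repartition(*partition_cols) hashes rows into spark.sql.shuffle.partitions tasks (200 by default), so several partition values may share a task, and the writer then emits one file per partition value within each task.

# Illustrative only; old_dir_path, paths and partition_cols are the same names as above.
df = spark.read.option('basePath', old_dir_path).parquet(*paths)
# Expected number of output files: one per distinct combination of the partition columns.
expected_files = df.select(*partition_cols).distinct().count()
repartitioned = df.repartition(*partition_cols)
# Number of write tasks; with hash partitioning, one task may hold several partition values.
print(expected_files, repartitioned.rdd.getNumPartitions())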
My issue is that my executors keep getting killed by YARN for exceeding their memory 
limit when writing the new parquet files. The thing is that my executors 
(application settings described below) are rather big compared to the amount of 
data to be written: a 23 GiB heap versus 300 MB of gzipped parquet (1.6 GB if cached in 
memory).
Spark UI description of the failure: Job aborted due to stage failure: Task 169 in 
stage 2.0 failed 1 times, most recent failure: Lost task 169.0 in stage 2.0 
(TID 98, x.xx.airfrance.fr, executor 1): ExecutorLostFailure 
(executor 1 exited caused by one of the running tasks) Reason: Container killed 
by YARN for exceeding memory limits. 24.1 GB of 24 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.
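For reference, the 24 GB limit in that message is consistent with a 23 GiB heap plus roughly 1 GiB of spark.yarn.executor.memoryOverhead (the off-heap allowance YARN accounts for on top of the JVM heap). Below is a minimal, purely illustrative sketch of raising it on Spark 2.x; the values are hypothetical, and in practice these settings have to be in place before the SparkContext starts, so they are usually passed as --conf flags to spark-submit.

# Illustrative values only; on Spark 2.1 the overhead is given in MiB.
# Equivalent spark-submit form: --conf spark.yarn.executor.memoryOverhead=4096
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("compact-parquet")                            # hypothetical application name
         .config("spark.executor.memory", "23g")                # JVM heap per executor
         .config("spark.yarn.executor.memoryOverhead", "4096")  # extra off-heap allowance, in MiB
         .getOrCreate())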
Every time, I see the process trying to dump huge amounts of sort data to disk:
18/11/14 14:09:21 INFO UnsafeExternalSorter: Thread 68 spilling sort data of 
15.4 GB to disk (0  time so far)
I have already tried or made sure of several things:

-  Data distribution is not skewed

-  Partitions are rather even in size and not huge

-  Increased the memory available to each executor by increasing 
spark.executor.memory, increasing spark.memory.fraction, decreasing 
spark.executor.cores and increasing spark.executor.instances; but even when 
the job succeeds, each executor still seems to need a huge amount of memory 
compared to the amount of data it processes.
The job always fails the same way: the container gets killed by YARN for 
exceeding its memory limit. I have seen a handful of mostly unanswered Stack 
Overflow posts about this: same logs, writing parquet, and surprise that so 
little data OOMs such "big" executors. I also found a few discussions mentioning 
that writing partitioned parquet uses a lot of memory. SPARK-12546 seems related, 
but that issue should already be fixed in my version (it brought the default value 
of spark.sql.sources.maxConcurrentWrites down from 5 to 1). The Spark 1.6.0 release 
notes mention this problem as a KNOWN ISSUE and advise bringing spark.memory.fraction 
down to 0.4 (why?) and setting spark.hadoop.parquet.memory.pool.ratio to 0.3 (why?).
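Presumably the idea behind those two settings is that the Parquet writer's column buffers live outside Spark's managed execution/storage memory, so lowering spark.memory.fraction leaves more heap for them, while parquet.memory.pool.ratio caps the share of the heap the Parquet MemoryManager hands out to open writers. A minimal sketch of applying the quoted values (illustrative; like the other memory settings, these are normally passed to spark-submit before the context is created, and spark.hadoop.* properties are forwarded to the Hadoop configuration seen by the Parquet writer):

# Illustrative only; values are the ones quoted from the 1.6.0 known-issue note.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.memory.fraction", "0.4")                   # shrink Spark's unified memory region
         .config("spark.hadoop.parquet.memory.pool.ratio", "0.3")  # forwarded to the Hadoop conf as
                                                                   # parquet.memory.pool.ratio
         .getOrCreate())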
The latter settings seem to help, but memory consumption still looks very high. For 
example: with 2 executors of 2 cores each, 23 GiB of spark.executor.memory and 1 GiB of 
spark.yarn.executor.memoryOverhead per executor, spark.memory.fraction = 0.4 and 
spark.hadoop.parquet.memory.pool.ratio = 0.3, repartitioning 300 MB of gzipped 
parquet (1.6 GB when cached in memory) produces a high execution-memory peak (7.8 
GB in the Spark UI) and spills huge quantities of data to disk (see the logs below; 
the Spark UI reports a shuffle read size of 3.5 GB). What is going on here?
18/11/14 14:29:34 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
7.8 GB to disk (0  time so far)
18/11/14 14:31:12 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
7.8 GB to disk (1  time so far)
18/11/14 14:32:12 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
7.8 GB to disk (2  times so far)
18/11/14 14:33:32 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
7.8 GB to disk (3  times so far)
18/11/14 14:34:29 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
7.8 GB to disk (4  times so far)
18/11/14 14:35:24 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
7.8 GB to disk (5  times so far)
18/11/14 14:37:19 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
7.9 GB to disk (0  time so far)
18/11/14 14:38:00 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
8.0 GB to disk (1  time so far)
18/11/14 14:38:42 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
8.0 GB to disk (2  times so far)
18/11/14 14:39:24 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
8.0 GB to disk (3  times so far)
18/11/14 14:40:05 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
8.0 GB to disk (4  times so far)
18/11/14 14:40:47 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
8.0 GB to disk (5  times so far)
18/11/14 14:41:08 INFO FileFormatWriter: Sorting complete. Writing out 
partition files one at a time.
18/11/14 14:41:09 INFO CodecConfig: Compression: SNAPPY
18/11/14 14:41:09 INFO CodecConfig: Compression: SNAPPY
18/11/14 14:41:09 INFO ParquetOutputFormat: Parquet block size to 134217728
18/11/14 14:41:09 INFO ParquetOutputFormat: Parquet page size to 1048576

RE: [Spark SQL] Does Spark group small files

2018-11-14 Thread Lienhart, Pierre (DI IZ) - AF (ext)
Hello Yann,

From my understanding, when reading small files Spark groups them and loads the 
content of each batch into the same partition, so you do not end up with one 
partition per file and therefore a huge number of very small partitions. This 
behavior is controlled by the spark.sql.files.maxPartitionBytes parameter, which 
defaults to 128 MiB. For example, if your file system only contains 8 MiB files, 
you will end up with partitions holding the content of roughly 16 files each. If 
your files are heavily compressed, this can result in pretty fat in-memory 
partitions, because the limit applies to the compressed size on disk, so the 
decompressed content can be several times spark.sql.files.maxPartitionBytes.

I can’t give you a link to a specific source code snippet but this is my 
experience from working with a lot of small parquet files.
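That said, here is a minimal illustrative sketch (the S3 path is a placeholder) of how you can observe and tune the behavior yourself. Note that the packing also accounts for an estimated per-file open cost (spark.sql.files.openCostInBytes), so the resulting partition count is only roughly total input size divided by maxPartitionBytes.

# Illustrative only; 's3a://bucket/small-files/' is a placeholder path.
spark.conf.set("spark.sql.files.maxPartitionBytes", 32 * 1024 * 1024)  # pack at most ~32 MiB of files per partition

df = spark.read.parquet("s3a://bucket/small-files/")
print(df.rdd.getNumPartitions())  # grows as maxPartitionBytes shrinks, and vice versa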

Regards,

Pierre

From: Yann Moisan [mailto:yam...@gmail.com]
Sent: Tuesday, 13 November 2018 21:28
To: user@spark.apache.org
Subject: [Spark SQL] Does Spark group small files

Hello,

I'm using Spark 2.3.1.

I have a job that reads 5,000 small parquet files from S3.

When I do a mapPartitions followed by a collect, only 278 tasks are used (I 
would have expected 5000). Does Spark group small files? If so, what is the 
threshold for grouping? Is it configurable? Any link to the corresponding source 
code?

Rgds,

Yann.
