Hi everyone,
The team I am working with is currently plagued by having its data stored in
hundreds of thousands of tiny Parquet files. I am trying to 1) reduce the
number of files and 2) reduce the number of partitions. I wrote a very simple
PySpark application (Spark 2.1.1, packaged within HDP 2.6.2.0) which aims at
producing one file per partition:
spark.read\
        .option('basePath', old_dir_path)\
        .parquet(*paths)\
        .repartition(*partition_cols)\
        .write\
        .partitionBy(*partition_cols)\
        .option('compression', 'snappy')\
        .mode("append")\
        .parquet(new_dir_path)
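To make the intent of repartition(*partition_cols) concrete, here is a small pure-Python sketch of hash partitioning. It is only an illustration: zlib.crc32 stands in for Spark's internal hash function, and the function name assign_shuffle_partition, the sample (year, month) keys and the bucket count are all made up for the example. The point is that all rows sharing the same partition values land in the same task, and that a single task may receive several distinct partition values.

```python
import zlib
from collections import defaultdict


def assign_shuffle_partition(key, num_shuffle_partitions):
    """Map a tuple of partition-column values to a shuffle partition index.

    zlib.crc32 is a stand-in for Spark's hash function (an assumption made
    for illustration only); only the "hash modulo N" idea matters here.
    """
    encoded = "|".join(str(value) for value in key).encode("utf-8")
    return zlib.crc32(encoded) % num_shuffle_partitions


# Hypothetical partition values (year, month); not taken from the real data.
keys = [(2018, month) for month in range(1, 13)]

# Kept small for readability; spark.sql.shuffle.partitions defaults to 200.
num_shuffle_partitions = 8

# Group the partition values by the shuffle partition (task) they map to.
buckets = defaultdict(list)
for key in keys:
    buckets[assign_shuffle_partition(key, num_shuffle_partitions)].append(key)

for index in sorted(buckets):
    print(index, buckets[index])
```

With 12 distinct values and 8 shuffle partitions, at least one task necessarily handles more than one output directory.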
My issue is that my executors keep getting killed by YARN for exceeding memory
limits when writing the new Parquet files. The thing is that my executors
(app settings described further below) are rather big compared to the amount
of data to be written: 23 GiB of heap vs. 300 MB of gzipped Parquet (1.6 GB if
cached in memory).
Spark UI description of the failure:
Job aborted due to stage failure: Task 169 in stage 2.0 failed 1 times, most
recent failure: Lost task 169.0 in stage 2.0 (TID 98,
xxxxxxxxx.xxxxxx.airfrance.fr, executor 1): ExecutorLostFailure (executor 1
exited caused by one of the running tasks) Reason: Container killed by YARN
for exceeding memory limits. 24.1 GB of 24 GB physical memory used. Consider
boosting spark.yarn.executor.memoryOverhead.
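The 24 GB limit in this message is consistent with the settings described further below: the YARN container size is the executor heap plus the memory overhead. A quick sketch of that arithmetic, assuming the Spark 2.x default rule of max(384 MiB, 10% of the executor memory) when spark.yarn.executor.memoryOverhead is not set. The function name is hypothetical, and YARN may additionally round the request up to its allocation increment, which is ignored here.

```python
MIB_PER_GIB = 1024


def yarn_container_mib(executor_memory_mib, overhead_mib=None):
    """Approximate container size requested from YARN for one executor.

    If no explicit overhead is given, fall back to the assumed Spark 2.x
    default: max(384 MiB, 10% of spark.executor.memory).
    """
    if overhead_mib is None:
        overhead_mib = max(384, int(0.10 * executor_memory_mib))
    return executor_memory_mib + overhead_mib


# Settings from this post: 23 GiB heap, 1 GiB explicit overhead.
explicit = yarn_container_mib(23 * MIB_PER_GIB, 1 * MIB_PER_GIB)

# Same heap, overhead left to the assumed default rule.
default = yarn_container_mib(23 * MIB_PER_GIB)

print(explicit / MIB_PER_GIB)  # 24.0
print(default / MIB_PER_GIB)
```

Everything that lives outside the JVM heap (the PySpark worker processes, direct buffers, native compression buffers) has to fit in the overhead part, here 1 GiB.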
Every time, I see the process trying to dump a HUGE amount of sort data to disk:
18/11/14 14:09:21 INFO UnsafeExternalSorter: Thread 68 spilling sort data of 
15.4 GB to disk (0  time so far)
I have already tried / checked many things:

- Data distribution is not skewed

- Partitions are rather even in size and not huge

- I increased the memory available to each worker by increasing
spark.executor.memory, increasing spark.memory.fraction, decreasing
spark.executor.cores and increasing spark.executor.instances. Still, even when
the job works, each worker seems to need a huge amount of memory compared to
the amount of data to process.
The job always fails the same way: the container gets killed by YARN for
exceeding its memory limits. I have seen a handful of mostly unanswered Stack
Overflow posts about this: same logs, writing Parquet, and surprise that so
little data OOMs "big" executors. I found a few discussions mentioning that
writing partitioned Parquet uses a lot of memory. SPARK-12546 seems related,
but the issue should be solved in my version (the fix brings the default value
of spark.sql.sources.maxConcurrentWrites down from 5 to 1). The Spark 1.6.0
release notes mention this problem as a KNOWN ISSUE and advise lowering
spark.memory.fraction, to 0.4 for instance (why?), and setting
spark.hadoop.parquet.memory.pool.ratio to 0.3 (why?).
The latter solution seems to help, but memory consumption still seems high.
For example: with 2 executors, each with 2 cores, 23 GiB of
spark.executor.memory and 1 GiB of spark.yarn.executor.memoryOverhead, plus
spark.memory.fraction = 0.4 and spark.hadoop.parquet.memory.pool.ratio = 0.3,
repartitioning 300 MB of gzipped Parquet (1.6 GB if cached in memory) leads to
a high execution memory peak (7.8 GB according to the Spark UI) and to huge
quantities of data being spilled to disk (see the logs below, plus Spark UI:
shuffle read size: 3.5 GB). What is going on here?
18/11/14 14:29:34 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
7.8 GB to disk (0  time so far)
18/11/14 14:31:12 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
7.8 GB to disk (1  time so far)
18/11/14 14:32:12 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
7.8 GB to disk (2  times so far)
18/11/14 14:33:32 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
7.8 GB to disk (3  times so far)
18/11/14 14:34:29 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
7.8 GB to disk (4  times so far)
18/11/14 14:35:24 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
7.8 GB to disk (5  times so far)
18/11/14 14:37:19 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
7.9 GB to disk (0  time so far)
18/11/14 14:38:00 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
8.0 GB to disk (1  time so far)
18/11/14 14:38:42 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
8.0 GB to disk (2  times so far)
18/11/14 14:39:24 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
8.0 GB to disk (3  times so far)
18/11/14 14:40:05 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
8.0 GB to disk (4  times so far)
18/11/14 14:40:47 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 
8.0 GB to disk (5  times so far)
18/11/14 14:41:08 INFO FileFormatWriter: Sorting complete. Writing out 
partition files one at a time.
18/11/14 14:41:09 INFO CodecConfig: Compression: SNAPPY
18/11/14 14:41:09 INFO CodecConfig: Compression: SNAPPY
18/11/14 14:41:09 INFO ParquetOutputFormat: Parquet block size to 134217728
18/11/14 14:41:09 INFO ParquetOutputFormat: Parquet page size to 1048576
18/11/14 14:41:09 INFO ParquetOutputFormat: Parquet dictionary page size to 
1048576
18/11/14 14:41:09 INFO ParquetOutputFormat: Dictionary is on
18/11/14 14:41:09 INFO ParquetOutputFormat: Validation is off
18/11/14 14:41:09 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
18/11/14 14:41:09 INFO ParquetOutputFormat: Maximum row group padding size is 0 
bytes
18/11/14 14:41:09 INFO ParquetWriteSupport: Initialized Parquet WriteSupport 
with Catalyst schema:
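The Parquet block (row group) size shown in the log above, 134217728 bytes (128 MiB), is what each open Parquet writer tries to buffer before flushing. Assuming parquet-mr's MemoryManager works as commonly described, the total budget for all open writers is the maximum heap multiplied by parquet.memory.pool.ratio (default 0.95), and row groups are scaled down proportionally when the open writers together ask for more than that. A rough sketch of this rule follows; the function names are hypothetical and the real implementation may differ in its details.

```python
# Taken from the "Parquet block size to 134217728" log line.
PARQUET_BLOCK_SIZE = 134217728  # bytes (128 MiB)


def parquet_pool_bytes(max_heap_bytes, pool_ratio):
    """Total memory budget shared by all open Parquet writers in one JVM."""
    return int(max_heap_bytes * pool_ratio)


def effective_row_group_bytes(open_writers, max_heap_bytes, pool_ratio,
                              block_size=PARQUET_BLOCK_SIZE):
    """Row group size each writer gets once the pool is taken into account."""
    pool = parquet_pool_bytes(max_heap_bytes, pool_ratio)
    requested = open_writers * block_size
    if requested <= pool:
        return block_size  # everything fits, no scaling needed
    # Assumed behaviour: scale every writer down by the same factor.
    return int(block_size * pool / requested)


heap = 23 * 1024 ** 3  # 23 GiB executor heap, as in this post

for ratio in (0.95, 0.3):
    pool = parquet_pool_bytes(heap, ratio)
    # Number of writers that can keep a full 128 MiB row group in memory.
    print(ratio, pool // PARQUET_BLOCK_SIZE)
```

If only one file is open per task at a time, as the "Writing out partition files one at a time" line suggests, then with 2 cores the Parquet buffers would amount to roughly 2 x 128 MiB, far below either budget.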
Still, I do not really understand what is going on or how the above-mentioned
parameters help in this situation. I really need a better understanding in
order to know how to design my application: how much Spark execution memory is
needed per core (=> which values for spark.memory.fraction,
spark.executor.memory and spark.executor.cores?) and/or what is roughly the
maximum amount of data a given app can process?
Is there a particular issue when writing partitioned (?) Parquet that should be
better documented?
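For reference, the execution memory figures in question can be approximated from the unified memory model used since Spark 1.6: usable memory is (heap - 300 MiB reserved) x spark.memory.fraction, shared between execution and storage, and each of N running tasks can claim between 1/(2N) and 1/N of the execution pool. The sketch below is a simplification under those assumptions: it ignores storage usage and off-heap memory, and the JVM usually reports slightly less than -Xmx as its maximum memory. Function names are hypothetical.

```python
RESERVED_MIB = 300  # fixed reservation in the unified memory model


def unified_memory_mib(heap_mib, memory_fraction):
    """Memory shared by execution and storage (spark.memory.fraction)."""
    return (heap_mib - RESERVED_MIB) * memory_fraction


def per_task_execution_range_mib(heap_mib, memory_fraction, running_tasks):
    """(min, max) execution memory a task can get with N tasks running."""
    pool = unified_memory_mib(heap_mib, memory_fraction)
    return pool / (2 * running_tasks), pool / running_tasks


heap_mib = 23 * 1024  # 23 GiB executor heap, as in this post

# 0.6 is the Spark 2.x default for spark.memory.fraction, 0.4 the value tried.
for fraction in (0.6, 0.4):
    low, high = per_task_execution_range_mib(heap_mib, fraction,
                                             running_tasks=2)
    print(fraction, round(low), round(high))
```

Under this model, lowering spark.memory.fraction shrinks the pool Spark manages itself and leaves more of the heap to everything else, for instance the Parquet write buffers.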
Many, many thanks in advance for any help, this issue has been and still is a 
real world of pain.
And sorry for the long post !
Best regards,
Pierre

