Hi everyone,

The team I am working with is currently plagued by storing its data in hundreds of thousands of tiny parquet files. I am trying to 1) reduce the number of files and 2) reduce the number of partitions.

I wrote a very simple (Py)Spark application (Spark 2.1.1 packaged within HDP 2.6.2.0) which aims at producing one file per partition:

    spark.read\
        .option('basePath', old_dir_path)\
        .parquet(*paths)\
        .repartition(*partition_cols)\
        .write\
        .partitionBy(*partition_cols)\
        .option('compression', 'snappy')\
        .mode("append")\
        .parquet(new_dir_path)

My issue is that my executors keep getting killed by YARN for exceeding memory limits when writing the new parquet files. The thing is that my executors (app settings described below) are rather big compared to the amount of data to be written: 23 GiB heap vs 300 MB of gzipped parquet (1.6 GB if cached in memory).

Spark UI description of the failure:

    Job aborted due to stage failure: Task 169 in stage 2.0 failed 1 times, most recent failure: Lost task 169.0 in stage 2.0 (TID 98, xxxxxxxxx.xxxxxx.airfrance.fr, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 24.1 GB of 24 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Every time, I see the process trying to dump a HUGE amount of sort data to disk:

    18/11/14 14:09:21 INFO UnsafeExternalSorter: Thread 68 spilling sort data of 15.4 GB to disk (0 time so far)

I have already tried / made sure of many things:
- Data distribution is not skewed
- Partitions are rather even in size and not huge
- Increased the memory available to each worker by increasing spark.executor.memory / increasing spark.memory.fraction / decreasing spark.executor.cores / increasing spark.executor.instances

But still, even when it works, each worker seems to need a huge amount of memory compared to the amount of data to process. The job always fails the same way: the container gets killed by YARN for exceeding its memory limit.

I have seen a handful of mostly unanswered Stack Overflow posts about this: same logs, writing parquet, and amazement that so little data OOMs "big" executors.

I found a few discussions mentioning that writing partitioned parquet uses a lot of memory. SPARK-12546 seems related, but the issue should be solved in my version (it brings the default value of spark.sql.sources.maxConcurrentWrites down from 5 to 1). The Spark 1.6.0 release notes mention this problem as a KNOWN ISSUE and advise lowering spark.memory.fraction (why?), to 0.4 for instance, and setting spark.hadoop.parquet.memory.pool.ratio to 0.3 (why?).

The latter solution seems to help, but memory consumption still seems high. For example: with 2 executors, each with 2 cores, 23 GiB spark.executor.memory and 1 GiB spark.yarn.executor.memoryOverhead, spark.memory.fraction = 0.4 and spark.hadoop.parquet.memory.pool.ratio = 0.3, repartitioning 300 MB of gzipped parquet (1.6 GB if cached in memory) implies a high execution memory peak (7.8 GB read in the Spark UI) and spills huge quantities of data to disk (see logs below + Spark UI: shuffle read size 3.5 GB). What is going on here?
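For reference, here is the back-of-the-envelope arithmetic I am using for the settings above. It is only a sketch: it assumes the unified memory model described in the Spark 2.x tuning guide (300 MiB reserved, then spark.memory.fraction of the remaining heap shared by execution and storage, with each of N running tasks entitled to between 1/(2N) and 1/N of that pool) and that all cores of an executor are busy. The function name executor_budget is just my own helper, not a Spark API.

```python
# Rough memory budget for one executor with the settings quoted above.
# Assumptions (not measured): Spark 2.x unified memory model, all cores busy.
#   unified pool = (heap - 300 MiB reserved) * spark.memory.fraction
#   each of N running tasks may hold between 1/(2N) and 1/N of that pool

MIB_PER_GIB = 1024
RESERVED_MIB = 300  # fixed reservation taken out of the heap first


def executor_budget(heap_gib, overhead_gib, memory_fraction, cores):
    """Return the main memory figures for one executor, in GiB."""
    heap_mib = heap_gib * MIB_PER_GIB
    unified_mib = (heap_mib - RESERVED_MIB) * memory_fraction
    return {
        # What YARN enforces: heap plus spark.yarn.executor.memoryOverhead
        "yarn_container_gib": heap_gib + overhead_gib,
        # Execution + storage pool shared by all tasks of the executor
        "unified_pool_gib": unified_mib / MIB_PER_GIB,
        # Bounds per task when `cores` tasks run concurrently
        "per_task_min_gib": unified_mib / (2 * cores) / MIB_PER_GIB,
        "per_task_max_gib": unified_mib / cores / MIB_PER_GIB,
    }


budget = executor_budget(heap_gib=23, overhead_gib=1, memory_fraction=0.4, cores=2)
for name in sorted(budget):
    print("{}: {:.2f}".format(name, budget[name]))
```

The container figure (23 + 1 = 24) matches the "24 GB physical memory" limit in the YARN message, and the unified pool comes out at roughly 9 GiB. What this arithmetic does not tell me is why sorting so little data would need that much.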

    18/11/14 14:29:34 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 7.8 GB to disk (0 time so far)
    18/11/14 14:31:12 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 7.8 GB to disk (1 time so far)
    18/11/14 14:32:12 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 7.8 GB to disk (2 times so far)
    18/11/14 14:33:32 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 7.8 GB to disk (3 times so far)
    18/11/14 14:34:29 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 7.8 GB to disk (4 times so far)
    18/11/14 14:35:24 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 7.8 GB to disk (5 times so far)
    18/11/14 14:37:19 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 7.9 GB to disk (0 time so far)
    18/11/14 14:38:00 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 8.0 GB to disk (1 time so far)
    18/11/14 14:38:42 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 8.0 GB to disk (2 times so far)
    18/11/14 14:39:24 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 8.0 GB to disk (3 times so far)
    18/11/14 14:40:05 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 8.0 GB to disk (4 times so far)
    18/11/14 14:40:47 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 8.0 GB to disk (5 times so far)
    18/11/14 14:41:08 INFO FileFormatWriter: Sorting complete. Writing out partition files one at a time.
    18/11/14 14:41:09 INFO CodecConfig: Compression: SNAPPY
    18/11/14 14:41:09 INFO CodecConfig: Compression: SNAPPY
    18/11/14 14:41:09 INFO ParquetOutputFormat: Parquet block size to 134217728
    18/11/14 14:41:09 INFO ParquetOutputFormat: Parquet page size to 1048576
    18/11/14 14:41:09 INFO ParquetOutputFormat: Parquet dictionary page size to 1048576
    18/11/14 14:41:09 INFO ParquetOutputFormat: Dictionary is on
    18/11/14 14:41:09 INFO ParquetOutputFormat: Validation is off
    18/11/14 14:41:09 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
    18/11/14 14:41:09 INFO ParquetOutputFormat: Maximum row group padding size is 0 bytes
    18/11/14 14:41:09 INFO ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:

Still, I do not really understand what is going on and how the above-mentioned parameters help in this situation. I really need a better understanding to know how to design my application: how much Spark execution memory per core do I need (=> which value for spark.memory.fraction? spark.executor.memory? spark.executor.cores?) and/or what is roughly the maximum amount of data a given app can process?

Is there a particular issue when writing partitioned (?) parquet that should be better documented?

Many, many thanks in advance for any help, this issue has been and still is a real world of pain. And sorry for the long post!

Best regards,
Pierre