[ https://issues.apache.org/jira/browse/SPARK-44003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17795101#comment-17795101 ]

Rafal Wojdyla commented on SPARK-44003:
---------------------------------------

Coming back briefly to this issue, to elaborate on the last comment: why is the following field static?

{code:java}
  /**
   * This memory manager is for all the real writers (InternalParquetRecordWriter) in one task.
   */
  private static MemoryManager memoryManager;
{code}

From here:
https://github.com/apache/parquet-mr/blob/c061d5493ef3c3b7bdee59b062ad8baf056c8503/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java#L565-L568

The comment suggests that the {{MemoryManager}} is scoped to a single *task*, but since the field is static it is shared by every task running in the same JVM, so the "one task" expectation doesn't always hold. Is it necessary for this field to be static?
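
To make the concern concrete, here's a rough Python model (not the actual parquet-mr code): a static field behaves like a single process-wide object, so every writer thread in the executor JVM contends on the same lock, no matter which task it belongs to.

{code:python}
import threading

# Rough model of a JVM static field: one shared registry and one lock for
# the whole process, not one per task.
_manager_lock = threading.Lock()   # stands in for the synchronized MemoryManager
_writer_list = {}                  # stands in for MemoryManager.writerList

def add_writer(writer, allocation):
    # Every writer thread in the process serializes here, mirroring the
    # synchronized addWriter in parquet-mr.
    with _manager_lock:
        if writer in _writer_list:
            raise ValueError(f"writer already registered: {writer!r}")
        _writer_list[writer] = allocation
        # updateAllocation() would run here, still under the lock.

def remove_writer(writer):
    # Same contention on close: synchronized removeWriter.
    with _manager_lock:
        _writer_list.pop(writer, None)
{code}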

Separately, it's worth noting that in the context of Spark one can set 
{{spark.sql.maxConcurrentOutputFileWriters}} to keep multiple writers open 
concurrently, in which case {{DynamicPartitionDataConcurrentWriter}} is used 
instead of {{DynamicPartitionDataSingleWriter}}, which may reduce the impact 
of the globally synchronized {{MemoryManager}}.
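
For example, in PySpark (a sketch; the value below is arbitrary and the right setting is workload-dependent):

{code:python}
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # With a value > 0 Spark may keep up to this many writers open
    # concurrently and use DynamicPartitionDataConcurrentWriter instead of
    # the single-writer path; 32 is just an example value.
    .config("spark.sql.maxConcurrentOutputFileWriters", "32")
    .getOrCreate()
)
{code}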

> DynamicPartitionDataSingleWriter is being starved by Parquet MemoryManager
> --------------------------------------------------------------------------
>
>                 Key: SPARK-44003
>                 URL: https://issues.apache.org/jira/browse/SPARK-44003
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.1
>            Reporter: Rafal Wojdyla
>            Priority: Major
>
> We have a pyspark job that writes to a partitioned parquet dataset via:
> {code:python}
> df.write.parquet(
>   path=path,
>   compression="snappy",
>   mode="overwrite",
>   partitionBy="year",
> )
> {code}
> In this specific production case we partition by 28 distinct years, so 28 
> directories, each with 200 part files, 5,600 files in total. This 
> particular job runs on a single dedicated and ephemeral VM. We have noticed 
> that for most of the run the VM is far from saturated and the job is very 
> slow; it is neither IO nor CPU bound. Here's an [annotated VM utilization 
> graph|https://gist.githubusercontent.com/ravwojdyla/e468bace2bc899f86348dee067173270/raw/03cfb383d49ad43adaec2eaa3d9cbf0a3c9b8c0b/VM_util.png].
> The blue line is CPU, and turquoise is memory. The graph doesn't show IO, 
> but we monitored that too, and it also was not saturated. On the labels:
>  * {{BQ}}, you can ignore this
>  * {{SPARK~1}} spark computes some data
>  * {{SPARK~2}} is 1st slow period
>  * {{SPARK~3}} is 2nd slow period
> We took two 10-minute JFR profiles, marked {{P-1}} and {{P-2}} in the graph 
> above: {{P-1}} falls entirely within {{SPARK~2}}, while {{P-2}} is partially 
> in {{SPARK~2}} but mostly in {{SPARK~3}}. Here's the 
> [{{P-1}}|https://gist.githubusercontent.com/ravwojdyla/e468bace2bc899f86348dee067173270/raw/98c107ebd28608da55d84d13b3aa6eaf25b3c854/p1.png]
> profile, and here's the 
> [{{P-2}}|https://gist.githubusercontent.com/ravwojdyla/e468bace2bc899f86348dee067173270/raw/98c107ebd28608da55d84d13b3aa6eaf25b3c854/p2.png]
> profile.
> The picture becomes clearer when we look at the locks; here's the 
> [report|https://gist.githubusercontent.com/ravwojdyla/e468bace2bc899f86348dee067173270/raw/c0f1fb78ac9d5f90a3106b4b43a3a7b27700f66a/locks.png].
> We see that the threads were blocked on locks for a total of 20.5h, mostly 
> on the global {{org.apache.parquet.hadoop.MemoryManager}}, which has two 
> synchronized methods, {{addWriter}} and {{removeWriter}}. From the 
> [parquet-mr GH src|https://github.com/apache/parquet-mr/blob/9d80330ae4948787ac0bf4e4b0d990917f106440/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java#L77-L98]:
> {code:java}
>   /**
>    * Add a new writer and its memory allocation to the memory manager.
>    * @param writer the new created writer
>    * @param allocation the requested buffer size
>    */
>   synchronized void addWriter(InternalParquetRecordWriter<?> writer, Long allocation) {
>     Long oldValue = writerList.get(writer);
>     if (oldValue == null) {
>       writerList.put(writer, allocation);
>     } else {
>       throw new IllegalArgumentException("[BUG] The Parquet Memory Manager should not add an " +
>           "instance of InternalParquetRecordWriter more than once. The Manager already contains " +
>           "the writer: " + writer);
>     }
>     updateAllocation();
>   }
>
>   /**
>    * Remove the given writer from the memory manager.
>    * @param writer the writer that has been closed
>    */
>   synchronized void removeWriter(InternalParquetRecordWriter<?> writer) {
>     writerList.remove(writer);
>     if (!writerList.isEmpty()) {
>       updateAllocation();
>     }
>   }
> {code}
> During the 10-minute profiling session the worker threads were mostly waiting 
> on this lock. The writer implementation comes from the [Spark 
> src|https://github.com/apache/spark/blob/0ed48feab65f2d86f5dda3e16bd53f2f795f5bc5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala#L339]:
> {code:scala}
> /**
>  * Dynamic partition writer with single writer, meaning only one writer is opened at any time
>  * for writing. The records to be written are required to be sorted on partition and/or bucket
>  * column(s) before writing.
>  */
> class DynamicPartitionDataSingleWriter(
> {code}
> It appears that the combination of the large number of writers created via 
> Spark's {{DynamicPartitionDataSingleWriter}} and the {{MemoryManager}} 
> synchronization bottleneck drastically reduces performance by starving the 
> writer threads. We validated this by removing the partitioning: the job is 
> then much faster and fully utilizes the VM.
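> For reference, the validation was essentially the same write without 
> {{partitionBy}} (a sketch; {{df}} and {{path}} as above):
> {code:python}
> # Same write as above, minus partitioning: one writer per task, so no
> # per-partition writer churn through the global MemoryManager.
> df.write.parquet(
>     path=path,
>     compression="snappy",
>     mode="overwrite",
> )
> {code}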


