[ https://issues.apache.org/jira/browse/SPARK-44003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17795102#comment-17795102 ]
Rafal Wojdyla commented on SPARK-44003:
---------------------------------------

In the context of {{DynamicPartitionDataConcurrentWriter}}, https://github.com/apache/spark/pull/40952 is worth keeping in mind.

DynamicPartitionDataSingleWriter is being starved by Parquet MemoryManager
--------------------------------------------------------------------------

                Key: SPARK-44003
                URL: https://issues.apache.org/jira/browse/SPARK-44003
            Project: Spark
         Issue Type: Bug
         Components: Spark Core
   Affects Versions: 3.3.1
           Reporter: Rafal Wojdyla
           Priority: Major

We have a pyspark job that writes to a partitioned parquet dataset via:
{code:python}
df.write.parquet(
    path=path,
    compression="snappy",
    mode="overwrite",
    partitionBy="year",
)
{code}
In this specific production case we partition by 28 distinct years, so 28 directories, each directory with 200 part files, a total of 5.6K files. This particular job runs on a single dedicated and ephemeral VM. We have noticed that most of the time the VM is far from saturated and the job is very slow; it is neither IO nor CPU bound. Here's an [annotated VM utilization graph|https://gist.githubusercontent.com/ravwojdyla/e468bace2bc899f86348dee067173270/raw/03cfb383d49ad43adaec2eaa3d9cbf0a3c9b8c0b/VM_util.png]. The blue line is CPU and the turquoise line is memory. This graph doesn't show IO, but we monitored that as well and it was not saturated either. About the labels:
 * {{BQ}}, you can ignore this
 * {{SPARK~1}}: spark computes some data
 * {{SPARK~2}}: the 1st slow period
 * {{SPARK~3}}: the 2nd slow period

We took two 10-minute JFR profiles, marked {{P-1}} and {{P-2}} in the graph above: {{P-1}} lies solely within {{SPARK~2}}, while {{P-2}} is partially in {{SPARK~2}} but mostly in {{SPARK~3}}. Here's the [{{P-1}}|https://gist.githubusercontent.com/ravwojdyla/e468bace2bc899f86348dee067173270/raw/98c107ebd28608da55d84d13b3aa6eaf25b3c854/p1.png] profile, and here's the [{{P-2}}|https://gist.githubusercontent.com/ravwojdyla/e468bace2bc899f86348dee067173270/raw/98c107ebd28608da55d84d13b3aa6eaf25b3c854/p2.png] profile.

The picture becomes a bit clearer when we look at the locks, here's the [report|https://gist.githubusercontent.com/ravwojdyla/e468bace2bc899f86348dee067173270/raw/c0f1fb78ac9d5f90a3106b4b43a3a7b27700f66a/locks.png]. We see that the threads were blocked on locks for a total of 20.5 hours, mostly/specifically on the global {{org.apache.parquet.hadoop.MemoryManager}}, which has two synchronized methods: {{addWriter}} and {{removeWriter}}. From the [parquet-mr GH src|https://github.com/apache/parquet-mr/blob/9d80330ae4948787ac0bf4e4b0d990917f106440/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java#L77-L98]:
{code:java}
  /**
   * Add a new writer and its memory allocation to the memory manager.
   * @param writer the new created writer
   * @param allocation the requested buffer size
   */
  synchronized void addWriter(InternalParquetRecordWriter<?> writer, Long allocation) {
    Long oldValue = writerList.get(writer);
    if (oldValue == null) {
      writerList.put(writer, allocation);
    } else {
      throw new IllegalArgumentException("[BUG] The Parquet Memory Manager should not add an " +
          "instance of InternalParquetRecordWriter more than once. The Manager already contains " +
          "the writer: " + writer);
    }
    updateAllocation();
  }

  /**
   * Remove the given writer from the memory manager.
   * @param writer the writer that has been closed
   */
  synchronized void removeWriter(InternalParquetRecordWriter<?> writer) {
    writerList.remove(writer);
    if (!writerList.isEmpty()) {
      updateAllocation();
    }
  }
{code}
During the 10-minute profiling sessions all worker threads were mostly waiting on this lock. The implementation of the writer comes from the [Spark src|https://github.com/apache/spark/blob/0ed48feab65f2d86f5dda3e16bd53f2f795f5bc5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala#L339]:
{code:java}
/**
 * Dynamic partition writer with single writer, meaning only one writer is opened at any time for
 * writing. The records to be written are required to be sorted on partition and/or bucket
 * column(s) before writing.
 */
class DynamicPartitionDataSingleWriter(
{code}
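To make the interaction concrete, here is a small, self-contained toy model (illustrative only, not the actual Spark or parquet-mr code; {{ToyMemoryManager}}, {{task}} and the method names are made up): each task keeps a single writer open at a time and reopens it at every partition boundary, and every open/close goes through one process-wide manager whose add/remove methods share a single lock and whose reallocation step scans all registered writers while holding it.
{code:python}
# Toy model of the bottleneck (illustrative only): one process-wide manager,
# one lock, and a reallocation step that touches every registered writer.
import threading


class ToyMemoryManager:
    def __init__(self):
        self._lock = threading.Lock()
        self._writers = {}

    def add_writer(self, writer, allocation):
        with self._lock:
            self._writers[writer] = allocation
            self._update_allocation()  # O(#writers) while holding the lock

    def remove_writer(self, writer):
        with self._lock:
            self._writers.pop(writer, None)
            if self._writers:
                self._update_allocation()  # again under the same lock

    def _update_allocation(self):
        # stand-in for rescaling every open writer's buffer allocation
        for w in self._writers:
            self._writers[w] = max(1, self._writers[w] // 2)


manager = ToyMemoryManager()


def task(n_partitions=28):
    # Mimics the single-writer behaviour over sorted input: one writer open
    # at a time, closed and reopened at each partition boundary, so every
    # boundary funnels through the shared manager lock.
    for _ in range(n_partitions):
        writer = object()
        manager.add_writer(writer, 128 << 20)
        # ... write one partition's rows ...
        manager.remove_writer(writer)


# 200 tasks x 28 partitions ~ the 5.6K writer open/close events described
# above (in the real job only as many tasks run concurrently as there are cores).
threads = [threading.Thread(target=task) for _ in range(200)]
for t in threads:
    t.start()
for t in threads:
    t.join()
{code}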
It appears that the combination of the large number of writers created via Spark's {{DynamicPartitionDataSingleWriter}} and the {{MemoryManager}} synchronization bottleneck drastically reduces performance by starving the writer threads. We have validated this: with the partitioning removed, the job is much faster and fully utilizes the VM.
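For reference, the validation mentioned above amounts to the same write with the partitioning dropped (a sketch, reusing {{df}} and {{path}} from the snippet at the top of the description):
{code:python}
# Same write as above but without partitionBy: each task keeps one writer
# open for its whole output, so far fewer writers are registered with the
# global MemoryManager.
df.write.parquet(
    path=path,
    compression="snappy",
    mode="overwrite",
)
{code}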