[ https://issues.apache.org/jira/browse/SPARK-40038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17578169#comment-17578169 ]
RJ Marcus commented on SPARK-40038:
-----------------------------------

The screenshot showing spillage was captured before the stage had fully completed, which is why the total data input/output may not look correct (it should be 230 GB).

> spark.sql.files.maxPartitionBytes does not observe on-disk compression
> ----------------------------------------------------------------------
>
>                 Key: SPARK-40038
>                 URL: https://issues.apache.org/jira/browse/SPARK-40038
>             Project: Spark
>          Issue Type: Question
>          Components: Input/Output, Optimizer, PySpark, SQL
>    Affects Versions: 3.2.0
>        Environment: files:
> - ORC with snappy compression
> - 232 GB of files on disk
> - 1800 files on disk (pretty sure no individual file is over 200 MB)
> - 9 partitions on disk
> cluster:
> - EMR 6.6.0 (Spark 3.2.0)
> - 288 vCPU (executors), 1.1 TB memory (executors)
> OS info:
> LSB Version: :core-4.1-amd64:core-4.1-noarch:cxx-4.1-amd64:cxx-4.1-noarch:desktop-4.1-amd64:desktop-4.1-noarch:languages-4.1-amd64:languages-4.1-noarch:printing-4.1-amd64:printing-4.1-noarch
> Distributor ID: Amazon
> Description: Amazon Linux release 2 (Karoo)
> Release: 2
> Codename: Karoo
>            Reporter: RJ Marcus
>            Priority: Major
>        Attachments: Screenshot from 2022-08-10 16-50-37.png, Screenshot from 2022-08-10 16-59-56.png
>
>
> Why does `spark.sql.files.maxPartitionBytes` estimate the number of partitions based on the _file size on disk_ instead of the _uncompressed file size_?
> For example, I have a dataset that is 213 GB on disk. When I read it into my application I get 2050 partitions, based on the default value of 128 MB for maxPartitionBytes. My application is a simple broadcast index join that adds one column to the dataframe and writes it out. There is no shuffle.
> Initially the sizes of input/output records look ok, but I still get a large amount of memory "spill" on the executors. I believe this is because the data is highly compressed and each partition becomes too big once it is deserialized into memory.
> !image-2022-08-10-16-59-05-233.png!
> (If I repartition immediately after reading, the first stage still spills memory to disk, so that is not the right solution or what I'm interested in.)
> Instead, I lower maxPartitionBytes by the (average) compression ratio of my files (about 7x, rounded up to 8), so I set maxPartitionBytes=16MB. Spark then reads the files in 12-28 MB chunks, creates 14316 partitions on the initial file read, and completes with no spillage. (A sketch of this workaround appears at the end of this message.)
> !image-2022-08-10-16-59-59-778.png!
>
> Is there something I'm missing here? Is this just intended behavior? How can I tune my partition size correctly for my application when I do not know ahead of time how much the data will be compressed?
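A minimal PySpark sketch of the workaround described in the issue: scale spark.sql.files.maxPartitionBytes down by an assumed on-disk compression ratio so each input split stays near the desired size once deserialized. The input path and the 8x ratio are illustrative assumptions, not values taken from the reporter's actual job.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("maxPartitionBytes-demo").getOrCreate()

target_in_memory_bytes = 128 * 1024 * 1024   # desired per-partition size after decompression
assumed_compression_ratio = 8                # rough ORC + snappy estimate (~7x, rounded up); an assumption

# 128 MB / 8 = 16 MB of on-disk data per input split.
split_bytes = target_in_memory_bytes // assumed_compression_ratio
spark.conf.set("spark.sql.files.maxPartitionBytes", str(split_bytes))

df = spark.read.orc("s3://my-bucket/my-orc-dataset/")  # hypothetical path
print(df.rdd.getNumPartitions())  # expect many more, smaller partitions than with the 128 MB default
```

Note that this is per-job tuning: the compression ratio still has to be estimated ahead of time, which is exactly the difficulty the question raises.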