[ 
https://issues.apache.org/jira/browse/SPARK-40622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-40622:
-----------------------------
    Description: 
When collecting results, the data from a single partition/task is serialized 
into a byte array or a ByteBuffer (which is itself backed by a byte array), so 
it is subject to the Java array size limit (for a byte array, about 2GB).

 

Constructing a single partition larger than 2GB and collecting it easily 
reproduces the issue:
{code:java}
// create ~3GB of data in a single partition, which exceeds the byte array limit
// randomly generated so that it compresses poorly
val df = spark.range(0, 3000, 1, 1)
  .selectExpr("id", "genData(id, 1000000) as data")

withSQLConf("spark.databricks.driver.localMaxResultSize" -> "4g") {
  withSQLConf("spark.sql.useChunkedBuffer" -> "true") {
    df.queryExecution.executedPlan.executeCollect()
  }
}
{code}
This fails with an OOM error thrown from 
[https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/java/io/ByteArrayOutputStream.java#L125]
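 

As a rough check of the sizes involved, the sketch below compares the payload of 
the repro with the largest length a JVM array can have. It assumes that genData 
(a helper not shown in this report) produces about 1,000,000 bytes per row; the 
object and value names are illustrative only.
{code:scala}
object ResultSizeCheck {
  // Assumption: genData(id, 1000000) yields roughly 1,000,000 bytes per row.
  val rows: Long = 3000L
  val bytesPerRow: Long = 1000000L

  // Total payload of the single partition, computed in Long to avoid Int overflow.
  val totalBytes: Long = rows * bytesPerRow

  // JVM arrays are indexed by Int, so a byte array holds at most Int.MaxValue
  // bytes (the practical cap is slightly lower).
  val maxArrayBytes: Long = Int.MaxValue.toLong

  def main(args: Array[String]): Unit = {
    println(s"payload = $totalBytes bytes, array limit = $maxArrayBytes bytes")
    println(s"fits in one byte array: ${totalBytes <= maxArrayBytes}")
  }
}
{code}
Under that assumption the uncompressed payload is larger than any single byte 
array can be, which is consistent with the failure above.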

 

Consider using ChunkedByteBuffer in place of the byte array in order to bypass 
this limit.
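 

To illustrate the general idea of chunking, here is a minimal standalone sketch 
of an output stream that spreads its bytes over many fixed-size arrays. It is 
not Spark's ChunkedByteBuffer and not the proposed fix; the class name 
ChunkedOutputSketch and its methods are hypothetical.
{code:scala}
import java.io.OutputStream
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: stores bytes in many small arrays instead of one growing
// array, so no single array has to approach the JVM array size limit.
final class ChunkedOutputSketch(chunkSize: Int) extends OutputStream {
  require(chunkSize > 0, "chunkSize must be positive")

  private val chunks = ArrayBuffer.empty[Array[Byte]]
  private var posInLast = chunkSize // equals chunkSize so the first write allocates
  private var total = 0L            // Long, so the total may exceed 2GB

  // Allocate a fresh chunk when the current one is full.
  private def ensureRoom(): Unit =
    if (posInLast == chunkSize) {
      chunks += new Array[Byte](chunkSize)
      posInLast = 0
    }

  override def write(b: Int): Unit = {
    ensureRoom()
    chunks.last(posInLast) = b.toByte
    posInLast += 1
    total += 1
  }

  override def write(src: Array[Byte], off: Int, len: Int): Unit = {
    var written = 0
    while (written < len) {
      ensureRoom()
      // Copy as much as fits into the current chunk.
      val n = math.min(len - written, chunkSize - posInLast)
      System.arraycopy(src, off + written, chunks.last, posInLast, n)
      posInLast += n
      written += n
      total += n
    }
  }

  def size: Long = total
  def chunkCount: Int = chunks.length

  // Copies of the stored chunks; the last one is trimmed to the bytes written.
  def toChunks: Seq[Array[Byte]] =
    chunks.zipWithIndex.map { case (c, i) =>
      if (i == chunks.length - 1) java.util.Arrays.copyOf(c, posInLast) else c.clone()
    }.toSeq
}
{code}
The total size is tracked as a Long and each chunk stays small, so the stream as 
a whole is bounded by available memory rather than by the length of one array.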

  was:
When collecting results, the data from a single partition/task is serialized 
into a byte array or a ByteBuffer (which is itself backed by a byte array), so 
it is subject to the Java array size limit (for a byte array, about 2GB).

 

Constructing a single partition larger than 2GB and collecting it easily 
reproduces the issue:
{code:java}
val df = spark.range(0, 3000, 1, 1)
  .selectExpr("id", "genData(id, 1000000) as data")

withSQLConf("spark.databricks.driver.localMaxResultSize" -> "4g") {
  withSQLConf("spark.sql.useChunkedBuffer" -> "true") {
    df.queryExecution.executedPlan.executeCollect()
  }
} {code}
This fails with an OOM error thrown from 
[https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/java/io/ByteArrayOutputStream.java#L125]

 

Consider using ChunkedByteBuffer in place of the byte array in order to bypass 
this limit.


> Result of a single task in collect() must fit in 2GB
> ----------------------------------------------------
>
>                 Key: SPARK-40622
>                 URL: https://issues.apache.org/jira/browse/SPARK-40622
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 3.3.0
>            Reporter: Ziqi Liu
>            Priority: Major


