[ 
https://issues.apache.org/jira/browse/SPARK-45903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liang.feng updated SPARK-45903:
-------------------------------
    Issue Type: Bug  (was: Question)

> Different column orders lead to OOM
> -----------------------------------
>
>                 Key: SPARK-45903
>                 URL: https://issues.apache.org/jira/browse/SPARK-45903
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 2.3.2, 2.4.1, 3.0.1
>            Reporter: liang.feng
>            Priority: Major
>         Attachments: image-2023-11-13-15-46-16-868.png, 
> image-2023-11-13-15-47-01-764.png
>
>
>  
> Problem description:
>  I am using Spark to generate HFiles. An OOM occurs in the 
> repartitionAndSortWithinPartitions stage; the input data is the result of a 
> SQL query. When I swap columns b and c in the query, the OOM does not occur. 
> The logic is as follows:
> {code:java}
>   val family = Bytes.toBytes("cf")  
>   // columns "a" and "b" are LongType; column "c" is a JSON string
>   val sql = "select a, b , c from table "
>   spark.sql(sql).rdd.flatMap(row => {
>     val buffer =
>       new ArrayBuffer[Tuple2[KeyFamilyQualifier, ImmutableBytesWritable]]
>     val v1 = row.getLong(0)
>     val v2 = row.getLong(1)
>     val v3 = row.getString(2)    
>     val rowkey = Bytes.toBytes(v1)
>     
>     val kfq1 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v1"))
>     buffer += Tuple2(kfq1, new ImmutableBytesWritable(Bytes.toBytes(v1)))  
>    
>     val kfq2 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v2"))
>     buffer += Tuple2(kfq2, new ImmutableBytesWritable(Bytes.toBytes(v2)))    
>     val kfq3 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v3"))
>     buffer += Tuple2(kfq3, new ImmutableBytesWritable(Bytes.toBytes(v3)))    
>     buffer.toArray
>   })
>   .repartitionAndSortWithinPartitions(new BulkLoadPartitioner(startKeys))
>   .map(tuple => {
>     val kfq = tuple._1
>     val value = tuple._2
>     
>     val rowkey = new ImmutableBytesWritable(kfq.getRowKey)
>     val keyValue = new KeyValue(kfq.getRowKey(), kfq.getFamily(),
>       kfq.getQualifier(), value.get())
>     (rowkey, keyValue)
>   })
>   .saveAsNewAPIHadoopFile(
>     tmpPath, 
>     classOf[ImmutableBytesWritable],
>     classOf[KeyValue],
>     classOf[HFileOutputFormat2],
>     job.getConfiguration()
>   ) 
> {code}
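>
> For readers unfamiliar with the stage that fails, the sketch below is a toy 
> model, in plain Scala collections, of what repartitionAndSortWithinPartitions 
> is meant to compute: each pair goes to the partition chosen by the 
> partitioner, and each partition is then sorted by key. The names 
> PartitionSortSketch, partitionFor and repartitionAndSort are hypothetical, 
> Long keys stand in for KeyFamilyQualifier, and partitionFor only approximates 
> a start-key based partitioner such as BulkLoadPartitioner. It says nothing 
> about how Spark buffers or spills data during the shuffle.
> {code:scala}
> object PartitionSortSketch {
>   // Hypothetical range partitioner: index of the last start key <= key
>   // (partition 0 if the key is smaller than every start key).
>   def partitionFor(startKeys: Seq[Long])(key: Long): Int =
>     math.max(0, startKeys.lastIndexWhere(_ <= key))
>
>   // Group pairs by target partition, then sort each group by key.
>   def repartitionAndSort[V](
>       pairs: Seq[(Long, V)],
>       startKeys: Seq[Long]): Map[Int, Seq[(Long, V)]] =
>     pairs
>       .groupBy { case (k, _) => partitionFor(startKeys)(k) }
>       .map { case (p, kvs) => p -> kvs.sortBy(_._1) }
> }
> {code}
> With start keys 0 and 10, for example, keys 1 and 3 land in partition 0 and 
> keys 12 and 15 in partition 1, each partition ordered by key.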
>  
> Even with a small amount of data, this program runs out of memory during 
> the shuffle stage. If I swap the positions of columns b and c in the SQL 
> statement, the OOM does not occur. The new SQL is "select a, c, b from 
> table".
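>
> Both queries should yield the same cells, which is what makes the difference 
> in memory behavior surprising. A minimal sketch of that expectation follows. 
> ToyRow and Cell are hypothetical stand-ins, not Spark or HBase classes, and 
> columns are read by name so that the expansion does not depend on column 
> order.
> {code:scala}
> object ColumnOrderSketch {
>   // Hypothetical stand-in for a Spark Row: column names plus positional values.
>   final case class ToyRow(columns: Seq[String], values: Seq[Any]) {
>     def getAs[T](name: String): T =
>       values(columns.indexOf(name)).asInstanceOf[T]
>   }
>
>   // Hypothetical stand-in for one (KeyFamilyQualifier, value) pair.
>   final case class Cell(rowKey: Long, qualifier: String, value: String)
>
>   // Same shape as the flatMap in the report: one input row becomes three cells.
>   def toCells(row: ToyRow): Seq[Cell] = {
>     val a = row.getAs[Long]("a")
>     val b = row.getAs[Long]("b")
>     val c = row.getAs[String]("c")
>     Seq(Cell(a, "v1", a.toString), Cell(a, "v2", b.toString), Cell(a, "v3", c))
>   }
>
>   // The same logical row under both column orders.
>   val abc = ToyRow(Seq("a", "b", "c"), Seq(1L, 2L, """{"k":"v"}"""))
>   val acb = ToyRow(Seq("a", "c", "b"), Seq(1L, """{"k":"v"}""", 2L))
> }
> {code}
> In Spark itself, Row.getAs[T](fieldName) provides a comparable by-name 
> lookup when the row carries a schema.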
>  
> I used MAT (Eclipse Memory Analyzer) to analyze the heap dump of an 
> executor. The result is:
> !image-2023-11-13-15-46-16-868.png!
> The heap is filled with PartitionedPairBuffer$$data:
> !image-2023-11-13-15-47-01-764.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
