[ https://issues.apache.org/jira/browse/SPARK-45903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liang.feng updated SPARK-45903:
-------------------------------
    Attachment: image-2023-11-13-15-46-16-868.png

> Different column orders lead to OOM
> -----------------------------------
>
>                 Key: SPARK-45903
>                 URL: https://issues.apache.org/jira/browse/SPARK-45903
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Core, SQL
>    Affects Versions: 2.3.2, 2.4.1, 3.0.1
>            Reporter: liang.feng
>            Priority: Major
>         Attachments: image-2023-11-13-15-46-16-868.png
>
>
>  
> Problem description:
> I am using Spark to generate HFiles. An OOM occurs in the repartitionAndSortWithinPartitions stage; the input to that stage is the result of a SQL query. The logic is as follows:
> {code:java}
>   // Imports assumed by this snippet; KeyFamilyQualifier and BulkLoadPartitioner
>   // are our HBase bulk-load helper classes (their imports are omitted here).
>   import org.apache.hadoop.hbase.KeyValue
>   import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>   import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
>   import org.apache.hadoop.hbase.util.Bytes
>   import scala.collection.mutable.ArrayBuffer
>
>   val family = Bytes.toBytes("cf")
>   // columns "a" and "b" are LongType, column "c" is a JSON string
>   val sql = "select a, b, c from table"
>   // .rdd so that repartitionAndSortWithinPartitions (a pair-RDD operation) is available
>   spark.sql(sql).rdd.flatMap(row => {
>     val buffer = new ArrayBuffer[(KeyFamilyQualifier, ImmutableBytesWritable)]
>     val v1 = row.getLong(0)
>     val v2 = row.getLong(1)
>     val v3 = row.getString(2)
>     val rowkey = Bytes.toBytes(v1)
>
>     // one (rowkey, family, qualifier) -> value pair per selected column
>     val kfq1 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v1"))
>     buffer += Tuple2(kfq1, new ImmutableBytesWritable(Bytes.toBytes(v1)))
>
>     val kfq2 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v2"))
>     buffer += Tuple2(kfq2, new ImmutableBytesWritable(Bytes.toBytes(v2)))
>
>     val kfq3 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v3"))
>     buffer += Tuple2(kfq3, new ImmutableBytesWritable(Bytes.toBytes(v3)))
>     buffer.toArray
>   })
>   // the OOM occurs in this stage
>   .repartitionAndSortWithinPartitions(new BulkLoadPartitioner(startKeys))
>   .map(tuple => {
>     val kfq = tuple._1
>     val value = tuple._2
>
>     val rowkey = new ImmutableBytesWritable(kfq.getRowKey)
>     val keyValue = new KeyValue(kfq.getRowKey(), kfq.getFamily(), kfq.getQualifier(), value.get())
>     (rowkey, keyValue)
>   })
>   .saveAsNewAPIHadoopFile(
>     tmpPath,
>     classOf[ImmutableBytesWritable],
>     classOf[KeyValue],
>     classOf[HFileOutputFormat2],
>     job.getConfiguration()
>   )
> {code}
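>
> To illustrate the title: the comparison is between runs of the same job with the columns selected in a different order (and the getter indices adjusted to match). A minimal, hypothetical sketch of such a reordered variant:
> {code:java}
> // Hypothetical reordered query: same columns, different SELECT order.
> // Only the SQL text and the row getter indices change; the rest of the
> // pipeline above stays the same.
> val sqlReordered = "select c, a, b from table"
> spark.sql(sqlReordered).rdd.flatMap(row => {
>   val v3 = row.getString(0) // column "c" now comes first
>   val v1 = row.getLong(1)   // column "a"
>   val v2 = row.getLong(2)   // column "b"
>   // ... build the same (KeyFamilyQualifier, ImmutableBytesWritable) pairs as above
>   ???
> })
> {code}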
>  


