[ 
https://issues.apache.org/jira/browse/SPARK-45903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liang.feng updated SPARK-45903:
-------------------------------
    Description: 
 
Problem description:
 I am using Spark to generate HFiles. An OOM occurs in the 
repartitionAndSortWithinPartitions stage, and the input data is the result of 
a SQL query. When I swap column b and column c, the OOM no longer occurs.

 

This SQL causes the OOM:
{code:java}
-- columns "a" and "b" are LongType; column "c" is a JSON string
select a, b, c from table{code}
This SQL does not cause the OOM:
{code:java}
-- columns "a" and "b" are LongType; column "c" is a JSON string
select a, c, b from table{code}
 

The code logic is as follows:
{code:java}
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.util.Bytes

  val family = Bytes.toBytes("cf")
  // columns "a" and "b" are LongType; column "c" is a JSON string
  val sql = "select a, b, c from table"
  // .rdd is needed here: repartitionAndSortWithinPartitions is an RDD API,
  // not available on Dataset
  spark.sql(sql).rdd.flatMap(row => {
    val buffer = new ArrayBuffer[(KeyFamilyQualifier, ImmutableBytesWritable)]
    val v1 = row.getLong(0)
    val v2 = row.getLong(1)
    val v3 = row.getString(2)

    val rowkey = Bytes.toBytes(v1)

    // one (key, value) pair per output column, all sharing the same rowkey
    val kfq1 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v1"))
    buffer += Tuple2(kfq1, new ImmutableBytesWritable(Bytes.toBytes(v1)))

    val kfq2 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v2"))
    buffer += Tuple2(kfq2, new ImmutableBytesWritable(Bytes.toBytes(v2)))

    val kfq3 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v3"))
    buffer += Tuple2(kfq3, new ImmutableBytesWritable(Bytes.toBytes(v3)))

    buffer.toArray
  })
  .repartitionAndSortWithinPartitions(new BulkLoadPartitioner(startKeys))
  .map(tuple => {
    val kfq = tuple._1
    val value = tuple._2

    val rowkey = new ImmutableBytesWritable(kfq.getRowKey)
    val keyValue = new KeyValue(kfq.getRowKey(), kfq.getFamily(),
      kfq.getQualifier(), value.get())
    (rowkey, keyValue)
  })
  .saveAsNewAPIHadoopFile(
    tmpPath,
    classOf[ImmutableBytesWritable],
    classOf[KeyValue],
    classOf[HFileOutputFormat2],
    job.getConfiguration()
  )
{code}
 

Now, even with a small amount of data, this program still causes an OOM during 
the shuffle stage. If I swap the positions of fields b and c in the SQL 
statement, no OOM occurs; the new SQL is "select a, c, b from table".

 

I used MAT (Eclipse Memory Analyzer) to analyze the JVM heap dump of an executor; the result is:

!image-2023-11-13-15-46-16-868.png!

The heap is filled with PartitionedPairBuffer$$data:

!image-2023-11-13-15-47-01-764.png!
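To make the report easier to reproduce without HBase on the classpath, here is a hedged, HBase-free sketch of the same job shape: (Long, Long, JSON string) rows exploded into three key/value byte-array pairs per row and shuffled with repartitionAndSortWithinPartitions. The `Key` case class, the modulo partitioner, and the generated data are placeholders of my own, not the real `KeyFamilyQualifier`/`BulkLoadPartitioner`; an active SparkSession named `spark` is assumed.

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.Partitioner

// Placeholder for KeyFamilyQualifier: rowkey plus qualifier, with an
// ordering so repartitionAndSortWithinPartitions can sort within partitions.
case class Key(rowkey: Long, qualifier: String) extends Ordered[Key] {
  def compare(that: Key): Int = {
    val c = rowkey.compareTo(that.rowkey)
    if (c != 0) c else qualifier.compareTo(that.qualifier)
  }
}

// Placeholder for BulkLoadPartitioner: simple modulo on the rowkey.
val partitioner = new Partitioner {
  def numPartitions: Int = 8
  def getPartition(key: Any): Int =
    (key.asInstanceOf[Key].rowkey % numPartitions).toInt
}

// Rows of the same shape as the failing query: two longs and a JSON string.
val df = spark.range(0, 100000L)
  .selectExpr("id as a", "id * 2 as b", "concat('{\"x\":', id, '}') as c")

df.rdd.flatMap { row =>
  val a = row.getLong(0)
  val b = row.getLong(1)
  val c = row.getString(2)
  // One pair per column, mirroring the three KeyFamilyQualifier entries.
  Seq(
    (Key(a, "v1"), a.toString.getBytes(StandardCharsets.UTF_8)),
    (Key(a, "v2"), b.toString.getBytes(StandardCharsets.UTF_8)),
    (Key(a, "v3"), c.getBytes(StandardCharsets.UTF_8))
  )
}
.repartitionAndSortWithinPartitions(partitioner)
.count()
```

Swapping b and c in the selectExpr reproduces the column-order difference described above; watching executor memory during the shuffle stage should show whether the PartitionedPairBuffer growth depends on the order.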


> Different column orders lead to OOM
> -----------------------------------
>
>                 Key: SPARK-45903
>                 URL: https://issues.apache.org/jira/browse/SPARK-45903
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 2.3.2, 2.4.1, 3.0.1
>            Reporter: liang.feng
>            Priority: Major
>         Attachments: image-2023-11-13-15-46-16-868.png, 
> image-2023-11-13-15-47-01-764.png
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
