[ https://issues.apache.org/jira/browse/SPARK-45903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
liang.feng updated SPARK-45903:
-------------------------------
Description:

Problem description:

I am using Spark to generate HFiles. An OOM occurs in the repartitionAndSortWithinPartitions stage; the input data is the result of a SQL query. When I swap column b and column c, the OOM does not occur.

This SQL causes the OOM:
{code:sql}
-- columns "a" and "b" are LongType, column "c" is a JSON string
select a, b, c from table{code}

This SQL does not cause the OOM:
{code:sql}
-- columns "a" and "b" are LongType, column "c" is a JSON string
select a, c, b from table{code}

The code logic is as follows:
{code:scala}
val family = Bytes.toBytes("cf")

// columns "a" and "b" are LongType, column "c" is a JSON string
val sql = "select a, b, c from table"

spark.sql(sql).flatMap(row => {
    val buffer = new ArrayBuffer[Tuple2[KeyFamilyQualifier, ImmutableBytesWritable]]
    val v1 = row.getLong(0)
    val v2 = row.getLong(1)
    val v3 = row.getString(2)
    val rowkey = Bytes.toBytes(v1)

    val kfq1 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v1"))
    buffer += Tuple2(kfq1, new ImmutableBytesWritable(Bytes.toBytes(v1)))

    val kfq2 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v2"))
    buffer += Tuple2(kfq2, new ImmutableBytesWritable(Bytes.toBytes(v2)))

    val kfq3 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v3"))
    buffer += Tuple2(kfq3, new ImmutableBytesWritable(Bytes.toBytes(v3)))

    buffer.toArray
  })
  .repartitionAndSortWithinPartitions(new BulkLoadPartitioner(startKeys))
  .map(tuple => {
    val kfq = tuple._1
    val value = tuple._2

    val rowkey = new ImmutableBytesWritable(kfq.getRowKey)
    val keyValue = new KeyValue(kfq.getRowKey(), kfq.getFamily(), kfq.getQualifier(), value.get())
    (rowkey, keyValue)
  })
  .saveAsNewAPIHadoopFile(
    tmpPath,
    classOf[ImmutableBytesWritable],
    classOf[KeyValue],
    classOf[HFileOutputFormat2],
    job.getConfiguration()
  )
{code}

Now, even with a small amount of data, this program still causes an OOM during the shuffle stage. If I swap the positions of fields b and c in the SQL statement, it does not cause an OOM; the new SQL is "select a, c, b from table".

I used MAT to analyse the JVM heap dump of an executor; the result is
!image-2023-11-13-15-46-16-868.png!

The JVM heap is filled with PartitionedPairBuffer$$data:
!image-2023-11-13-15-47-01-764.png!
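For what it's worth, the per-record payload size written by this job should not depend on the projection order: HBase's Bytes.toBytes encodes a long as 8 big-endian bytes and a String as its UTF-8 bytes, so "select a, b, c" and "select a, c, b" produce the same total bytes per row. A minimal standalone sketch of that arithmetic (the helpers longBytes/stringBytes are ours, reimplementing the two Bytes.toBytes encodings so the snippet needs no hbase-common dependency):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RecordSize {
    // Same encoding as HBase Bytes.toBytes(long): 8 big-endian bytes.
    static byte[] longBytes(long v) {
        return ByteBuffer.allocate(8).putLong(v).array();
    }

    // Same encoding as HBase Bytes.toBytes(String): UTF-8 bytes.
    static byte[] stringBytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        long a = 1L, b = 2L;
        String c = "{\"k\":\"v\"}";

        // "select a, b, c": values serialized in (long, long, string) order.
        int abc = longBytes(a).length + longBytes(b).length + stringBytes(c).length;
        // "select a, c, b": same values, string column in the middle.
        int acb = longBytes(a).length + stringBytes(c).length + longBytes(b).length;

        System.out.println(abc == acb); // prints "true": size is order-independent
    }
}
```

So whatever fills PartitionedPairBuffer in the failing case, it is not simply a larger serialized payload; the column order must be changing behaviour somewhere upstream of the shuffle write.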
> Different column orders lead to OOM
> -----------------------------------
>
>                 Key: SPARK-45903
>                 URL: https://issues.apache.org/jira/browse/SPARK-45903
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 2.3.2, 2.4.1, 3.0.1
>            Reporter: liang.feng
>            Priority: Major
>        Attachments: image-2023-11-13-15-46-16-868.png, image-2023-11-13-15-47-01-764.png

--
This message was sent by Atlassian Jira
(v8.20.10#820010)