[ https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584943#comment-16584943 ]
Liang-Chi Hsieh commented on SPARK-24961:
-----------------------------------------

I saw you described {{Spark 2.3.1 in local mode}} in {{Environment}}. I think this happens because once you use a {{StorageLevel}} that includes memory, the cached data occupies part of the single local heap. When it runs the sort, Spark samples data from the RDD using {{collect}}, and that causes the OOM. Can you try decreasing {{spark.sql.execution.rangeExchange.sampleSizePerPartition}} or decreasing the number of partitions?

> sort operation causes out of memory
> ------------------------------------
>
>                 Key: SPARK-24961
>                 URL: https://issues.apache.org/jira/browse/SPARK-24961
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 2.3.1
>         Environment: Java 1.8u144+
> Windows 10
> Spark 2.3.1 in local mode
> -Xms4g -Xmx4g
> optional: -XX:+UseParallelOldGC
>            Reporter: Markus Breuer
>            Priority: Major
>
> A sort operation on a large RDD that does not fit in memory causes an out-of-memory exception. I made the effect reproducible with a sample that creates large objects of about 2 MB each. The OOM occurs when saving the result. I tried several StorageLevels, but whenever memory is included (MEMORY_AND_DISK, MEMORY_AND_DISK_SER, or no persist at all), the application runs out of memory. Only DISK_ONLY seems to work.
> When replacing sort() with sortWithinPartitions(), no StorageLevel is required and the application succeeds.
> {code:java}
> package de.bytefusion.examples;
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.UUID;
>
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.SequenceFileOutputFormat;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
>
> import static org.apache.spark.sql.functions.*;
>
> public class Example3 {
>
>     public static void main(String... args) {
>         // create spark session
>         SparkSession spark = SparkSession.builder()
>                 .appName("example1")
>                 .master("local[4]")
>                 .config("spark.driver.maxResultSize", "1g")
>                 // no effect here: the driver JVM (heap set by -Xmx4g) is already running
>                 .config("spark.driver.memory", "512m")
>                 // no effect either: in local mode the executors run inside the driver JVM
>                 .config("spark.executor.memory", "512m")
>                 .config("spark.local.dir", "d:/temp/spark-tmp")
>                 .getOrCreate();
>         JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
>
>         // base to generate huge data: the integers 1..9999
>         List<Integer> list = new ArrayList<>();
>         for (int val = 1; val < 10000; val++) {
>             list.add(val);
>         }
>
>         // create simple rdd of int, spread over 200 partitions
>         JavaRDD<Integer> rdd = sc.parallelize(list, 200);
>
>         // use map to create a large object (about 2 MB of text) per row
>         JavaRDD<Row> rowRDD = rdd
>                 .map(value -> RowFactory.create(String.valueOf(value),
>                         createLongText(UUID.randomUUID().toString(), 2 * 1024 * 1024)))
>                 // no persist => out of memory exception on write()
>                 // persist MEMORY_AND_DISK => out of memory exception on write()
>                 // persist MEMORY_AND_DISK_SER => out of memory exception on write()
>                 // .persist(StorageLevel.DISK_ONLY()) => works
>                 ;
>
>         StructType type = new StructType()
>                 .add("c1", DataTypes.StringType)
>                 .add("c2", DataTypes.StringType);
>
>         Dataset<Row> df = spark.createDataFrame(rowRDD, type);
>
>         // works
>         df.show();
>
>         df = df.sort(col("c1").asc());
>         df.explain();
>
>         // takes a lot of time but works
>         df.show();
>
>         // OutOfMemoryError: java heap space
>         df.write()
>                 .mode("overwrite")
>                 .csv("d:/temp/my.csv");
>
>         // OutOfMemoryError: java heap space
>         df.toJavaRDD()
>                 .mapToPair(row -> new Tuple2<>(new Text(row.getString(0)), new Text(row.getString(1))))
>                 .saveAsHadoopFile("d:\\temp\\foo", Text.class, Text.class,
>                         SequenceFileOutputFormat.class);
>     }
>
>     private static String createLongText(String text, int minLength) {
>         // repeat text until the result is at least minLength characters long
>         StringBuilder sb = new StringBuilder();
>         while (sb.length() < minLength) {
>             sb.append(text);
>         }
>         return sb.toString();
>     }
> }
> {code}
> When using StorageLevel.MEMORY_AND_DISK(_SER), an OOM crashes the application at partition 70, at a heap usage of 3g out of the 4g available.
> It seems sort does something like a collect: a heap dump shows a very large array of arrays, possibly the partition contents. spark.driver.maxResultSize is also involved, because the sort exceeds its default value; setting it to unlimited causes an OOM.
> Why do I think this is a bug?
> # Operation sort() should not involve maxResultSize.
> # MEMORY_AND_DISK should work, and at least the disk should be used. But I see an OOM when reaching 3g of the 4g total heap size.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
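A rough estimate of why the sampling step described in the comment can exhaust a 4g heap and trip maxResultSize. This is a sketch under stated assumptions, not Spark's code: it assumes the RangePartitioner sizing rule from the Spark 2.3 sources (sample size = min(hint x output partitions, 1e6), over-sampled 3x per input partition and capped by the rows a partition actually holds), the default spark.sql.shuffle.partitions of 200 as the output partition count, and that every sampled record carries the full ~2 MB c2 string (consistent with the "array of arrays" in the heap dump, but still an assumption). SampleEstimate, longTextLength and sampledRows are hypothetical names; the estimate ignores c1, object headers and serialization overhead.

```java
// Hypothetical back-of-envelope helper, NOT Spark code. Assumes the Spark 2.3
// RangePartitioner sizing rule:
//   sampleSize        = min(hint * outputPartitions, 1e6)
//   perPartitionLimit = ceil(3 * sampleSize / inputPartitions)
// and (assumption) that each sampled record holds the whole row, including c2.
class SampleEstimate {

    /** Length of the string built by the reporter's createLongText(text, minLength). */
    static long longTextLength(int unitLength, int minLength) {
        // the loop appends whole copies of the unit until the length reaches minLength
        return ((long) minLength + unitLength - 1) / unitLength * unitLength;
    }

    /** Rows the sampling job returns to the driver, summed over all input partitions. */
    static long sampledRows(int hint, int outputPartitions, int inputPartitions, long totalRows) {
        double sampleSize = Math.min((double) hint * outputPartitions, 1e6);
        long perPartitionLimit = (long) Math.ceil(3.0 * sampleSize / inputPartitions);
        long total = 0;
        for (int p = 0; p < inputPartitions; p++) {
            // parallelize() slices a list into nearly equal contiguous ranges
            long rowsInPartition = (p + 1) * totalRows / inputPartitions - p * totalRows / inputPartitions;
            // reservoir sampling keeps at most perPartitionLimit rows of a partition
            total += Math.min(perPartitionLimit, rowsInPartition);
        }
        return total;
    }

    public static void main(String[] args) {
        // a UUID string is 36 ASCII chars, so one char is one byte in UTF-8
        long bytesPerRow = longTextLength(36, 2 * 1024 * 1024);
        long rows = 9999;           // values 1..9999 in the reproducer
        int inputPartitions = 200;  // sc.parallelize(list, 200)
        int[][] settings = { {100, 200}, {1, 200}, {1, 20} }; // {hint, output partitions}
        for (int[] s : settings) {
            long sampled = sampledRows(s[0], s[1], inputPartitions, rows);
            double gib = sampled * (double) bytesPerRow / (1L << 30);
            System.out.printf("hint=%d outputPartitions=%d -> %d rows, ~%.2f GiB%n",
                    s[0], s[1], sampled, gib);
        }
    }
}
```

With the reproducer's numbers and the default hint of 100, each input partition holds only about 50 rows, well below the per-partition limit of 300, so every row is sampled: roughly 20 GB, far above both the 1g maxResultSize and the 4g heap. Lowering the hint and the partition count shrinks the sample, which is the direction of the suggested workaround; the printed figures are estimates, not measurements.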
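The difference the reporter noticed between sort() and sortWithinPartitions() follows from what each operation has to know. Below is a toy single-JVM model of the two strategies (SortModel, sortWithinPartitions and globalSort are hypothetical names; this is not Spark's implementation): sortWithinPartitions() sorts each partition on its own, while a global sort first samples keys from every partition, derives range bounds from the sample, routes each record to its range and only then sorts locally. For brevity the model samples with replacement via java.util.Random, whereas Spark uses reservoir sampling; in Spark, the sampling step is the job whose results are collected to the driver.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Toy model (hypothetical, not Spark code) of a global range-partitioned sort
// versus sorting each partition independently.
class SortModel {

    /** Like sortWithinPartitions(): each partition is sorted alone; no data leaves it. */
    static List<List<Integer>> sortWithinPartitions(List<List<Integer>> partitions) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> p : partitions) {
            List<Integer> copy = new ArrayList<>(p);
            Collections.sort(copy);
            out.add(copy);
        }
        return out;
    }

    /** Like sort(): sample, choose range bounds, route records by range, sort locally. */
    static List<List<Integer>> globalSort(List<List<Integer>> partitions, int outputPartitions,
                                          int samplePerPartition, long seed) {
        if (outputPartitions < 1) throw new IllegalArgumentException("outputPartitions < 1");
        Random rnd = new Random(seed);
        // Step 1: sample keys from every partition (in Spark, collected to the driver).
        List<Integer> sample = new ArrayList<>();
        for (List<Integer> p : partitions) {
            for (int i = 0; i < samplePerPartition && !p.isEmpty(); i++) {
                sample.add(p.get(rnd.nextInt(p.size())));
            }
        }
        Collections.sort(sample);
        // Step 2: pick outputPartitions - 1 ascending upper bounds from the sorted sample.
        int numBounds = sample.isEmpty() ? 0 : outputPartitions - 1;
        int[] bounds = new int[numBounds];
        for (int b = 0; b < numBounds; b++) {
            bounds[b] = sample.get((int) ((long) (b + 1) * sample.size() / outputPartitions));
        }
        // Step 3: "shuffle" each record into the first range whose bound is >= the record.
        List<List<Integer>> out = new ArrayList<>();
        for (int i = 0; i < outputPartitions; i++) out.add(new ArrayList<>());
        for (List<Integer> p : partitions) {
            for (int v : p) {
                int idx = Arrays.binarySearch(bounds, v);
                out.get(idx >= 0 ? idx : -idx - 1).add(v);
            }
        }
        // Step 4: sort each range locally; concatenating the ranges gives a total order.
        for (List<Integer> p : out) Collections.sort(p);
        return out;
    }
}
```

Concatenating the output partitions of globalSort yields one sorted list, whereas concatenating the result of sortWithinPartitions generally does not; that total order is what the extra sampling pass pays for.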