    [ https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584929#comment-16584929 ]
Markus Breuer commented on SPARK-24961:
---------------------------------------

There has been no feedback from Stack Overflow or the mailing list yet. Meanwhile I have tried several variations of the above example but always ran into OOM issues. It seems Spark always uses an operation like collect() when merging results. The size of the row objects also affects this issue: I sorted millions of smaller objects and never ran into an OOM, and a memory profiler shows very low memory usage for small objects. But larger objects - mine were 2-4 MB - cause an OOM. I understand that Spark 2.3.1 uses merge sort, which should not run into such issues. Perhaps the memory management is calibrated for smaller objects and the actual object size is ignored?

The origin of this issue was a large sequence file with 4 MB objects, which ran into an OOM. I reduced the object size by splitting each object into several rows of 4 KB and appending an incrementing number to the sort key. Now the same amount of data is sorted without memory issues.
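A minimal sketch of the splitting workaround described in the comment, assuming the two-column (key, value) layout of the reproduction quoted below; the class name SplitWorkaroundSketch, the helper splitIntoChunks and the CHUNK_SIZE constant are illustrative and not taken from the issue. Each large value is cut into 4 KB chunks, and a zero-padded chunk index is appended to the sort key so that a global sort() keeps the chunks of one object contiguous and in order.

{code:java}
// Illustrative sketch only, not from the issue: split one large row into many
// small rows whose key is the original key plus a zero-padded chunk index.
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

public class SplitWorkaroundSketch {

    private static final int CHUNK_SIZE = 4 * 1024; // ~4 KB per row instead of ~4 MB

    // Turn one (key, largeValue) pair into rows ("key-000000", chunk0), ("key-000001", chunk1), ...
    static List<Row> splitIntoChunks(String key, String largeValue) {
        List<Row> rows = new ArrayList<>();
        int index = 0;
        for (int pos = 0; pos < largeValue.length(); pos += CHUNK_SIZE) {
            int end = Math.min(pos + CHUNK_SIZE, largeValue.length());
            // zero-padded suffix keeps the chunks of one object together after sorting
            String chunkKey = String.format("%s-%06d", key, index++);
            rows.add(RowFactory.create(chunkKey, largeValue.substring(pos, end)));
        }
        return rows;
    }

    // In the reproduction below, the mapping step could then use flatMap instead of map:
    // JavaRDD<Row> smallRows = rdd.flatMap(value ->
    //         splitIntoChunks(String.valueOf(value),
    //                 createLongText(UUID.randomUUID().toString(), 2 * 1024 * 1024)).iterator());
}
{code}

The zero padding matters because the keys are compared as strings; without it, chunk 10 would sort before chunk 2.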
> sort operation causes out of memory
> ------------------------------------
>
> Key: SPARK-24961
> URL: https://issues.apache.org/jira/browse/SPARK-24961
> Project: Spark
> Issue Type: Bug
> Components: Java API
> Affects Versions: 2.3.1
> Environment: Java 1.8u144+
>              Windows 10
>              Spark 2.3.1 in local mode
>              -Xms4g -Xmx4g
>              optional: -XX:+UseParallelOldGC
> Reporter: Markus Breuer
> Priority: Major
>
> A sort operation on a large RDD - one that does not fit in memory - causes an out-of-memory exception. I made the effect reproducible with a sample that creates large objects of about 2 MB each; the OOM occurs when the result is saved. I tried several StorageLevels, but whenever memory is involved (MEMORY_AND_DISK, MEMORY_AND_DISK_SER, none) the application runs out of memory. Only DISK_ONLY seems to work.
> When sort() is replaced with sortWithinPartitions(), no StorageLevel is required and the application succeeds.
> {code:java}
> package de.bytefusion.examples;
>
> import breeze.storage.Storage;
> import de.bytefusion.utils.Options;
> import org.apache.hadoop.io.MapFile;
> import org.apache.hadoop.io.SequenceFile;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.SequenceFileOutputFormat;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
>
> import static org.apache.spark.sql.functions.*;
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.UUID;
> import java.util.stream.Collectors;
> import java.util.stream.IntStream;
>
> public class Example3 {
>
>     public static void main(String... args) {
>         // create spark session
>         SparkSession spark = SparkSession.builder()
>                 .appName("example1")
>                 .master("local[4]")
>                 .config("spark.driver.maxResultSize", "1g")
>                 .config("spark.driver.memory", "512m")
>                 .config("spark.executor.memory", "512m")
>                 .config("spark.local.dir", "d:/temp/spark-tmp")
>                 .getOrCreate();
>
>         JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
>
>         // base to generate huge data
>         List<Integer> list = new ArrayList<>();
>         for (int val = 1; val < 10000; val++) {
>             int valueOf = Integer.valueOf(val);
>             list.add(valueOf);
>         }
>
>         // create simple rdd of int
>         JavaRDD<Integer> rdd = sc.parallelize(list, 200);
>
>         // use map to create a large (~2 MB) object per row
>         JavaRDD<Row> rowRDD = rdd
>                 .map(value -> RowFactory.create(
>                         String.valueOf(value),
>                         createLongText(UUID.randomUUID().toString(), 2 * 1024 * 1024)))
>                 // no persist                  => out of memory exception on write()
>                 // persist MEMORY_AND_DISK     => out of memory exception on write()
>                 // persist MEMORY_AND_DISK_SER => out of memory exception on write()
>                 // persist(StorageLevel.DISK_ONLY())
>                 ;
>
>         StructType type = new StructType();
>         type = type
>                 .add("c1", DataTypes.StringType)
>                 .add("c2", DataTypes.StringType);
>
>         Dataset<Row> df = spark.createDataFrame(rowRDD, type);
>
>         // works
>         df.show();
>
>         df = df.sort(col("c1").asc());
>         df.explain();
>
>         // takes a lot of time but works
>         df.show();
>
>         // OutOfMemoryError: Java heap space
>         df.write()
>                 .mode("overwrite")
>                 .csv("d:/temp/my.csv");
>
>         // OutOfMemoryError: Java heap space
>         df.toJavaRDD()
>                 .mapToPair(row -> new Tuple2(new Text(row.getString(0)), new Text(row.getString(1))))
>                 .saveAsHadoopFile("d:\\temp\\foo", Text.class, Text.class, SequenceFileOutputFormat.class);
>     }
>
>     private static String createLongText(String text, int minLength) {
>         StringBuffer sb = new StringBuffer();
>         while (sb.length() < minLength) {
>             sb.append(text);
>         }
>         return sb.toString();
>     }
> }
> {code}
> When using StorageLevel.MEMORY_AND_DISK(_SER), an OOM crashes the application at partition 70, at a heap usage of 3 GB out of the 4 GB available.
> It seems sort does something like collect: a heap dump shows a very large array of arrays, possibly the partition contents. spark.driver.maxResultSize is also involved, as sort exceeds its default value; setting it to unlimited still ends in an OOM.
> Why do I think this is a bug?
> # A sort() operation should not involve spark.driver.maxResultSize.
> # MEMORY_AND_DISK should work in any case, falling back to disk at the latest, but I see an OOM when heap usage reaches 3 GB of the 4 GB total.
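The description reports two observations: only DISK_ONLY persistence lets the global sort() finish, and sortWithinPartitions() succeeds without any StorageLevel at all. Below is a minimal sketch of both, assuming the spark, rowRDD and type objects from the reproduction above; the class and method names are illustrative and not part of the issue. Note that sortWithinPartitions() only orders rows inside each partition, so it is not a drop-in replacement for a global sort().

{code:java}
// Illustrative sketch of the workarounds reported in the description, not a fix
// for the underlying issue. Assumes the spark/rowRDD/type objects from Example3.
import static org.apache.spark.sql.functions.col;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;

public class SortWorkaroundsSketch {

    static Dataset<Row> applyWorkarounds(SparkSession spark, JavaRDD<Row> rowRDD, StructType type) {
        // Reported workaround 1: persist the large rows with DISK_ONLY before the
        // global sort(), so they are not kept on the heap.
        Dataset<Row> df = spark.createDataFrame(rowRDD.persist(StorageLevel.DISK_ONLY()), type);
        Dataset<Row> globallySorted = df.sort(col("c1").asc());
        globallySorted.explain();

        // Reported workaround 2: sortWithinPartitions() needs no StorageLevel, but it
        // only orders rows inside each partition rather than across the whole dataset.
        Dataset<Row> partitionSorted = df.sortWithinPartitions(col("c1").asc());
        return partitionSorted;
    }
}
{code}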