[jira] [Commented] (SPARK-24961) sort operation causes out of memory
[ https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868522#comment-16868522 ] Juha Iso-Sipilä commented on SPARK-24961:

I am using Spark 2.4.3 (pyspark) and I observe the same with my dataset. The dataset has two columns, identifier and document, both strings. The document column varies between 1 kB and 5 MB. I wanted to do the following to shuffle the order of the documents in my dataset:

```
df = spark.read.parquet('parquet_files')
df.orderBy(fn.hash(*df.columns)).write.parquet('output_dir')
```

This fails with an error suggesting I should increase maxResultSize. It feels wrong to me that the user of a sorting API should have to know anything about the technical implementation or its limitations. Setting maxResultSize to 0 helps with my dataset, but will it eventually fail with a system OOM when I have even more data?

> sort operation causes out of memory
>
> Key: SPARK-24961
> URL: https://issues.apache.org/jira/browse/SPARK-24961
> Project: Spark
> Issue Type: Bug
> Components: Java API
> Affects Versions: 2.3.1
> Environment: Java 1.8u144+
> Windows 10
> Spark 2.3.1 in local mode
> -Xms4g -Xmx4g
> optional: -XX:+UseParallelOldGC
> Reporter: Markus Breuer
> Priority: Major
>
> A sort operation on a large RDD - one which does not fit in memory - causes an out-of-memory exception. I made the effect reproducible with a sample that creates large objects of about 2 MB each; the OOM occurs when saving the result. I tried several StorageLevels, but whenever memory is involved (MEMORY_AND_DISK, MEMORY_AND_DISK_SER, none) the application runs out of memory. Only DISK_ONLY seems to work.
> When replacing sort() with sortWithinPartitions(), no StorageLevel is required and the application succeeds.
> {code:java}
> package de.bytefusion.examples;
>
> import breeze.storage.Storage;
> import de.bytefusion.utils.Options;
> import org.apache.hadoop.io.MapFile;
> import org.apache.hadoop.io.SequenceFile;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.SequenceFileOutputFormat;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
>
> import static org.apache.spark.sql.functions.*;
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.UUID;
> import java.util.stream.Collectors;
> import java.util.stream.IntStream;
>
> public class Example3 {
>
>     public static void main(String... args) {
>         // create spark session
>         SparkSession spark = SparkSession.builder()
>                 .appName("example1")
>                 .master("local[4]")
>                 .config("spark.driver.maxResultSize", "1g")
>                 .config("spark.driver.memory", "512m")
>                 .config("spark.executor.memory", "512m")
>                 .config("spark.local.dir", "d:/temp/spark-tmp")
>                 .getOrCreate();
>         JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
>
>         // base to generate huge data
>         // (the loop bound reads "val < 1" in the original report; the intended
>         // upper bound was lost in formatting)
>         List<Integer> list = new ArrayList<>();
>         for (int val = 1; val < 1; val++) {
>             list.add(val);
>         }
>
>         // create simple rdd of int
>         JavaRDD<Integer> rdd = sc.parallelize(list, 200);
>
>         // use map to create a large object per row
>         JavaRDD<Row> rowRDD = rdd
>                 .map(value -> RowFactory.create(
>                         String.valueOf(value),
>                         createLongText(UUID.randomUUID().toString(), 2 * 1024 * 1024)))
>                 // no persist                  => out of memory exception on write()
>                 // persist MEMORY_AND_DISK     => out of memory exception on write()
>                 // persist MEMORY_AND_DISK_SER => out of memory exception on write()
>                 // persist(StorageLevel.DISK_ONLY())
>                 ;
>
>         StructType type = new StructType()
>                 .add("c1", DataTypes.StringType)
>                 .add("c2", DataTypes.StringType);
>         Dataset<Row> df = spark.createDataFrame(rowRDD, type);
>
>         // works
>         df.show();
>
>         df = df.sort(col("c1").asc());
>         df.explain();
>
>         // takes a lot of time but works
>         df.show();
>
>         // OutOfMemoryError: java heap space
>         df.write()
>                 .mode("overwrite")
>                 .csv("d:/temp/my.csv");
>
>         // OutOfMemoryError: java heap space
>         df.toJavaRDD()
>                 .mapToPair(row -> new Tuple2<>(new Text(row.getString(0)), new Text(row.getString(1))))
>                 .saveAsHadoopFile("d:\\temp\\foo", Text.class, Text.class,
>                         SequenceFileOutputFormat.class);
>     }
>
>     // helper, reconstructed: the original definition is truncated in this report
>     private static String createLongText(String text, int size) {
>         StringBuilder sb = new StringBuilder(size);
>         while (sb.length() < size) {
>             sb.append(text);
>         }
>         return sb.toString();
>     }
> }
> {code}
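The shuffle-by-hash approach in Juha's comment above can be sketched outside Spark in plain Python. This is an illustration of the general idea only: Spark's fn.hash is a Murmur3 hash, which is replaced here with hashlib.md5 so the sketch is self-contained and deterministic.

```python
import hashlib

def pseudo_shuffle(rows):
    """'Shuffle' rows deterministically by sorting on a hash of their content,
    the same idea as df.orderBy(fn.hash(*df.columns))."""
    def row_hash(row):
        # hash all columns together, like fn.hash(*df.columns)
        return hashlib.md5("|".join(map(str, row)).encode()).hexdigest()
    return sorted(rows, key=row_hash)

rows = [("id1", "doc a"), ("id2", "doc b"), ("id3", "doc c")]
shuffled = pseudo_shuffle(rows)
# same multiset of rows, ordered by content hash rather than by key
print(sorted(shuffled) == rows)  # True
```

Because the order depends only on row content, re-running the job reproduces the same "shuffled" order, which is presumably why a hash column was preferred over a random one.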
[jira] [Commented] (SPARK-24961) sort operation causes out of memory
[ https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16758557#comment-16758557 ] Mono Shiro commented on SPARK-24961:

Spark version 2.3.2. I have a very similar issue when simply reading a file that is bigger than the available memory on my machine. Changing the StorageLevel to DISK_ONLY also blows up despite ample disk space. [Please see the question on Stack Overflow|https://stackoverflow.com/questions/54469243/spark-storagelevel-in-local-mode-not-working/54470393#54470393] It is important that local mode works for these sorts of things.
[jira] [Commented] (SPARK-24961) sort operation causes out of memory
[ https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587850#comment-16587850 ] Markus Breuer commented on SPARK-24961:

Some notes on your last comment: reducing the number of partitions is not an option. When the number of partitions is reduced, the size per partition grows. A partition is the unit of work per task, so smaller partitions reduce a task's memory consumption. Spark defaults to 200 partitions, and it is generally a good idea to increase the number of partitions for very large data sets. Is this correct?

Any idea why the proposal to modify {{spark.sql.execution.rangeExchange.sampleSizePerPartition}} has no effect?
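The partition-count trade-off discussed in this comment can be checked with back-of-the-envelope arithmetic. The numbers below are illustrative only, reusing the 200 GB dataset size mentioned later in the thread:

```python
def bytes_per_partition(total_bytes, num_partitions):
    """Rough per-task data volume when a dataset is split evenly."""
    return total_bytes / num_partitions

gb = 1024 ** 3
# 200 GB over Spark's default of 200 partitions: ~1 GB per task,
# already more than the 512 MB executor heap used in the reproduction.
print(bytes_per_partition(200 * gb, 200) / gb)   # 1.0
# 10x more partitions brings each task down to ~100 MB.
print(bytes_per_partition(200 * gb, 2000) / gb)  # 0.1
```

This supports the comment's point: more, smaller partitions shrink each task's footprint, so shrinking the partition count works against task-level memory pressure (even if, as the other comments note, it helps the driver-side sampling step).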
[jira] [Commented] (SPARK-24961) sort operation causes out of memory
[ https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585258#comment-16585258 ] Markus Breuer commented on SPARK-24961:

I retested the above code and had to correct the setting {{spark.driver.maxResultSize}} to 0 (no limit), otherwise the task was cancelled. I am not sure why it happened this time - probably a different Java version or my mistake - so this code only runs into OOM when the result size is set to unlimited.

Then I used values of 10, 25, 50, 100 and 2000 for {{spark.sql.execution.rangeExchange.sampleSizePerPartition}}, but the task runs into OOM at partition 70 of 200.

Finally I used partition counts of 20, 200 and 2000, but this also makes no difference. When using more partitions the counter increases a bit further, but the OOM occurs - I think - at nearly the same position.

MaxResultSize and out of memory fit together. Sorting these large objects seems to use a lot of memory and causes the OOM. From the behavior I think Spark tries to collect the whole result to the driver before writing it to disk. I removed df.show() from the code, so that write() is the only operation. There should be no difference between local and distributed mode here - write() should have the same behavior, shouldn't it?

I think maxResultSize should have no effect on any sort operation. What do you think?
[jira] [Commented] (SPARK-24961) sort operation causes out of memory
[ https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584961#comment-16584961 ] Liang-Chi Hsieh commented on SPARK-24961:

When you run a global sort, Spark does data sampling for the range shuffle. How much data is sampled per partition is controlled by the config {{spark.sql.execution.rangeExchange.sampleSizePerPartition}}. This is an internal config. Since the memory needed is determined by the sample size, the number of partitions and the row size, increasing the row size is what causes the OOM. So you can set a lower partition number or a lower {{spark.sql.execution.rangeExchange.sampleSizePerPartition}} to avoid the OOM.
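The sampling step described in this comment can be sketched in plain Python. This is an illustration of the general range-partitioning technique, not Spark's actual implementation: each input partition contributes a bounded sample, all samples are collected in one place (which is where the memory cost arises), and sorted split points are derived from them.

```python
import random

def range_bounds(partitions, sample_size_per_partition, num_output_partitions):
    """Pick sorted split points by sampling each input partition.

    Every sampled row is held in memory at once, so the footprint grows with
    sample_size_per_partition * number of partitions * row size.
    """
    sampled = []
    for part in partitions:
        k = min(sample_size_per_partition, len(part))
        sampled.extend(random.sample(part, k))  # collected centrally
    sampled.sort()
    # choose num_output_partitions - 1 evenly spaced split points
    step = len(sampled) / num_output_partitions
    return [sampled[int(step * i)] for i in range(1, num_output_partitions)]

# four disjoint input partitions of 100 small values each
parts = [list(range(i * 100, (i + 1) * 100)) for i in range(4)]
bounds = range_bounds(parts, sample_size_per_partition=20, num_output_partitions=4)
print(len(bounds))  # 3 increasing split points
```

With small integers the central sample is harmless; with the 2 MB rows from the reproduction, the same collect step is exactly where the heap blows up.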
[jira] [Commented] (SPARK-24961) sort operation causes out of memory
[ https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584955#comment-16584955 ] Markus Breuer commented on SPARK-24961:

I started without specifying a storage level and ran into OOM issues. When setting it to DISK_ONLY the application passed, but with very poor performance. I also tried changing the number of partitions, from many (2000 to 1) and fewer (200), but the application ran into OOM.

My understanding of Spark is that the number of executors/cores may affect sort performance, so a distributed cluster may be faster at sorting. But in general, local mode should also do the work. In the memory analyzer I see a large array of data in which each element has a size of some hundred megabytes. I think this would break a distributed cluster, too.

My experience is that Spark processes large amounts of data fine in local mode, but when a single row object exceeds a size of several kilobytes, OOM occurs on large datasets. I sorted amounts of 200 GB with only 4 GB RAM in local mode. But when increasing row objects to sizes of 100 kB or more, Spark runs into OOM.

I also tried sortWithinPartitions() on large objects. This works fine. But whenever I try to merge the partitions, I run into OOM. I asked myself why this occurs, because if the partitions are sorted, a shuffle should easily merge them. I think Spark tries to merge them in memory, which would explain the large arrays in the memory analyzer.

{{spark.sql.execution.rangeExchange.sampleSizePerPartition}} => What values should I use? Are there any docs about this setting? The official docs do not mention it.
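The intuition in the comment above - that already-sorted partitions can be merged without holding everything in memory - is the classic k-way merge. A minimal sketch in Python (illustrative, not Spark's code), using the standard library's lazy heap-based merge:

```python
import heapq

def merge_sorted_partitions(partitions):
    """Lazily merge sorted partitions into one sorted stream.

    heapq.merge keeps only one head element per partition in memory
    at a time, so peak memory is O(number of partitions), not O(data).
    """
    return heapq.merge(*partitions)

parts = [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
print(list(merge_sorted_partitions(parts)))  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
```

If a merge instead materializes whole partitions (or whole rows of several megabytes each) before combining them, memory use scales with data size, which matches the large in-memory arrays reported from the memory analyzer.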
[jira] [Commented] (SPARK-24961) sort operation causes out of memory
[ https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584944#comment-16584944 ] Liang-Chi Hsieh commented on SPARK-24961:

I think it is not common or practical to use local mode to handle very big data and do a global sort like that. As mentioned above, the global sort involves data sampling, which is configured by the sample size per partition and the number of partitions, so it is preferably used in distributed mode rather than local mode. In local mode, if you want a global sort like that, maybe you can use fewer partitions to relieve the memory pressure from data sampling.
[jira] [Commented] (SPARK-24961) sort operation causes out of memory
[ https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584943#comment-16584943 ] Liang-Chi Hsieh commented on SPARK-24961: - I saw you described {{Spark 2.3.1 in local mode}} in {{Environment}}. I think it is happened because once you use {{StorageLevels}} with memory, it occupied some of memory on local. When it runs sort, Spark will sample data from RDD using {{collect}} and cause OOM. Can you try to decrease {{spark.sql.execution.rangeExchange.sampleSizePerPartition}} or decrease the number of partitions? > sort operation causes out of memory > > > Key: SPARK-24961 > URL: https://issues.apache.org/jira/browse/SPARK-24961 > Project: Spark > Issue Type: Bug > Components: Java API >Affects Versions: 2.3.1 > Environment: Java 1.8u144+ > Windows 10 > Spark 2.3.1 in local mode > -Xms4g -Xmx4g > optional: -XX:+UseParallelOldGC >Reporter: Markus Breuer >Priority: Major > > A sort operation on large rdd - which does not fit in memory - causes out of > memory exception. I made the effect reproducable by an sample, the sample > creates large object of about 2mb size. When saving result the oom occurs. I > tried several StorageLevels, but if memory is included (MEMORY_AND_DISK, > MEMORY_AND_DISK_SER, none) application runs in out of memory. Only DISK_ONLY > seems to work. > When replacing sort() with sortWithinPartitions() no StorageLevel is required > and application succeeds. 
> {code:java}
> package de.bytefusion.examples;
>
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.SequenceFileOutputFormat;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
>
> import static org.apache.spark.sql.functions.*;
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.UUID;
>
> public class Example3 {
>
>     public static void main(String... args) {
>         // create spark session
>         SparkSession spark = SparkSession.builder()
>                 .appName("example1")
>                 .master("local[4]")
>                 .config("spark.driver.maxResultSize", "1g")
>                 .config("spark.driver.memory", "512m")
>                 .config("spark.executor.memory", "512m")
>                 .config("spark.local.dir", "d:/temp/spark-tmp")
>                 .getOrCreate();
>         JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
>
>         // base to generate huge data
>         // (the loop bound was garbled to "val < 1" in this report; it must be
>         // large enough that the generated rows exceed available memory)
>         List<Integer> list = new ArrayList<>();
>         for (int val = 1; val < 15000; val++) {
>             list.add(val);
>         }
>
>         // create simple rdd of int
>         JavaRDD<Integer> rdd = sc.parallelize(list, 200);
>
>         // use map to create a large (~2 MB) object per row
>         JavaRDD<Row> rowRDD = rdd
>                 .map(value -> RowFactory.create(
>                         String.valueOf(value),
>                         createLongText(UUID.randomUUID().toString(), 2 * 1024 * 1024)))
>                 // no persist                        => out of memory exception on write()
>                 // persist MEMORY_AND_DISK           => out of memory exception on write()
>                 // persist MEMORY_AND_DISK_SER       => out of memory exception on write()
>                 // persist(StorageLevel.DISK_ONLY()) => works
>                 ;
>
>         StructType type = new StructType()
>                 .add("c1", DataTypes.StringType)
>                 .add("c2", DataTypes.StringType);
>         Dataset<Row> df = spark.createDataFrame(rowRDD, type);
>
>         // works
>         df.show();
>
>         df = df.sort(col("c1").asc());
>         df.explain();
>
>         // takes a lot of time but works
>         df.show();
>
>         // OutOfMemoryError: java heap space
>         df.write()
>           .mode("overwrite")
>           .csv("d:/temp/my.csv");
>
>         // OutOfMemoryError: java heap space
>         df.toJavaRDD()
>           .mapToPair(row -> new Tuple2<>(new Text(row.getString(0)), new Text(row.getString(1))))
>           .saveAsHadoopFile("d:\\temp\\foo", Text.class, Text.class, SequenceFileOutputFormat.class);
>     }
>
>     private static String createLongText(String text, int minLength) {
>         // (the method body is truncated in this report; it pads the given
>         // text until it is at least minLength characters long)
>         StringBuffer sb = new StringBuffer(text);
>         while (sb.length() < minLength) {
>             sb.append(text);
>         }
>         return sb.toString();
>     }
> }
> {code}
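To see why the driver-side range-partitioning sample alone can exhaust a 512 MB driver here, a rough back-of-envelope calculation helps. This is plain Python, not Spark code: the per-partition sample size of 100 is an assumed default for {{spark.sql.execution.rangeExchange.sampleSizePerPartition}}, and the other numbers come from the repro above.

```python
# Back-of-envelope estimate of the memory a driver-side range-partitioning
# sample would need for this repro. All inputs are taken from the report:
row_size_bytes = 2 * 1024 * 1024   # ~2 MB per row, as generated by createLongText
num_partitions = 200               # sc.parallelize(list, 200)
sample_size_per_partition = 100    # assumed default of
                                   # spark.sql.execution.rangeExchange.sampleSizePerPartition

sampled_rows = num_partitions * sample_size_per_partition
sample_bytes = sampled_rows * row_size_bytes

print(f"sampled rows: {sampled_rows}")                      # 20000
print(f"sample size:  {sample_bytes / 2**30:.1f} GiB")      # ~39.1 GiB
```

Tens of GiB of sampled row data against a 512 MB driver heap is hopeless, and the estimate scales linearly with row size, which would explain why millions of small rows sort fine while 2-4 MB rows do not. Decreasing either factor, as suggested in the comment above, shrinks the sample proportionally.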
[jira] [Commented] (SPARK-24961) sort operation causes out of memory
[ https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584929#comment-16584929 ]

Markus Breuer commented on SPARK-24961:
---------------------------------------

There has been no feedback from Stack Overflow or the mailing list yet. Meanwhile I have tried several variations of the above example but always ran into OOM issues. It seems Spark always uses a collect()-like operation when merging results. The size of the row objects also matters: I have sorted millions of smaller objects and never hit an OOM, and a memory profiler shows very low memory usage with small objects. Larger objects, however - mine were 2-4 megabytes - cause an OOM. I understand that Spark 2.3.1 uses merge sort, which should not run into such issues. Perhaps the memory accounting assumes smaller objects and the actual object size is ignored?

The origin of this issue was a large sequence file whose 4 MB objects ran into an OOM. I worked around it by splitting each object into several rows of 4 kB each and appending an incrementing number to the sort key. Now the same amount of data is sorted without memory issues.
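The workaround described in the comment above - splitting each large object into 4 kB rows and appending an incrementing number to the sort key so a global sort keeps the chunks adjacent and ordered - can be sketched in plain Python. The helper names are hypothetical, not from the reporter's code:

```python
def split_into_rows(key, document, chunk_size=4 * 1024):
    """Split one large document into small (composite_key, chunk) rows.

    The zero-padded, incrementing sequence number appended to the sort key
    keeps the chunks of one document adjacent and in order after a global
    sort on the composite key.
    """
    rows = []
    for i in range(0, max(len(document), 1), chunk_size):
        seq = i // chunk_size
        rows.append((f"{key}-{seq:09d}", document[i:i + chunk_size]))
    return rows


def reassemble(rows):
    """Inverse operation: concatenate sorted chunks back into one document."""
    return "".join(chunk for _, chunk in sorted(rows))


rows = split_into_rows("doc42", "x" * 10000)
print(len(rows))                # 3 chunks of at most 4096 characters
print(rows[0][0], rows[2][0])   # doc42-000000000 doc42-000000002
assert reassemble(rows) == "x" * 10000
```

Each shuffled record is now only a few kB, so per-record buffers on the sort and merge path stay small; the same idea would apply whether the rows flow through an RDD sort or a DataFrame sort.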
[jira] [Commented] (SPARK-24961) sort operation causes out of memory
[ https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16562359#comment-16562359 ]

Markus Breuer commented on SPARK-24961:
---------------------------------------

I think it is an issue, and I explained why. I also posted to Stack Overflow some days ago and addressed the mailing list, but got no feedback on this issue there either. There is probably no easy answer, but hopefully my example helps to reproduce it.
[jira] [Commented] (SPARK-24961) sort operation causes out of memory
[ https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561615#comment-16561615 ]

Hyukjin Kwon commented on SPARK-24961:
--------------------------------------

Please avoid setting a target version; that field is usually reserved for committers. Is this a question or an issue? If that's not clear, I would suggest asking on the mailing list first and leaving this resolved for now. Please also see https://spark.apache.org/community.html