[jira] [Commented] (SPARK-24961) sort operation causes out of memory

2019-06-20 Thread Juha Iso-Sipilä (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868522#comment-16868522
 ] 

Juha Iso-Sipilä commented on SPARK-24961:
-

I am using Spark 2.4.3 (pyspark) and I observe the same behaviour with my 
dataset. The dataset has two columns, identifier and document, both strings. The 
document column varies between 1 kB and 5 MB. I wanted to do the following to 
shuffle the order of the documents in my dataset:
```python
from pyspark.sql import functions as fn  # assumed alias for the `fn` used below

df = spark.read.parquet('parquet_files')
df.orderBy(fn.hash(*df.columns)).write.parquet('output_dir')
```

This fails with an error suggesting that I should increase spark.driver.maxResultSize.

It feels wrong to me that a user of the sorting API should have to know anything 
about its technical implementation or limitations. Setting maxResultSize to 0 
helps with my dataset, but will it eventually fail with a system OOM once I have 
even more data?

> sort operation causes out of memory 
> 
>
> Key: SPARK-24961
> URL: https://issues.apache.org/jira/browse/SPARK-24961
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 2.3.1
> Environment: Java 1.8u144+
> Windows 10
> Spark 2.3.1 in local mode
> -Xms4g -Xmx4g
> optional: -XX:+UseParallelOldGC 
>Reporter: Markus Breuer
>Priority: Major
>
> A sort operation on a large RDD - one which does not fit in memory - causes an 
> out of memory exception. I made the effect reproducible with a sample that 
> creates large objects of about 2 MB each. The OOM occurs when saving the 
> result. I tried several StorageLevels, but whenever memory is involved 
> (MEMORY_AND_DISK, MEMORY_AND_DISK_SER, none) the application runs out of 
> memory. Only DISK_ONLY seems to work.
> When replacing sort() with sortWithinPartitions(), no StorageLevel is required 
> and the application succeeds.
> {code:java}
> package de.bytefusion.examples;
> import breeze.storage.Storage;
> import de.bytefusion.utils.Options;
> import org.apache.hadoop.io.MapFile;
> import org.apache.hadoop.io.SequenceFile;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.SequenceFileOutputFormat;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> import static org.apache.spark.sql.functions.*;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.UUID;
> import java.util.stream.Collectors;
> import java.util.stream.IntStream;
> 
> public class Example3 {
> 
>     public static void main(String... args) {
>         // create spark session
>         SparkSession spark = SparkSession.builder()
>                 .appName("example1")
>                 .master("local[4]")
>                 .config("spark.driver.maxResultSize", "1g")
>                 .config("spark.driver.memory", "512m")
>                 .config("spark.executor.memory", "512m")
>                 .config("spark.local.dir", "d:/temp/spark-tmp")
>                 .getOrCreate();
>         JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
> 
>         // base to generate huge data
>         // (the loop's upper bound was garbled in the archive; a large value is
>         // needed here to actually generate a big dataset)
>         List<Integer> list = new ArrayList<>();
>         for (int val = 1; val < 1; val++) {
>             int valueOf = Integer.valueOf(val);
>             list.add(valueOf);
>         }
> 
>         // create simple rdd of int
>         JavaRDD<Integer> rdd = sc.parallelize(list, 200);
> 
>         // use map to create large object per row
>         JavaRDD<Row> rowRDD =
>                 rdd.map(value -> RowFactory.create(String.valueOf(value),
>                         createLongText(UUID.randomUUID().toString(), 2 * 1024 * 1024)))
>                 // no persist => out of memory exception on write()
>                 // persist MEMORY_AND_DISK => out of memory exception on write()
>                 // persist MEMORY_AND_DISK_SER => out of memory exception on write()
>                 // persist(StorageLevel.DISK_ONLY())
>                 ;
> 
>         StructType type = new StructType();
>         type = type
>                 .add("c1", DataTypes.StringType)
>                 .add("c2", DataTypes.StringType);
>         Dataset<Row> df = spark.createDataFrame(rowRDD, type);
> 
>         // works
>         df.show();
> 
>         df = df.sort(col("c1").asc());
>         df.explain();
> 
>         // takes a lot of time but works
>         df.show();
> 
>         // OutOfMemoryError: java heap space
>         df
>             .write()
>             .mode("overwrite")
>             .csv("d:/temp/my.csv");
> 
>         // OutOfMemoryError: java heap space
>         df
>             .toJavaRDD()
>             .mapToPair(row -> new Tuple2<>(new Text(row.getString(0)), new Text(row.getString(1))))
>             .saveAsHadoopFile("d:\\temp\\foo", Text.class, Text.class, SequenceFileOutputFormat.class);
>     }
> 
>     private static String createLongText(String text, int minLength) {
>         // body truncated in the archive; presumably the input text is repeated
>         // until it reaches minLength, roughly:
>         StringBuffer sb = new StringBuffer(text);
>         while (sb.length() < minLength) {
>             sb.append(text);
>         }
>         return sb.toString();
>     }
> }
> {code}

[jira] [Commented] (SPARK-24961) sort operation causes out of memory

2019-02-01 Thread Mono Shiro (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16758557#comment-16758557
 ] 

Mono Shiro commented on SPARK-24961:


Spark version 2.3.2. I have a very similar issue when simply reading a file 
that is bigger than the available memory on my machine. Changing the 
StorageLevel to DISK_ONLY also blows up despite there being ample disk space. 
[Please see the question on 
stackoverflow|https://stackoverflow.com/questions/54469243/spark-storagelevel-in-local-mode-not-working/54470393#54470393]

It's important that local mode works for this sort of thing.
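For reference, a minimal sketch of the DISK_ONLY attempt mentioned above, applied 
to the RDD from this ticket's reproduction code (which reportedly still fails for 
this commenter):

{code:java}
// Pin the RDD to disk instead of memory before any wide operation.
// StorageLevel.DISK_ONLY() is the Java accessor for the DISK_ONLY level.
rowRDD.persist(StorageLevel.DISK_ONLY());
{code}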


[jira] [Commented] (SPARK-24961) sort operation causes out of memory

2018-08-21 Thread Markus Breuer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587850#comment-16587850
 ] 

Markus Breuer commented on SPARK-24961:
---

Some notes on your last comment:

Reducing the number of partitions is not an option. When the number of partitions 
is reduced, the size of each partition grows. A partition is the unit of work per 
task, so smaller partitions reduce a task's memory consumption. Spark defaults to 
200 partitions, and increasing the number of partitions is generally a good idea 
for very large data sets. Is this correct?

Any idea why the proposal to modify 
{{spark.sql.execution.rangeExchange.sampleSizePerPartition}} has no effect?
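A minimal sketch of the partition trade-off described above, applied to the 
reproduction's DataFrame (the partition count is purely illustrative, and the 
{{col}} static import from the reproduction code is assumed):

{code:java}
// More, smaller partitions shrink the unit of work per task before the sort;
// fewer, larger partitions do the opposite. 2000 is just an example value.
Dataset<Row> finer = df.repartition(2000);
Dataset<Row> sorted = finer.sort(col("c1").asc());
{code}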


[jira] [Commented] (SPARK-24961) sort operation causes out of memory

2018-08-19 Thread Markus Breuer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585258#comment-16585258
 ] 

Markus Breuer commented on SPARK-24961:
---

I retested the above code and had to correct the setting 
{{spark.driver.maxResultSize}} to 0 (no limit), otherwise the task was cancelled. 
I am not sure why it happened this time - probably a different Java version or my 
mistake - so this code only runs into the OOM when the result size is set to 
unlimited.

Then I tried the values 10, 25, 50, 100 and 2000 for 
{{spark.sql.execution.rangeExchange.sampleSizePerPartition}}, but the task runs 
into the OOM at partition 70 of 200.

Finally I tried partition counts of 20, 200 and 2000, but this also makes no 
difference. With more partitions the counter gets a bit further, but the OOM 
occurs - I think - at nearly the same position.

MaxResultSize and the out of memory error fit together. Sorting these large 
objects seems to use a lot of memory and causes the OOM. From the behaviour I 
think Spark tries to collect the whole result to the driver before writing it to 
disk. I removed df.show() from the code, so that write() is the only operation. 
There should be no difference between local and distributed mode here; write() 
should behave the same, shouldn't it?

I think maxResultSize should have no effect on any sort operation. What do you 
think?
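For reference, the change described above amounts to lifting the driver 
result-size cap in the reproduction's SparkSession (a sketch of the setting, not 
a recommendation):

{code:java}
SparkSession spark = SparkSession.builder()
        .appName("example1")
        .master("local[4]")
        // 0 removes the spark.driver.maxResultSize limit entirely; with the
        // 1g cap from the original code the job is cancelled instead of OOMing
        .config("spark.driver.maxResultSize", "0")
        .getOrCreate();
{code}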


[jira] [Commented] (SPARK-24961) sort operation causes out of memory

2018-08-18 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584961#comment-16584961
 ] 

Liang-Chi Hsieh commented on SPARK-24961:
-

When you run a global sort, Spark does data sampling for the range shuffle. How 
much data is sampled per partition is controlled by the config 
{{spark.sql.execution.rangeExchange.sampleSizePerPartition}}. This is an internal 
config. Since the memory needed is determined by the sample size, the number of 
partitions and the row size, increasing the row size is what causes the OOM.

So you can set a lower partition number or a lower 
{{spark.sql.execution.rangeExchange.sampleSizePerPartition}} to avoid the OOM.
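To make that relationship concrete, a rough back-of-the-envelope estimate for the 
reproduction in this ticket, simply multiplying the three factors named above 
(hedged: the actual sampling mechanics and any internal caps may differ):

{code:java}
long partitions = 200;                // sc.parallelize(list, 200) in the reproduction
long samplesPerPartition = 100;       // default of spark.sql.execution.rangeExchange.sampleSizePerPartition
long bytesPerRow = 2L * 1024 * 1024;  // ~2 MB rows produced by createLongText(...)
long sampledBytes = partitions * samplesPerPartition * bytesPerRow;
System.out.println(sampledBytes / (1024L * 1024 * 1024) + " GiB of sampled rows"); // ~39 GiB
{code}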


[jira] [Commented] (SPARK-24961) sort operation causes out of memory

2018-08-18 Thread Markus Breuer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584955#comment-16584955
 ] 

Markus Breuer commented on SPARK-24961:
---

I started without specifying a storage level and ran into OOM issues. When 
setting it to DISK_ONLY the application passed, but with very poor performance. I 
also tried changing the number of partitions, both more (2000 and up) and fewer 
(200), but the application ran into the OOM.

My understanding of Spark is that the number of executors/cores may affect sort 
performance, so a distributed cluster may sort faster, but in general local mode 
should also do the job. In the memory analyzer I see a large array of data in 
which each element is several hundred megabytes in size. I think this would break 
a distributed cluster, too.

My experience is that Spark handles large amounts of data in local mode, but when 
a single row object exceeds a size of several kilobytes, an OOM occurs on large 
datasets. I have sorted about 200 GB with only 4 GB of RAM in local mode, but 
when the row objects grow to 100 kB or more, Spark runs into the OOM.

I also tried sortWithinPartitions() on large objects, and this works fine. But 
whenever I try to merge the partitions, I run into the OOM. I wondered why this 
happens, because if the partitions are already sorted, a shuffle should be able 
to merge them easily. I think Spark tries to merge them in memory, which would 
explain the large arrays in the memory analyzer.

{{spark.sql.execution.rangeExchange.sampleSizePerPartition}} => What values 
should I use? Are there any docs about this setting? The official documentation 
does not mention it.
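For reference, a minimal sketch of the per-partition variant mentioned above, 
reusing the DataFrame and static imports from the reproduction code (the output 
path is illustrative):

{code:java}
// Sorts rows only within each partition: no range exchange and therefore no
// driver-side sampling, but the result is not globally ordered.
Dataset<Row> partSorted = df.sortWithinPartitions(col("c1").asc());
partSorted.write().mode("overwrite").csv("d:/temp/partsorted.csv");
{code}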


[jira] [Commented] (SPARK-24961) sort operation causes out of memory

2018-08-18 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584944#comment-16584944
 ] 

Liang-Chi Hsieh commented on SPARK-24961:
-

I think it is not common or practical to use local mode to handle very big data 
and do a global sort like that. As mentioned above, global sorting involves data 
sampling, which is configured by the sample size per partition and the number of 
partitions, so it is preferably used in distributed mode rather than local mode. 
If you want to run a global sort like that in local mode, you could use fewer 
partitions to relieve the memory pressure from data sampling.



[jira] [Commented] (SPARK-24961) sort operation causes out of memory

2018-08-18 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584943#comment-16584943
 ] 

Liang-Chi Hsieh commented on SPARK-24961:
-

I saw you described {{Spark 2.3.1 in local mode}} under {{Environment}}. I think 
this happens because once you use a {{StorageLevel}} that includes memory, it 
occupies part of the local JVM's memory. When the sort runs, Spark samples data 
from the RDD using {{collect}}, which causes the OOM. Can you try decreasing 
{{spark.sql.execution.rangeExchange.sampleSizePerPartition}} or decreasing the 
number of partitions?
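A minimal sketch of what that suggestion could look like in the reproduction's 
session setup (the values are illustrative, and it is assumed here that 
{{spark.sql.shuffle.partitions}} controls the partition count of the sort's range 
exchange):

{code:java}
SparkSession spark = SparkSession.builder()
        .appName("example1")
        .master("local[4]")
        // sample fewer rows per partition when computing range boundaries (default 100)
        .config("spark.sql.execution.rangeExchange.sampleSizePerPartition", "20")
        // fewer shuffle partitions for the range exchange (default 200)
        .config("spark.sql.shuffle.partitions", "50")
        .getOrCreate();
{code}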




[jira] [Commented] (SPARK-24961) sort operation causes out of memory

2018-08-18 Thread Markus Breuer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584929#comment-16584929
 ] 

Markus Breuer commented on SPARK-24961:
---

There has been no feedback from stackoverflow or the mailing list yet. Meanwhile 
I tried several variations of the above example but always ran into OOM issues. 
It seems Spark always uses an operation like collect() when merging results. The 
size of the row objects also affects this issue: I sorted millions of smaller 
objects and never ran into an OOM, and the memory profiler shows very low memory 
usage with small objects. But larger objects - mine were 2-4 megabytes - cause an 
OOM.

I understand that Spark 2.3.1 uses merge sort, which should not run into such 
issues. Perhaps the memory management is calibrated for smaller objects and the 
actual object size is ignored? The origin of this issue was a large sequence file 
with 4 MB objects, which ran into the OOM. I worked around it by splitting each 
object into several rows of 4 kB each and appending an incrementing number to the 
sort key. Now the same amount of data is sorted without memory issues.
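A hedged sketch of the workaround described above: split each large row into 
roughly 4 kB chunks and append a zero-padded sequence number to the sort key so 
the chunks stay in order after a global sort. All names and sizes are 
illustrative, and the row layout of the reproduction code is assumed:

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

// rowRDD is the JavaRDD<Row> of (key, largeText) rows from the reproduction code
JavaRDD<Row> chunked = rowRDD.flatMap(row -> {
    String key = row.getString(0);
    String doc = row.getString(1);
    int chunkSize = 4 * 1024;
    List<Row> out = new ArrayList<>();
    for (int i = 0, seq = 0; i < doc.length(); i += chunkSize, seq++) {
        // zero-padded sequence keeps chunks adjacent and ordered under the sort key
        String chunkKey = String.format("%s-%09d", key, seq);
        out.add(RowFactory.create(chunkKey, doc.substring(i, Math.min(doc.length(), i + chunkSize))));
    }
    return out.iterator();
});
{code}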


[jira] [Commented] (SPARK-24961) sort operation causes out of memory

2018-07-30 Thread Markus Breuer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16562359#comment-16562359
 ] 

Markus Breuer commented on SPARK-24961:
---

I think it is an issue, and I have explained why. I also posted to stackoverflow 
(some days ago) and to the mailing list, but got no feedback on this issue. 
Probably there is no easy answer, but hopefully my example helps to reproduce it.


[jira] [Commented] (SPARK-24961) sort operation causes out of memory

2018-07-30 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561615#comment-16561615
 ] 

Hyukjin Kwon commented on SPARK-24961:
--

Please avoid setting a target version; that is usually reserved for committers. 
Is this a question or an issue? If that's not clear, I would suggest asking on 
the mailing list first and leaving this resolved for now. Please also see 
https://spark.apache.org/community.html
