[ 
https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584944#comment-16584944
 ] 

Liang-Chi Hsieh commented on SPARK-24961:
-----------------------------------------

Using local mode to process very large data with a global sort like this is 
neither common nor practical. As mentioned above, a global sort involves data 
sampling, whose cost is governed by the sample size per partition and the 
number of partitions, so it is better suited to distributed mode than to local 
mode. If you do want a global sort in local mode, you can try using fewer 
partitions of data to reduce the memory used by data sampling.
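
To make the effect of the partition count concrete, here is a rough 
back-of-the-envelope sketch in plain Java. It is not Spark's actual sampling 
logic: the helper estimateSampledBytes is hypothetical, the sample size of 100 
rows per partition is an assumed value, and it assumes that every sampled row 
is held whole in memory and that rows are spread evenly across partitions. The 
row count and row size are taken from the reporter's example.

```java
public class SortSamplingEstimate {

    /**
     * Rough upper bound on the bytes held while sampling for a global sort.
     * Hypothetical helper, not a Spark API. Assumes evenly distributed rows
     * and that each sampled row is kept whole in memory.
     */
    public static long estimateSampledBytes(long totalRows, int numPartitions,
                                            int sampleSizePerPartition, long bytesPerRow) {
        // Rows per partition, using integer division (assumes even distribution).
        long rowsPerPartition = (totalRows + numPartitions - 1) / numPartitions;
        // A partition cannot contribute more sampled rows than it holds.
        long sampledPerPartition = Math.min(sampleSizePerPartition, rowsPerPartition);
        // The sample can never be larger than the whole data set.
        long sampledRows = Math.min(sampledPerPartition * numPartitions, totalRows);
        return sampledRows * bytesPerRow;
    }

    public static void main(String[] args) {
        long totalRows = 9_999;               // rows produced by the reporter's loop
        long bytesPerRow = 2L * 1024 * 1024;  // "about 2mb" per row, as reported
        int sampleSizePerPartition = 100;     // assumed value, for illustration only

        for (int partitions : new int[] {200, 50, 10}) {
            long bytes = estimateSampledBytes(
                    totalRows, partitions, sampleSizePerPartition, bytesPerRow);
            System.out.printf("%d partitions -> ~%d MB sampled%n",
                    partitions, bytes / (1024 * 1024));
        }
    }
}
```

Under these assumptions, 200 partitions of about 50 rows each means that 
essentially every row ends up in the sample, which is far more than a 4 GB 
heap can hold, while 10 partitions keep the estimate at roughly 2 GB.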


> sort operation causes out of memory 
> ------------------------------------
>
>                 Key: SPARK-24961
>                 URL: https://issues.apache.org/jira/browse/SPARK-24961
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 2.3.1
>         Environment: Java 1.8u144+
> Windows 10
> Spark 2.3.1 in local mode
> -Xms4g -Xmx4g
> optional: -XX:+UseParallelOldGC 
>            Reporter: Markus Breuer
>            Priority: Major
>
> A sort operation on a large RDD, one that does not fit in memory, causes an 
> out-of-memory (OOM) error. I made the effect reproducible with the sample 
> below, which creates large objects of about 2 MB each. The OOM occurs when 
> saving the result. I tried several StorageLevels, but whenever memory is 
> involved (MEMORY_AND_DISK, MEMORY_AND_DISK_SER, or no persist at all) the 
> application runs out of memory. Only DISK_ONLY seems to work.
> When sort() is replaced with sortWithinPartitions(), no StorageLevel is 
> required and the application succeeds.
> {code:java}
> package de.bytefusion.examples;
> import breeze.storage.Storage;
> import de.bytefusion.utils.Options;
> import org.apache.hadoop.io.MapFile;
> import org.apache.hadoop.io.SequenceFile;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.SequenceFileOutputFormat;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> import static org.apache.spark.sql.functions.*;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.UUID;
> import java.util.stream.Collectors;
> import java.util.stream.IntStream;
> public class Example3 {
>     public static void main(String... args) {
>         // create spark session
>         SparkSession spark = SparkSession.builder()
>                 .appName("example1")
>                 .master("local[4]")
>                 .config("spark.driver.maxResultSize","1g")
>                 .config("spark.driver.memory","512m")
>                 .config("spark.executor.memory","512m")
>                 .config("spark.local.dir","d:/temp/spark-tmp")
>                 .getOrCreate();
>         JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
>         // base to generate huge data
>         List<Integer> list = new ArrayList<>();
>         for (int val = 1; val < 10000; val++) {
>             list.add(val);
>         }
>         // create simple rdd of int
>         JavaRDD<Integer> rdd = sc.parallelize(list,200);
>         // use map to create large object per row
>         JavaRDD<Row> rowRDD =
>                 rdd
>                         .map(value -> RowFactory.create(
>                                 String.valueOf(value),
>                                 createLongText(UUID.randomUUID().toString(), 2 * 1024 * 1024)))
>                         // no persist => out of memory exception on write()
>                         // persist MEMORY_AND_DISK => out of memory exception on write()
>                         // persist MEMORY_AND_DISK_SER => out of memory exception on write()
>                         // persist(StorageLevel.DISK_ONLY())
>                 ;
>         StructType type = new StructType();
>         type = type
>                 .add("c1", DataTypes.StringType)
>                 .add( "c2", DataTypes.StringType );
>         Dataset<Row> df = spark.createDataFrame(rowRDD, type);
>         // works
>         df.show();
>         df = df
>                 .sort(col("c1").asc() )
>             ;
>         df.explain();
>         // takes a lot of time but works
>         df.show();
>         // OutOfMemoryError: java heap space
>         df
>             .write()
>             .mode("overwrite")
>             .csv("d:/temp/my.csv");
>         // OutOfMemoryError: java heap space
>         df
>                 .toJavaRDD()
>                 .mapToPair(row -> new Tuple2<>(new Text(row.getString(0)),
>                         new Text(row.getString(1))))
>                 .saveAsHadoopFile("d:\\temp\\foo", Text.class, Text.class,
>                         SequenceFileOutputFormat.class);
>     }
>     private static String createLongText( String text, int minLength ) {
>         StringBuilder sb = new StringBuilder();
>         while( sb.length() < minLength ) {
>             sb.append(text);
>         }
>         return sb.toString();
>     }
> }
> {code}
> With StorageLevel.MEMORY_AND_DISK or MEMORY_AND_DISK_SER, an OOM crashes the 
> application at partition 70, at a heap usage of 3 GB out of the 4 GB available.
> It seems sort() does something like a collect: a heap dump shows a very large 
> array of arrays, possibly the partition contents. spark.driver.maxResultSize 
> is also involved, since the sort exceeds its default value. Setting it to 
> unlimited causes the OOM.
> Why do I think this is a bug?
>  # The sort() operation should not involve maxResultSize.
>  # MEMORY_AND_DISK should work in any case, with at least the disk being used. 
> But I see an OOM when reaching 3 GB of the 4 GB total heap.
>  



