the ~200 numbers (224 partitions, 202 output files) look suspiciously
close to the default number of post-shuffle partitions, 200, set by the
following config param, which is often left untuned:

  spark.sql.shuffle.partitions

here's the property defined in the Spark source:

https://github.com/apache/spark/blob/834e71489bf560302f9d743dff669df1134e9b74/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala#L232
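
a quick way to test whether those ~200 partitions come from this default
is to lower it before the join runs. a minimal sketch in Java against the
SQLContext API (the "sqlContext" variable name is borrowed from the code
further down the thread):

  // default is 200 post-shuffle partitions; tiny datasets need far fewer
  sqlContext.setConf("spark.sql.shuffle.partitions", "8");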

note that Spark 1.6+ aims to make this config param obsolete in favor of
(experimental) adaptive execution, which uses the following as a low
watermark for the number of post-shuffle partitions:

spark.sql.adaptive.minNumPostShufflePartitions

here's the property defined in the Spark source:

https://github.com/apache/spark/blob/834e71489bf560302f9d743dff669df1134e9b74/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala#L245
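
adaptive execution is off by default in 1.6, so it's opt-in. a hedged
sketch (same SQLContext as above; config keys as defined at the links):

  // experimental in 1.6: let Spark choose the post-shuffle partition count
  sqlContext.setConf("spark.sql.adaptive.enabled", "true");
  sqlContext.setConf("spark.sql.adaptive.minNumPostShufflePartitions", "1");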

On Mon, Dec 28, 2015 at 5:41 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Unfortunately, in 1.5 we didn't force operators to spill when they ran
> out of memory, so there is not a lot you can do.  It would be awesome if
> you could test with 1.6 and see if things are any better?
>
> On Mon, Dec 28, 2015 at 2:25 PM, Andy Davidson <
> a...@santacruzintegration.com> wrote:
>
>> I am using Spark 1.5.1 and running into some memory problems with a Java
>> unit test. Yes, I could fix it by raising -Xmx (it's set to 1024M),
>> however I want to better understand what is going on so I can write
>> better code in the future. The test runs on a Mac with master="local[2]".
>>
>> The unit test starts by reading a 672K ASCII file; my output data file
>> is 152K. It seems strange that such a small amount of data would cause an
>> out-of-memory exception. I am running a pretty standard machine learning
>> process:
>>
>>
>>    1. Load the data
>>    2. Create an ML pipeline
>>    3. Transform the data
>>    4. Train a model
>>    5. Make predictions
>>    6. Join the predictions back to my original data set
>>    7. Call coalesce(1); I only have a small amount of data and want to
>>    save it in a single file
>>    8. Save the final results back to disk
>>
>>
>> In step 7, the call to coalesce(1) fails with “java.io.IOException:
>> Unable to acquire memory”.
>>
>> To try and figure out what is going on, I put in log messages to count
>> the number of partitions.
>>
>> It turns out I have 20 input files, and each one winds up in a separate
>> partition. So after loading I call coalesce(1) and check to make sure I
>> have only a single partition.
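>>
>> The check itself is a one-liner (a minimal sketch; "df" stands in for
>> whichever DataFrame is being inspected):
>>
>>         // partitions() only inspects metadata; it does not run a job
>>         logger.warn("numPartitions: {}", df.javaRDD().partitions().size());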
>>
>> The total number of observations is 1998.
>>
>> After step 7 I count the number of partitions and discover I have 224
>> partitions! That is surprising, given that I called coalesce(1) before I
>> did anything with the data. My data set should easily fit in memory. When
>> I save the results to disk, 202 files are created, 162 of them empty!
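>>
>> For reference, steps 7 and 8 look roughly like this (a minimal sketch;
>> "results" and the output path are placeholders, not my actual code):
>>
>>         DataFrame single = results.coalesce(1);  // step 7
>>         single.write().json("results.json");     // step 8, path made up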
>>
>> In general I am not explicitly using cache.
>>
>> Some of the data frames get registered as tables; I find it easier to
>> use SQL.
>>
>> Some of the data frames get converted back to RDDs; I find it easier to
>> create RDD<LabeledPoint> this way.
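>>
>> Roughly like this (a minimal sketch; the label/features column positions
>> are illustrative, not my actual schema):
>>
>>         // uses org.apache.spark.mllib.regression.LabeledPoint
>>         // and org.apache.spark.mllib.linalg.Vector
>>         JavaRDD<LabeledPoint> lpRDD = df.javaRDD().map(row ->
>>                 new LabeledPoint(row.getDouble(0), (Vector) row.get(1)));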
>>
>> I put calls to unpersist(true) in several places.
>>
>>     private void memoryCheck(String name) {
>>         // log current JVM heap totals at a named checkpoint
>>         Runtime rt = Runtime.getRuntime();
>>         logger.warn("name: {} \t\ttotalMemory: {} \tfreeMemory: {}",
>>                 name,
>>                 String.format("%,d", rt.totalMemory()),
>>                 String.format("%,d", rt.freeMemory()));
>>     }
>>
>> Any idea how I can get a better understanding of what is going on? My
>> goal is to learn to write better Spark code.
>>
>> Kind regards
>>
>> Andy
>>
>> Memory usages at various points in my unit test
>>
>> name: rawInput         totalMemory: 447,741,952  freeMemory: 233,203,184
>> name: naiveBayesModel  totalMemory: 509,083,648  freeMemory: 403,504,128
>> name: lpRDD            totalMemory: 509,083,648  freeMemory: 402,288,104
>> name: results          totalMemory: 509,083,648  freeMemory: 368,011,008
>>
>>
>>         DataFrame exploreDF = results.select(results.col("id"),
>>                 results.col("label"),
>>                 results.col("binomialLabel"),
>>                 results.col("labelIndex"),
>>                 results.col("prediction"),
>>                 results.col("words"));
>>
>>         exploreDF.show(10);
>>
>> Yes, I realize it's strange to switch styles, however this should not
>> cause memory problems.
>>
>>
>>         final String exploreTable = "exploreTable";
>>         exploreDF.registerTempTable(exploreTable);
>>         String fmt = "SELECT * FROM %s where binomialLabel = 'signal'";
>>         String stmt = String.format(fmt, exploreTable);
>>
>>         DataFrame subsetToSave = sqlContext.sql(stmt); // .show(100);
>>
>>
>> name: subsetToSave totalMemory: 1,747,451,904 freeMemory: 1,049,447,144
>>
>>
>> Calling exploreDF.unpersist(true) does not resolve the memory issue.
>>
>>
>>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com
