The 200 number looks suspiciously similar to the default number of post-shuffle partitions, which is often left untuned:
spark.sql.shuffle.partitions

Here's the property defined in the Spark source:
https://github.com/apache/spark/blob/834e71489bf560302f9d743dff669df1134e9b74/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala#L232

Note that Spark 1.6+ will make this config param obsolete in favor of adaptive execution, which uses the following as a low watermark for the number of post-shuffle partitions:

spark.sql.adaptive.minNumPostShufflePartitions

Here's the property defined in the Spark source:
https://github.com/apache/spark/blob/834e71489bf560302f9d743dff669df1134e9b74/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala#L245

On Mon, Dec 28, 2015 at 5:41 PM, Michael Armbrust <mich...@databricks.com> wrote:

> Unfortunately, in 1.5 we didn't force operators to spill when they ran out of
> memory, so there is not a lot you can do. It would be awesome if you could
> test with 1.6 and see if things are any better?
>
> On Mon, Dec 28, 2015 at 2:25 PM, Andy Davidson <a...@santacruzintegration.com> wrote:
>
>> I am using Spark 1.5.1. I am running into some memory problems with a
>> Java unit test. Yes, I could fix it by setting -Xmx (it's set to 1024M);
>> however, I want to better understand what is going on so I can write
>> better code in the future. The test runs on a Mac, master="local[2]".
>>
>> I have a Java unit test that starts by reading a 672K ASCII file. My
>> output data file is 152K. It seems strange that such a small amount of
>> data would cause an out-of-memory exception. I am running a pretty
>> standard machine learning process:
>>
>> 1. Load data
>> 2. Create an ML pipeline
>> 3. Transform the data
>> 4. Train a model
>> 5. Make predictions
>> 6. Join the predictions back to my original data set
>> 7. Coalesce(1); I only have a small amount of data and want to save
>>    it in a single file
>> 8. 
Save final results back to disk
>>
>> Step 7: I am unable to call coalesce(): "java.io.IOException: Unable to
>> acquire memory"
>>
>> To try to figure out what is going on, I put log messages in to count
>> the number of partitions.
>>
>> It turns out I have 20 input files, and each one winds up in a separate
>> partition. Okay, so after loading I call coalesce(1) and check to make
>> sure I only have a single partition.
>>
>> The total number of observations is 1998.
>>
>> After calling step 7, I count the number of partitions and discover I
>> have 224 partitions! Surprising, given that I called coalesce(1) before
>> I did anything with the data. My data set should easily fit in memory.
>> When I save it to disk, I get 202 files created, with 162 of them being
>> empty!
>>
>> In general I am not explicitly using cache.
>>
>> Some of the data frames get registered as tables; I find it easier to
>> use SQL.
>>
>> Some of the data frames get converted back to RDDs; I find it easier to
>> create RDD<LabeledPoint> this way.
>>
>> I put calls to unpersist(true) in several places.
>>
>> private void memoryCheck(String name) {
>>     Runtime rt = Runtime.getRuntime();
>>     logger.warn("name: {} \t\ttotalMemory: {} \tfreeMemory: {}",
>>         name,
>>         String.format("%,d", rt.totalMemory()),
>>         String.format("%,d", rt.freeMemory()));
>> }
>>
>> Any idea how I can get a better understanding of what is going on? My
>> goal is to learn to write better Spark code.
>>
>> Kind regards,
>>
>> Andy
>>
>> Memory usage at various points in my unit test:
>>
>> name: rawInput          totalMemory: 447,741,952    freeMemory: 233,203,184
>> name: naiveBayesModel   totalMemory: 509,083,648    freeMemory: 403,504,128
>> name: lpRDD             totalMemory: 509,083,648    freeMemory: 402,288,104
>> name: results           totalMemory: 509,083,648    freeMemory: 368,011,008
>>
>> DataFrame exploreDF = results.select(results.col("id"),
>>     results.col("label"),
>>     results.col("binomialLabel"),
>>     results.col("labelIndex"),
>>     results.col("prediction"),
>>     results.col("words"));
>>
>> exploreDF.show(10);
>>
>> Yes, I realize it's strange to switch styles; however, this should not
>> cause memory problems.
>>
>> final String exploreTable = "exploreTable";
>> exploreDF.registerTempTable(exploreTable);
>> String fmt = "SELECT * FROM %s WHERE binomialLabel = 'signal'";
>> String stmt = String.format(fmt, exploreTable);
>>
>> DataFrame subsetToSave = sqlContext.sql(stmt); // .show(100);
>>
>> name: subsetToSave      totalMemory: 1,747,451,904  freeMemory: 1,049,447,144
>>
>> exploreDF.unpersist(true); does not resolve the memory issue.
>>
>

--
Chris Fregly
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com
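For anyone who hits the same 200-ish-partition surprise on a small local job: a minimal sketch of lowering the default discussed above, assuming a spark-defaults.conf-style configuration. The value 4 is an arbitrary illustration, not a recommendation from this thread; pick something close to the number of local cores.

```properties
# spark-defaults.conf (sketch): lower the post-shuffle partition count
# so a small local job does not fan a join or aggregation out into
# ~200 mostly-empty partitions. The value 4 here is illustrative only.
spark.sql.shuffle.partitions    4
```

In Spark 1.5/1.6 the same property can also be set per-session before any join or aggregation runs, e.g. sqlContext.setConf("spark.sql.shuffle.partitions", "4").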