Sounds like you guys are on the right track. This is purely FYI, since I haven't seen it posted; I'm just responding to the line in the original post that your data structure should fit in memory.

OK, two more disclaimers: "FWIW" and "maybe this is not relevant or already covered." OK, here goes...

from http://spark.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks

Sometimes, you will get an OutOfMemoryError not because your RDDs don't fit in memory, but because the working set of one of your tasks, such as one of the reduce tasks in groupByKey, was too large. Spark's shuffle operations (sortByKey, groupByKey, reduceByKey, join, etc.) build a hash table within each task to perform the grouping, which can often be large. The simplest fix here is to increase the level of parallelism, so that each task's input set is smaller. Spark can efficiently support tasks as short as 200 ms, because it reuses one executor JVM across many tasks and it has a low task launching cost, so you can safely increase the level of parallelism to more than the number of cores in your clusters.
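Purely as an illustration of what "increase the level of parallelism" looks like in code (just a sketch, assuming a JavaPairRDD<String, Integer> named pairs; the partition count is a made-up example), the shuffle operators all accept an explicit number of partitions:

    // More partitions means each reduce task holds a smaller hash table in memory.
    JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
        (a, b) -> a + b,  // reduce function
        200);             // explicit number of partitions for the shuffle (example value)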

I would be curious if that helps at all. Sounds like an interesting problem you are working on.

Jim

On 12/29/2015 05:51 PM, Davies Liu wrote:
Hi Andy,

Could you change the logging level to INFO and post some of it here? There will be some
logging about the memory usage of a task when it OOMs.

In 1.6, the memory for a task is: (HeapSize - 300M) * 0.75 / number of tasks.
Is it possible that the heap is too small?
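For example, plugging in the numbers from your test below (-Xmx of 1024M and master local[2], so 2 concurrent tasks), that works out to roughly (1024M - 300M) * 0.75 / 2 ≈ 271M per task.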

Davies

--
Davies Liu
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)


On Tuesday, December 29, 2015 at 4:28 PM, Andy Davidson wrote:

Hi Michael
https://github.com/apache/spark/archive/v1.6.0.tar.gz

On both 1.6.0 and 1.5.2 my unit test works when I call repartition(1) before saving the output. coalesce(1) still fails.

coalesce(1), spark-1.5.2:
Caused by: java.io.IOException: Unable to acquire 33554432 bytes of memory

coalesce(1), spark-1.6.0:
Caused by: java.lang.OutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
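For reference, the two save variants in my test look roughly like this (a sketch; the names and the JSON output format here are illustrative, not the exact code):

    // works on both 1.5.2 and 1.6.0: repartition(1) shuffles everything into a single partition
    results.repartition(1).write().json(outputPath);

    // fails with the "Unable to acquire ... memory" errors above
    results.coalesce(1).write().json(outputPath);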
Hope this helps,
Andy

From: Michael Armbrust <mich...@databricks.com (mailto:mich...@databricks.com)>
Date: Monday, December 28, 2015 at 2:41 PM
To: Andrew Davidson <a...@santacruzintegration.com 
(mailto:a...@santacruzintegration.com)>
Cc: "user @spark" <user@spark.apache.org (mailto:user@spark.apache.org)>
Subject: Re: trouble understanding data frame memory usage 
"java.io.IOException: Unable to acquire memory"
Unfortunately, in 1.5 we didn't force operators to spill when they ran out of memory, so there is not a lot you can do. It would be awesome if you could test with 1.6 and see if things are any better?
On Mon, Dec 28, 2015 at 2:25 PM, Andy Davidson <a...@santacruzintegration.com (mailto:a...@santacruzintegration.com)> wrote:
I am using Spark 1.5.1. I am running into some memory problems with a Java unit test. Yes, I could fix it by raising -Xmx (it's set to 1024M); however, I want to better understand what is going on so I can write better code in the future. The test runs on a Mac with master="local[2]".
I have a Java unit test that starts by reading a 672K ASCII file. My output data file is 152K. It seems strange that such a small amount of data would cause an out of memory exception. I am running a pretty standard machine learning process:

1. Load data
2. Create an ML pipeline
3. Transform the data
4. Train a model
5. Make predictions
6. Join the predictions back to my original data set
7. coalesce(1); I only have a small amount of data and want to save it in a single file
8. Save the final results back to disk
Step 7: I am unable to call coalesce(): "java.io.IOException: Unable to acquire memory".

To try and figure out what is going on, I put log messages in to count the number of partitions. It turns out I have 20 input files, and each one winds up in a separate partition. Okay, so after loading I call coalesce(1) and check to make sure I only have a single partition. The total number of observations is 1998. After calling step 7 I count the number of partitions and discover I have 224 partitions! Surprising, given that I called coalesce(1) before I did anything with the data. My data set should easily fit in memory. When I save it to disk I get 202 files, with 162 of them being empty!

In general I am not explicitly using cache. Some of the data frames get registered as tables; I find it easier to use SQL. Some of the data frames get converted back to RDDs; I find it easier to create RDD<LabeledPoint> this way. I put calls to unpersist(true) in several places.

    private void memoryCheck(String name) {
        Runtime rt = Runtime.getRuntime();
        logger.warn("name: {} \t\ttotalMemory: {} \tfreeMemory: {}",
                    name,
                    String.format("%,d", rt.totalMemory()),
                    String.format("%,d", rt.freeMemory()));
    }

Any idea how I can get a better understanding of what is going on? My goal is to learn to write better Spark code.

Kind regards

Andy

Memory usage at various points in my unit test:

    name: rawInput          totalMemory: 447,741,952    freeMemory: 233,203,184
    name: naiveBayesModel   totalMemory: 509,083,648    freeMemory: 403,504,128
    name: lpRDD             totalMemory: 509,083,648    freeMemory: 402,288,104
    name: results           totalMemory: 509,083,648    freeMemory: 368,011,008

    DataFrame exploreDF = results.select(results.col("id"),
                                         results.col("label"),
                                         results.col("binomialLabel"),
                                         results.col("labelIndex"),
                                         results.col("prediction"),
                                         results.col("words"));
    exploreDF.show(10);

Yes, I realize it is strange to switch styles; however, this should not cause memory problems.

    final String exploreTable = "exploreTable";
    exploreDF.registerTempTable(exploreTable);
    String fmt = "SELECT * FROM %s where binomialLabel = 'signal'";
    String stmt = String.format(fmt, exploreTable);

    DataFrame subsetToSave = sqlContext.sql(stmt); // .show(100);

    name: subsetToSave      totalMemory: 1,747,451,904  freeMemory: 1,049,447,144

exploreDF.unpersist(true); does not resolve the memory issue.
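The partition counts mentioned above come from a small helper along these lines (a sketch; my actual logging code may differ slightly):

    private void partitionCheck(String name, DataFrame df) {
        // number of partitions backing the data frame
        int numPartitions = df.javaRDD().partitions().size();
        logger.warn("name: {} \tnumPartitions: {}", name, numPartitions);
    }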

