Re: How to unit test HiveContext without OutOfMemoryError (using sbt)

2015-08-26 Thread Mike Trienis
Thanks for your response Yana,

I can increase the MaxPermSize parameter and it will allow me to run the
unit test a few more times before I run out of memory.

However, the primary issue is that running the same unit test multiple times in
the same JVM increases memory usage with each run, and I believe it has
something to do with HiveContext not reclaiming memory after it finishes (or
with me not shutting it down properly).

It could very well be related to sbt; however, that's not clear to me.
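For reference, a minimal ScalaTest sketch of the kind of teardown I mean (the suite name and the query are placeholders; whether stopping the SparkContext actually releases the PermGen held by Hive's classes is exactly the open question):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.scalatest.{BeforeAndAfterAll, FunSuite}

// Placeholder suite: create one HiveContext per suite and stop the underlying
// SparkContext in afterAll so the next suite in the same JVM starts clean.
class HiveQuerySuite extends FunSuite with BeforeAndAfterAll {

  private var sc: SparkContext = _
  private var hive: HiveContext = _

  override def beforeAll(): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("hive-unit-test")
    sc = new SparkContext(conf)
    hive = new HiveContext(sc)
  }

  test("can talk to the metastore") {
    // Placeholder assertion; the real tests run application-specific HiveQL.
    assert(hive.sql("SHOW TABLES").collect().length >= 0)
  }

  override def afterAll(): Unit = {
    // HiveContext has no stop() of its own; stopping the SparkContext is all we can do.
    if (sc != null) sc.stop()
  }
}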


On Tue, Aug 25, 2015 at 1:12 PM, Yana Kadiyska 
wrote:

> The PermGen space error is controlled by the MaxPermSize parameter. I run
> with this in my pom, copied pretty literally from Spark's own tests, I
> think... I don't know what the sbt equivalent is, but you should be able to
> pass it... possibly via SBT_OPTS?
>
>
>  
>   org.scalatest
>   scalatest-maven-plugin
>   1.0
>   
>
> ${project.build.directory}/surefire-reports
>   false
>   .
>   SparkTestSuite.txt
>   -Xmx3g -XX:MaxPermSize=256m
> -XX:ReservedCodeCacheSize=512m
>   
>   
>   true
>   1
>   false
>
> true
>   
>   
>   
>   
>   test
>   
>   test
>   
>   
>   
>   
>   
>
>
> On Tue, Aug 25, 2015 at 2:10 PM, Mike Trienis 
> wrote:
>
>> Hello,
>>
>> I am using sbt and created a unit test where I create a `HiveContext`,
>> execute some query and then return. Each time I run the unit test, the JVM
>> increases its memory usage until I get the error:
>>
>> Internal error when running tests: java.lang.OutOfMemoryError: PermGen
>> space
>> Exception in thread "Thread-2" java.io.EOFException
>>
>> As a work-around, I can fork a new JVM each time I run the unit test;
>> however, that seems like a bad solution as it takes a while to run the unit
>> test.
>>
>> By the way, I tried importing the TestHiveContext:
>>
>>- import org.apache.spark.sql.hive.test.TestHiveContext
>>
>> However, it suffers from the same memory issue. Has anyone else run into
>> the same problem? Note that I am running these unit tests on my Mac.
>>
>> Cheers, Mike.
>>
>>
>


Re: How to unit test HiveContext without OutOfMemoryError (using sbt)

2015-08-26 Thread Michael Armbrust
I'd suggest setting sbt to fork when running tests.
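For reference, a minimal build.sbt sketch of what that looks like (sbt 0.13-era syntax; the heap numbers just mirror the Maven argLine quoted earlier and are not tuned for this project):

// build.sbt -- run tests in a forked JVM instead of inside the sbt JVM
fork in Test := true

// Options for the forked test JVM. SBT_OPTS only affects the sbt JVM itself,
// so once tests are forked, the PermGen settings have to go here instead.
javaOptions in Test ++= Seq(
  "-Xmx3g",
  "-XX:MaxPermSize=256m",
  "-XX:ReservedCodeCacheSize=512m"
)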

On Wed, Aug 26, 2015 at 10:51 AM, Mike Trienis 
wrote:



Re: Spark-xml - OutOfMemoryError: Requested array size exceeds VM limit

2016-11-15 Thread Hyukjin Kwon
Hi Arun,


I have a few questions.

Does your XML file have a few huge documents? If a single row is huge (say
500MB), it would consume a lot of memory, because, if I remember correctly, at
least one whole row has to be held in memory while iterating. I remember this
happening to me before while processing a huge record for test purposes.


How about trying to increase --executor-memory?


Also, if you don't mind, could you try selecting only a few fields with the
latest version to prune the data, just to be doubly sure?
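As a sketch of both suggestions together (the thread uses PySpark, but the same options exist from Scala; the rowTag value and the field names below are made up for illustration):

val df = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "GGLRecord")  // a lower-level, repeated element rather than the single top-level tag
  .load("GGL_1.2G.xml")

// Prune to a couple of fields so each materialized row stays small.
df.select("id", "name").count()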


Lastly, do you mind opening an issue at
https://github.com/databricks/spark-xml/issues if you still face this
problem?

I will try my best to take a look.


Thank you.


2016-11-16 9:12 GMT+09:00 Arun Patel :

> I am trying to read an XML file which is 1GB in size.  I am getting a
> 'java.lang.OutOfMemoryError: Requested array size exceeds VM limit' error
> after reading 7 partitions in local mode.  In Yarn mode, it throws a
> 'java.lang.OutOfMemoryError: Java heap space' error after reading
> 3 partitions.
>
> Any suggestion?
>
> PySpark Shell Command: pyspark --master local[4] --driver-memory 3G
> --jars /tmp/spark-xml_2.10-0.3.3.jar
>
>
>
> Dataframe Creation Command:
> df = sqlContext.read.format('com.databricks.spark.xml').options(rowTag='GGL').load('GGL_1.2G.xml')
>
>
>
> 16/11/15 18:27:04 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID
> 1) in 25978 ms on localhost (1/10)
>
> 16/11/15 18:27:04 INFO NewHadoopRDD: Input split:
> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:268435456+134217728
>
> 16/11/15 18:27:55 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2).
> 2309 bytes result sent to driver
>
> 16/11/15 18:27:55 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
> 3, localhost, partition 3,ANY, 2266 bytes)
>
> 16/11/15 18:27:55 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
>
> 16/11/15 18:27:55 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID
> 2) in 51001 ms on localhost (2/10)
>
> 16/11/15 18:27:55 INFO NewHadoopRDD: Input split:
> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:402653184+134217728
>
> 16/11/15 18:28:19 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3).
> 2309 bytes result sent to driver
>
> 16/11/15 18:28:19 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID
> 4, localhost, partition 4,ANY, 2266 bytes)
>
> 16/11/15 18:28:19 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
>
> 16/11/15 18:28:19 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID
> 3) in 24336 ms on localhost (3/10)
>
> 16/11/15 18:28:19 INFO NewHadoopRDD: Input split:
> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:536870912+134217728
>
> 16/11/15 18:28:40 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4).
> 2309 bytes result sent to driver
>
> 16/11/15 18:28:40 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID
> 5, localhost, partition 5,ANY, 2266 bytes)
>
> 16/11/15 18:28:40 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
>
> 16/11/15 18:28:40 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID
> 4) in 20895 ms on localhost (4/10)
>
> 16/11/15 18:28:40 INFO NewHadoopRDD: Input split:
> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:671088640+134217728
>
> 16/11/15 18:29:01 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5).
> 2309 bytes result sent to driver
>
> 16/11/15 18:29:01 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID
> 6, localhost, partition 6,ANY, 2266 bytes)
>
> 16/11/15 18:29:01 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
>
> 16/11/15 18:29:01 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID
> 5) in 20793 ms on localhost (5/10)
>
> 16/11/15 18:29:01 INFO NewHadoopRDD: Input split:
> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:805306368+134217728
>
> 16/11/15 18:29:22 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6).
> 2309 bytes result sent to driver
>
> 16/11/15 18:29:22 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID
> 7, localhost, partition 7,ANY, 2266 bytes)
>
> 16/11/15 18:29:22 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
>
> 16/11/15 18:29:22 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID
> 6) in 21306 ms on localhost (6/10)
>
> 16/11/15 18:29:22 INFO NewHadoopRDD: Input split:
> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:939524096+134217728
>
> 16/11/15 18:29:43 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7).
> 2309 bytes result sent to driver
>
> 16/11/15 18:29:43 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID
> 8, localhost, partition 8,ANY, 2266 bytes)
>
> 16/11/15 18:29:43 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
>
> 16/11/15 18:29:43 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID
> 7) in 21130 ms on localhost (7/10)
>
> 16/11/15 18:29:43 INFO NewHadoopRDD: Input split:
> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:1073741824+134217728
>
> 16/11/15 18:29:48 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
> 0)
>
> java.lang.OutOfMemoryError: Request

Re: Spark-xml - OutOfMemoryError: Requested array size exceeds VM limit

2016-11-15 Thread Arun Patel
Thanks for the quick response.

It's a single XML file and I am using a top-level rowTag, so it creates only
one row in a DataFrame with 5 columns. One of these columns contains most of
the data as a StructType. Is there a limit on how much data a single cell of a
DataFrame can hold?

I will check with the new version, try different rowTags, and increase
executor-memory tomorrow. I will open a new issue as well.



On Tue, Nov 15, 2016 at 7:52 PM, Hyukjin Kwon  wrote:


Re: Spark-xml - OutOfMemoryError: Requested array size exceeds VM limit

2016-11-16 Thread Arun Patel
I tried the options below.

1) Increased executor memory, up to the maximum possible of 14GB. Same error.
2) Tried the new version, spark-xml_2.10:0.4.1. Same error.
3) Tried lower-level rowTags. It worked for a lower-level rowTag and returned
16000 rows.

Are there any workarounds for this issue? I tried playing with
spark.memory.fraction and spark.memory.storageFraction, but it did not help.
Appreciate your help on this!



On Tue, Nov 15, 2016 at 8:44 PM, Arun Patel  wrote:


Re: Spark-xml - OutOfMemoryError: Requested array size exceeds VM limit

2016-11-16 Thread Hyukjin Kwon
It seems a bit weird. Could we open an issue and talk in the repository
link I sent?

Let me try to reproduce your case with your data if possible.

On 17 Nov 2016 2:26 a.m., "Arun Patel"  wrote:


Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2018-09-07 Thread Vadim Semenov
You have too many partitions, so when the driver tries to gather the status of
all map outputs and send it back to the executors, it chokes on the size of the
structure that needs to be GZipped, and since that is bigger than 2GiB, it
produces an OOM.
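A minimal sketch of the knob in question (the data and the merge functions are placeholders; the only point is that the second argument to aggregateByKey is the reduce-side partition count that drives the size of the map-status structure the driver has to serialize):

import org.apache.spark.{SparkConf, SparkContext}

object PartitionCountSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("partition-count-sketch").setMaster("local[2]"))

    // Toy key/value data standing in for the real 40 TB input.
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // The second argument is numPartitions for the reduce side. In the real job
    // this is where the 300,000 would be lowered (elsewhere in this thread,
    // ~100k is reported to work for a 20 TB job).
    val numPartitions = 8
    val sums = pairs.aggregateByKey(0, numPartitions)(_ + _, _ + _)

    sums.collect().foreach(println)
    sc.stop()
  }
}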
On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman  wrote:
>
> Hi,
>
> We are running a Spark (2.3.1) job on an EMR cluster with 500 r3.2xlarge (60 
> GB, 8 vcores, 160 GB SSD ). Driver memory is set to 25GB.
>
> It processes ~40 TB of data using aggregateByKey in which we specify 
> numPartitions = 300,000.
> Map side tasks succeed, but reduce side tasks all fail.
>
> We notice the following driver error:
>
> 18/09/07 13:35:03 WARN Utils: Suppressing exception in finally: null
>
>  java.lang.OutOfMemoryError
>
> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
> at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
> at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
> at 
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:790)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1389)
> at 
> org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
> at 
> org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
> at 
> org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Exception in thread "map-output-dispatcher-0" java.lang.OutOfMemoryError
> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at 
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:787)
> at 
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
> at 
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
> at 
> org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
> at 
> org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
> at 
> org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Suppressed: java.lang.OutOfMemoryError
> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> at java.util.z

Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2018-09-07 Thread Harel Gliksman
I understand the error is because the number of partitions is very high, yet
when processing 40 TB (and this number is expected to grow) it seems
reasonable: 40 TB / 300,000 results in a partition size of ~130 MB (the data
should be evenly distributed).

On Fri, Sep 7, 2018 at 6:28 PM Vadim Semenov  wrote:


Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2019-11-07 Thread abeboparebop
I ran into the same issue processing 20TB of data, with 200k tasks on both
the map and reduce sides. Reducing to 100k tasks each resolved the issue.
But this could/would be a major problem in cases where the data is bigger or
the computation is heavier, since reducing the number of partitions may not
be an option.



Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2019-11-08 Thread Spico Florin
Hi!
What file system are you using: EMRFS or HDFS?
Also, what memory are you using for the reducer?

On Thu, Nov 7, 2019 at 8:37 PM abeboparebop  wrote:


Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2019-11-08 Thread Jacob Lynn
The file system is HDFS. Executors are 2 cores, 14GB RAM. But I don't think
either of these relates to the problem -- this is a memory allocation issue on
the driver side, and it happens in an intermediate stage that has no HDFS
read/write.

On Fri, Nov 8, 2019 at 10:01 AM Spico Florin  wrote:


Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2019-11-08 Thread Vadim Semenov
Basically, the driver tracks partitions and sends them over to the executors,
so what it's trying to do is serialize and compress the map, but because the
map is so big it goes over 2GiB, which is Java's limit on the maximum size of a
byte array, so the whole thing falls over.

The size of the data doesn't matter much here; the number of partitions is the
root cause of the issue. Try reducing it below 3 and see how it goes.

On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman  wrote:

Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2019-11-08 Thread Jacob Lynn
Sorry for the noise, folks! I understand that reducing the number of
partitions works around the issue (at the scale I'm working at, anyway) --
as I mentioned in my initial email -- and I understand the root cause. I'm
not looking for advice on how to resolve my issue. I'm just pointing out
that this is a real bug/limitation that impacts real-world use cases, in
case there is some proper Spark dev out there who is looking for a problem
to solve.

On Fri, Nov 8, 2019 at 2:24 PM Vadim Semenov 
wrote:


Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2019-11-11 Thread Vadim Semenov
There's an umbrella ticket for various 2GB limitations
https://issues.apache.org/jira/browse/SPARK-6235

On Fri, Nov 8, 2019 at 4:11 PM Jacob Lynn  wrote:
>
> Sorry for the noise, folks! I understand that reducing the number of 
> partitions works around the issue (at the scale I'm working at, anyway) -- as 
> I mentioned in my initial email -- and I understand the root cause. I'm not 
> looking for advice on how to resolve my issue. I'm just pointing out that 
> this is a real bug/limitation that impacts real-world use cases, in case 
> there is some proper Spark dev out there who is looking for a problem to 
> solve.
>
> On Fri, Nov 8, 2019 at 2:24 PM Vadim Semenov  
> wrote:
>>
>> Basically, the driver tracks partitions and sends it over to
>> executors, so what it's trying to do is to serialize and compress the
>> map but because it's so big, it goes over 2GiB and that's Java's limit
>> on the max size of byte arrays, so the whole thing drops.
>>
>> The size of the data doesn't matter much here, but the number of partitions
>> is the root cause of the issue; try reducing it below 3 and
>> see how it goes.
>>
>> On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman  wrote:
>> >
>> > Hi,
>> >
>> > We are running a Spark (2.3.1) job on an EMR cluster with 500 r3.2xlarge 
>> > (60 GB, 8 vcores, 160 GB SSD ). Driver memory is set to 25GB.
>> >
>> > It processes ~40 TB of data using aggregateByKey in which we specify 
>> > numPartitions = 300,000.
>> > Map side tasks succeed, but reduce side tasks all fail.
>> >
>> > We notice the following driver error:
>> >
>> > 18/09/07 13:35:03 WARN Utils: Suppressing exception in finally: null
>> >
>> >  java.lang.OutOfMemoryError
>> >
>> > at 
>> > java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>> > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>> > at 
>> > java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> > at 
>> > java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
>> > at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
>> > at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
>> > at 
>> > java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
>> > at 
>> > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
>> > at 
>> > java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
>> > at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
>> > at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
>> > at 
>> > org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:790)
>> > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1389)
>> > at 
>> > org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
>> > at 
>> > org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
>> > at 
>> > org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
>> > at 
>> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> > at 
>> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> > at java.lang.Thread.run(Thread.java:748)
>> > Exception in thread "map-output-dispatcher-0" java.lang.OutOfMemoryError
>> > at 
>> > java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>> > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>> > at 
>> > java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> > at 
>> > java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
>> > at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
>> > at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
>> > at 
>> > java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
>> > at 
>> > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
>> > at 
>> > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>> > at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>> > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> > at 
>> > org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:787)
>> > at 
>> > org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
>> > at 
>> > org.apache.spark.MapOutputTracker$$anonfu

Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2019-11-12 Thread Jacob Lynn
Thanks for the pointer, Vadim. However, I just tried it with Spark 2.4 and
get the same failure. (I was previously testing with 2.2 and/or 2.3.) And I
don't see this particular issue referred to there.  The ticket that Harel
commented on indeed appears to be the most similar one to this issue:
https://issues.apache.org/jira/browse/SPARK-1239.

On Mon, Nov 11, 2019 at 4:43 PM Vadim Semenov  wrote:

> There's an umbrella ticket for various 2GB limitations
> https://issues.apache.org/jira/browse/SPARK-6235
>
> On Fri, Nov 8, 2019 at 4:11 PM Jacob Lynn  wrote:
> >
> > Sorry for the noise, folks! I understand that reducing the number of
> partitions works around the issue (at the scale I'm working at, anyway) --
> as I mentioned in my initial email -- and I understand the root cause. I'm
> not looking for advice on how to resolve my issue. I'm just pointing out
> that this is a real bug/limitation that impacts real-world use cases, in
> case there is some proper Spark dev out there who is looking for a problem
> to solve.
> >
> > On Fri, Nov 8, 2019 at 2:24 PM Vadim Semenov 
> wrote:
> >>
> >> Basically, the driver tracks partitions and sends it over to
> >> executors, so what it's trying to do is to serialize and compress the
> >> map but because it's so big, it goes over 2GiB and that's Java's limit
> >> on the max size of byte arrays, so the whole thing drops.
> >>
> >> The size of the data doesn't matter much here, but the number of partitions
> >> is the root cause of the issue; try reducing it below 3 and
> >> see how it goes.
> >>
> >> On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman 
> wrote:
> >> >
> >> > Hi,
> >> >
> >> > We are running a Spark (2.3.1) job on an EMR cluster with 500
> r3.2xlarge (60 GB, 8 vcores, 160 GB SSD ). Driver memory is set to 25GB.
> >> >
> >> > It processes ~40 TB of data using aggregateByKey in which we specify
> numPartitions = 300,000.
> >> > Map side tasks succeed, but reduce side tasks all fail.
> >> >
> >> > We notice the following driver error:
> >> >
> >> > 18/09/07 13:35:03 WARN Utils: Suppressing exception in finally: null
> >> >
> >> >  java.lang.OutOfMemoryError
> >> >
> >> > at
> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> >> > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> >> > at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> >> > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> >> > at
> java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> >> > at
> java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> >> > at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> >> > at
> java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
> >> > at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
> >> > at
> java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
> >> > at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
> >> > at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
> >> > at
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:790)
> >> > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1389)
> >> > at
> org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
> >> > at
> org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
> >> > at
> org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
> >> > at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> > at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> > at java.lang.Thread.run(Thread.java:748)
> >> > Exception in thread "map-output-dispatcher-0"
> java.lang.OutOfMemoryError
> >> > at
> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> >> > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> >> > at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> >> > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> >> > at
> java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> >> > at
> java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> >> > at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> >> > at
> java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
> >> > at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
> >> > at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> >> > at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> >> > at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.

Re: Unable to Hive program from Spark Programming Guide (OutOfMemoryError)

2015-03-25 Thread ๏̯͡๏
Can someone please respond to this?

On Wed, Mar 25, 2015 at 11:18 PM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:

> http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#hive-tables
>
>
>
> I modified the Hive query but ran into the same error. (
> http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#hive-tables)
>
>
> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
> sqlContext.sql("CREATE TABLE IF NOT EXISTS src_spark (key INT, value
> STRING)")
> sqlContext.sql("LOAD DATA LOCAL INPATH
> 'examples/src/main/resources/kv1.txt' INTO TABLE src")
>
> // Queries are expressed in HiveQL
> sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
>
>
>
> Command
>
> ./bin/spark-submit -v --master yarn-cluster --jars
> /home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar
> --num-executors 3 --driver-memory 8g --executor-memory 2g --executor-cores
> 1 --queue hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
> spark_reporting-1.0-SNAPSHOT.jar startDate=2015-02-16 endDate=2015-02-16
> input=/user/dvasthimal/epdatasets/successdetail1/part-r-0.avro
> subcommand=successevents2 output=/user/dvasthimal/epdatasets/successdetail2
>
>
> Input
>
> -sh-4.1$ ls -l examples/src/main/resources/kv1.txt
> -rw-r--r-- 1 dvasthimal gid-dvasthimal 5812 Mar  5 17:31
> examples/src/main/resources/kv1.txt
> -sh-4.1$ head examples/src/main/resources/kv1.txt
> 238val_238
> 86val_86
> 311val_311
> 27val_27
> 165val_165
> 409val_409
> 255val_255
> 278val_278
> 98val_98
> 484val_484
> -sh-4.1$
>
> Log
>
> /apache/hadoop/bin/yarn logs -applicationId application_1426715280024_82757
>
> …
> …
> …
>
>
> 15/03/25 07:52:44 INFO metastore.HiveMetaStore: No user is added in admin
> role, since config is empty
> 15/03/25 07:52:44 INFO session.SessionState: No Tez session required at
> this point. hive.execution.engine=mr.
> 15/03/25 07:52:47 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF
> NOT EXISTS src_spark (key INT, value STRING)
> 15/03/25 07:52:47 INFO parse.ParseDriver: Parse Completed
> 15/03/25 07:52:48 INFO log.PerfLogger:  from=org.apache.hadoop.hive.ql.Driver>
> 15/03/25 07:52:48 INFO log.PerfLogger:  from=org.apache.hadoop.hive.ql.Driver>
> 15/03/25 07:52:48 INFO ql.Driver: Concurrency mode is disabled, not
> creating a lock manager
> 15/03/25 07:52:48 INFO log.PerfLogger:  from=org.apache.hadoop.hive.ql.Driver>
> 15/03/25 07:52:48 INFO log.PerfLogger:  from=org.apache.hadoop.hive.ql.Driver>
> 15/03/25 07:52:48 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF
> NOT EXISTS src_spark (key INT, value STRING)
> 15/03/25 07:52:48 INFO parse.ParseDriver: Parse Completed
> 15/03/25 07:52:48 INFO log.PerfLogger:  start=1427295168392 end=1427295168393 duration=1
> from=org.apache.hadoop.hive.ql.Driver>
> 15/03/25 07:52:48 INFO log.PerfLogger:  from=org.apache.hadoop.hive.ql.Driver>
> 15/03/25 07:52:48 INFO parse.SemanticAnalyzer: Starting Semantic Analysis
> 15/03/25 07:52:48 INFO parse.SemanticAnalyzer: Creating table src_spark
> position=27
> 15/03/25 07:52:48 INFO metastore.HiveMetaStore: 0: get_table : db=default
> tbl=src_spark
> 15/03/25 07:52:48 INFO HiveMetaStore.audit: ugi=dvasthimal
> ip=unknown-ip-addr cmd=get_table : db=default tbl=src_spark
> 15/03/25 07:52:48 INFO metastore.HiveMetaStore: 0: get_database: default
> 15/03/25 07:52:48 INFO HiveMetaStore.audit: ugi=dvasthimal
> ip=unknown-ip-addr cmd=get_database: default
> 15/03/25 07:52:48 INFO ql.Driver: Semantic Analysis Completed
> 15/03/25 07:52:48 INFO log.PerfLogger:  start=1427295168393 end=1427295168595 duration=202
> from=org.apache.hadoop.hive.ql.Driver>
> 15/03/25 07:52:48 INFO ql.Driver: Returning Hive schema:
> Schema(fieldSchemas:null, properties:null)
> 15/03/25 07:52:48 INFO log.PerfLogger:  start=1427295168352 end=1427295168607 duration=255
> from=org.apache.hadoop.hive.ql.Driver>
> 15/03/25 07:52:48 INFO log.PerfLogger:  from=org.apache.hadoop.hive.ql.Driver>
> 15/03/25 07:52:48 INFO ql.Driver: Starting command: CREATE TABLE IF NOT
> EXISTS src_spark (key INT, value STRING)
> 15/03/25 07:52:48 INFO log.PerfLogger:  start=1427295168349 end=1427295168625 duration=276
> from=org.apache.hadoop.hive.ql.Driver>
> 15/03/25 07:52:48 INFO log.PerfLogger:  from=org.apache.hadoop.hive.ql.Driver>
> 15/03/25 07:52:48 INFO log.PerfLogger:  from=org.apache.hadoop.hive.ql.Driver>
> 15/03/25 07:52:51 INFO exec.DDLTask: Default to LazySimpleSerDe for table
> src_spark
> 15/03/25 07:52:52 INFO metastore.HiveMetaStore: 0: create_table:
> Table(tableName:src_spark, dbName:default, owner:dvasthimal, createTime:
> 1427295171, lastAccessTime:0, retention:0,
> sd:StorageDescriptor(cols:[FieldSchema(name:key, type:int, comment:null),
> FieldSchema(name:value, type:str

Re: Unable to Hive program from Spark Programming Guide (OutOfMemoryError)

2015-03-26 Thread ๏̯͡๏
Resolved. The fix was adding --driver-java-options "-XX:MaxPermSize=2G" to the
spark-submit command, as shown below:

./bin/spark-submit -v --master yarn-cluster --jars
/home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar
--num-executors 1 --driver-memory 4g --driver-java-options
"-XX:MaxPermSize=2G" --executor-memory 2g --executor-cores 1 --queue
hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
spark_reporting-1.0-SNAPSHOT.jar startDate=2015-02-16 endDate=2015-02-16
input=/user/dvasthimal/epdatasets/successdetail1/part-r-0.avro
subcommand=successevents2 output=/user/dvasthimal/epdatasets/successdetail2


On Thu, Mar 26, 2015 at 7:54 AM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:

> Can someone please respond to this ?
>
> On Wed, Mar 25, 2015 at 11:18 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
> wrote:
>
>> http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#hive-tables
>>
>>
>>
>> I modified the Hive query but run into same error. (
>> http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#hive-tables
>> )
>>
>>
>> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> sqlContext.sql("CREATE TABLE IF NOT EXISTS src_spark (key INT, value
>> STRING)")
>> sqlContext.sql("LOAD DATA LOCAL INPATH
>> 'examples/src/main/resources/kv1.txt' INTO TABLE src")
>>
>> // Queries are expressed in HiveQL
>> sqlContext.sql("FROM src SELECT key,
>> value").collect().foreach(println)
>>
>>
>>
>> Command
>>
>> ./bin/spark-submit -v --master yarn-cluster --jars
>> /home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar
>> --num-executors 3 --driver-memory 8g --executor-memory 2g --executor-cores
>> 1 --queue hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
>> spark_reporting-1.0-SNAPSHOT.jar startDate=2015-02-16 endDate=2015-02-16
>> input=/user/dvasthimal/epdatasets/successdetail1/part-r-0.avro
>> subcommand=successevents2 output=/user/dvasthimal/epdatasets/successdetail2
>>
>>
>> Input
>>
>> -sh-4.1$ ls -l examples/src/main/resources/kv1.txt
>> -rw-r--r-- 1 dvasthimal gid-dvasthimal 5812 Mar  5 17:31
>> examples/src/main/resources/kv1.txt
>> -sh-4.1$ head examples/src/main/resources/kv1.txt
>> 238val_238
>> 86val_86
>> 311val_311
>> 27val_27
>> 165val_165
>> 409val_409
>> 255val_255
>> 278val_278
>> 98val_98
>> 484val_484
>> -sh-4.1$
>>
>> Log
>>
>> /apache/hadoop/bin/yarn logs -applicationId
>> application_1426715280024_82757
>>
>> …
>> …
>> …
>>
>>
>> 15/03/25 07:52:44 INFO metastore.HiveMetaStore: No user is added in admin
>> role, since config is empty
>> 15/03/25 07:52:44 INFO session.SessionState: No Tez session required at
>> this point. hive.execution.engine=mr.
>> 15/03/25 07:52:47 INFO parse.ParseDriver: Parsing command: CREATE TABLE
>> IF NOT EXISTS src_spark (key INT, value STRING)
>> 15/03/25 07:52:47 INFO parse.ParseDriver: Parse Completed
>> 15/03/25 07:52:48 INFO log.PerfLogger: > from=org.apache.hadoop.hive.ql.Driver>
>> 15/03/25 07:52:48 INFO log.PerfLogger: > from=org.apache.hadoop.hive.ql.Driver>
>> 15/03/25 07:52:48 INFO ql.Driver: Concurrency mode is disabled, not
>> creating a lock manager
>> 15/03/25 07:52:48 INFO log.PerfLogger: > from=org.apache.hadoop.hive.ql.Driver>
>> 15/03/25 07:52:48 INFO log.PerfLogger: > from=org.apache.hadoop.hive.ql.Driver>
>> 15/03/25 07:52:48 INFO parse.ParseDriver: Parsing command: CREATE TABLE
>> IF NOT EXISTS src_spark (key INT, value STRING)
>> 15/03/25 07:52:48 INFO parse.ParseDriver: Parse Completed
>> 15/03/25 07:52:48 INFO log.PerfLogger: > start=1427295168392 end=1427295168393 duration=1
>> from=org.apache.hadoop.hive.ql.Driver>
>> 15/03/25 07:52:48 INFO log.PerfLogger: > from=org.apache.hadoop.hive.ql.Driver>
>> 15/03/25 07:52:48 INFO parse.SemanticAnalyzer: Starting Semantic Analysis
>> 15/03/25 07:52:48 INFO parse.SemanticAnalyzer: Creating table src_spark
>> position=27
>> 15/03/25 07:52:48 INFO metastore.HiveMetaStore: 0: get_table : db=default
>> tbl=src_spark
>> 15/03/25 07:52:48 INFO HiveMetaStore.audit: ugi=dvasthimal
>> ip=unknown-ip-addr cmd=get_table : db=default tbl=src_spark
>> 15/03/25 07:52:48 INFO metastore.HiveMetaStore: 0: get_database: default
>> 15/03/25 07:52:48 INFO HiveMetaStore.audit: ugi=dvasthimal
>> ip=unknown-ip-addr cmd=get_database: default
>> 15/03/25 07:52:48 INFO ql.Driver: Semantic Analysis Completed
>> 15/03/25 07:52:48 INFO log.PerfLogger: > start=1427295168393 end=1427295168595 duration=202
>> from=org.apache.hadoop.hive.ql.Driver>
>> 15/03/25 07:52:48 INFO ql.Driver: Returning Hive schema:
>> Schema(fieldSchemas:null, properties:null)
>> 15/03/25 07:52:48 INFO log.PerfLogger: > s

Idiomatic way to rate-limit streaming sources to avoid OutOfMemoryError?

2024-04-07 Thread Baran, Mert

Hi Spark community,

I have a Spark Structured Streaming application that reads data from a 
socket source (implemented very similarly to the 
TextSocketMicroBatchStream). The issue is that the source can generate 
data faster than Spark can process it, eventually leading to an 
OutOfMemoryError when Spark runs out of memory trying to queue up all 
the pending data.


I'm looking for advice on the most idiomatic/recommended way in Spark to 
rate-limit data ingestion to avoid overwhelming the system.


Approaches I've considered:

1. Using a BlockingQueue with a fixed size to throttle the data.
However, this requires careful tuning of the queue size. If too small,
it limits throughput; if too large, you risk batches taking too long. (A
rough sketch of this approach is shown after these two points.)


2. Fetching a limited number of records in the PartitionReader's next(), 
adding the records into a queue and checking if the queue is empty. 
However, I'm not sure if there is a built-in way to dynamically scale 
the number of records fetched (i.e., dynamically calculating the offset) 
based on the system load and capabilities.
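
For concreteness, here is a rough sketch of approach 1 (a fixed-size blocking
buffer between the socket reader thread and the Spark-facing reader). The class
name, capacity and timeout are placeholders, and this is not tied to any
particular Spark data source API:

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// The producer thread calls push(), which blocks once the buffer is full, so
// the upstream reader slows down instead of queueing unbounded data in memory.
class ThrottledBuffer[T](capacity: Int = 10000) {
  private val queue = new ArrayBlockingQueue[T](capacity)

  // Called by the thread reading from the socket; blocks when the buffer is full.
  def push(record: T): Unit = queue.put(record)

  // Called from the reader loop (e.g. inside PartitionReader.next());
  // returns None if nothing arrives within the timeout.
  def poll(timeoutMs: Long): Option[T] =
    Option(queue.poll(timeoutMs, TimeUnit.MILLISECONDS))

  def size: Int = queue.size()
}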


So in summary, what is the recommended way to dynamically rate-limit a 
streaming source to match Spark's processing capacity and avoid 
out-of-memory issues? Are there any best practices or configuration 
options I should look at?
Any guidance would be much appreciated! Let me know if you need any 
other details.


Thanks,
Mert





SqlContext parquet read OutOfMemoryError: Requested array size exceeds VM limit error

2016-05-04 Thread Bijay Kumar Pathak
Hi,

I am reading a parquet file of around 50+ GB which has 4013 partitions with
240 columns. Below is my configuration:

driver: 20G memory with 4 cores
executors: 45 executors with 15G memory and 4 cores.

I tried to read the data both using the DataFrame read API and using a Hive
context with Hive SQL, but in both cases it throws the below error with no
further description:

hive_context.sql("select * from test.base_table where
date='{0}'".format(part_dt))
sqlcontext.read.parquet("/path/to/partion/")

#
# java.lang.OutOfMemoryError: Requested array size exceeds VM limit
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 16953"...


What could be wrong here? I think increasing memory alone will not help in
this case since it has hit the array size limit.

Thanks,
Bijay


Java heap space OutOfMemoryError in pyspark spark-submit (spark version:2.2)

2018-01-04 Thread Anu B Nair
Hi,

I have a data set of size 10GB (for example, Test.txt).

I wrote my pyspark script as below (Test.py):

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
spark = SparkSession.builder.appName("FilterProduct").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
lines = spark.read.text("C:/Users/test/Desktop/Test.txt").rdd
lines.collect()

Then I am executing the above script using the below command:

spark-submit Test.py --executor-memory 15G --driver-memory 15G

Then I am getting an error like the below:



17/12/29 13:27:18 INFO FileScanRDD: Reading File path:
file:///C:/Users/test/Desktop/Test.txt, range: 402653184-536870912,
partition values: [empty row]
17/12/29 13:27:18 INFO CodeGenerator: Code generated in 22.743725 ms
17/12/29 13:27:44 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3230)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at 
org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:383)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/12/29 13:27:44 ERROR Executor: Exception in task 2.0 in stage 0.0 (TID 2)
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3230)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93

Please let me know how to resolve this ?

--


Anu


Re: Idiomatic way to rate-limit streaming sources to avoid OutOfMemoryError?

2024-04-07 Thread Mich Talebzadeh
OK,

This is a common issue in Spark Structured Streaming (SSS), where the
source generates data faster than Spark can process it. SSS doesn't have a
built-in mechanism for directly rate-limiting the incoming data stream
itself. However, consider the following:


   - Limit the rate at which data is produced. This can involve configuring
   the data source itself to emit data at a controlled rate or implementing
   rate limiting mechanisms in the application or system that produces the
   data.
   - SSS supports backpressure, which allows it to dynamically adjust the
   ingestion rate based on the processing capacity of the system. This can
   help prevent overwhelming the system with data. To enable backpressure, set
   the appropriate configuration properties, such as
   spark.conf.set("spark.streaming.backpressure.enabled", "true") and
   spark.streaming.backpressure.initialRate (see the sketch after this list).
   - Consider adjusting the micro-batch interval to control the rate at
   which data is processed. Increasing the micro-batch interval reduces the
   frequency of processing, allowing more time for each batch to be processed
   and reducing the likelihood of out-of-memory errors, e.g.
   spark.conf.set("spark.sql.streaming.trigger.interval", "... seconds").
   - Dynamic Resource Allocation (DRA), not implemented yet. DRA will
   automatically adjust allocated resources based on workload. This ensures
   Spark has enough resources to process incoming data within the trigger
   interval, preventing backlogs and potential OOM issues.
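
A minimal sketch of the settings mentioned above (property names and values are
illustrative and should be checked against the Spark version in use; note that
the spark.streaming.backpressure.* properties come from the older DStream-based
API, while the trigger below is the Structured Streaming way to lengthen the
micro-batch interval):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("rate-limited-stream").getOrCreate()

// Backpressure properties mentioned above (DStream-era settings).
spark.conf.set("spark.streaming.backpressure.enabled", "true")
spark.conf.set("spark.streaming.backpressure.initialRate", "1000")

// Placeholder socket source, similar to the one described in the question.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()

// Longer micro-batch interval: trigger a batch at most every 30 seconds.
val query = lines.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

query.awaitTermination()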


From the Spark UI, look at the streaming tab. There are various statistics
there. In general your Processing Time has to be less than your batch
interval. The Scheduling Delay and Total Delay are additional indicators of
health.

HTH

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer  | Generative AI
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Sun, 7 Apr 2024 at 15:11, Baran, Mert  wrote:

> Hi Spark community,
>
> I have a Spark Structured Streaming application that reads data from a
> socket source (implemented very similarly to the
> TextSocketMicroBatchStream). The issue is that the source can generate
> data faster than Spark can process it, eventually leading to an
> OutOfMemoryError when Spark runs out of memory trying to queue up all
> the pending data.
>
> I'm looking for advice on the most idiomatic/recommended way in Spark to
> rate-limit data ingestion to avoid overwhelming the system.
>
> Approaches I've considered:
>
> 1. Using a BlockingQueue with a fixed size to throttle the data.
> However, this requires careful tuning of the queue size. If too small,
> it limits throughput; if too large, you risk batches taking too long.
>
> 2. Fetching a limited number of records in the PartitionReader's next(),
> adding the records into a queue and checking if the queue is empty.
> However, I'm not sure if there is a built-in way to dynamically scale
> the number of records fetched (i.e., dynamically calculating the offset)
> based on the system load and capabilities.
>
> So in summary, what is the recommended way to dynamically rate-limit a
> streaming source to match Spark's processing capacity and avoid
> out-of-memory issues? Are there any best practices or configuration
> options I should look at?
> Any guidance would be much appreciated! Let me know if you need any
> other details.
>
> Thanks,
> Mert
>
>
>
>


Re: SqlContext parquet read OutOfMemoryError: Requested array size exceeds VM limit error

2016-05-04 Thread Ted Yu
Have you seen this thread ?

http://search-hadoop.com/m/q3RTtyXr2N13hf9O&subj=java+lang+OutOfMemoryError+Requested+array+size+exceeds+VM+limit

On Wed, May 4, 2016 at 2:44 PM, Bijay Kumar Pathak  wrote:

> Hi,
>
> I am reading the parquet file around 50+ G which has 4013 partitions with
> 240 columns. Below is my configuration
>
> driver : 20G memory with 4 cores
> executors: 45 executors with 15G memory and 4 cores.
>
> I tried to read the data using both Dataframe read and using hive context
> to read the data using hive SQL but for the both cases, it throws me below
> error with no  further description on error.
>
> hive_context.sql("select * from test.base_table where
> date='{0}'".format(part_dt))
> sqlcontext.read.parquet("/path/to/partion/")
>
> #
> # java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> # -XX:OnOutOfMemoryError="kill -9 %p"
> #   Executing /bin/sh -c "kill -9 16953"...
>
>
> What could be wrong over here since I think increasing memory only will
> not help in this case since it reached the array size limit.
>
> Thanks,
> Bijay
>


Re: SqlContext parquet read OutOfMemoryError: Requested array size exceeds VM limit error

2016-05-04 Thread Prajwal Tuladhar
If you are running on a 64-bit JVM with less than 32G heap, you might want to
enable -XX:+UseCompressedOops [1]. And if your dataframe is somehow
generating an array with more than 2^31-1 elements, you might have to rethink
your options.

[1] https://spark.apache.org/docs/latest/tuning.html
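
(A small sketch of one way to pass that JVM flag to the executors; the property
name is a standard Spark option, but everything else here is a placeholder and
not a recommendation for this particular job. The driver-side flag usually has
to be passed on spark-submit via --conf spark.driver.extraJavaOptions=...,
because the driver JVM is already running by the time application code runs.)

import org.apache.spark.{SparkConf, SparkContext}

// Append the flag to the executor JVM options; illustrative app name only.
val conf = new SparkConf()
  .setAppName("compressed-oops-example")
  .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
val sc = new SparkContext(conf)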

On Wed, May 4, 2016 at 9:44 PM, Bijay Kumar Pathak  wrote:

> Hi,
>
> I am reading the parquet file around 50+ G which has 4013 partitions with
> 240 columns. Below is my configuration
>
> driver : 20G memory with 4 cores
> executors: 45 executors with 15G memory and 4 cores.
>
> I tried to read the data using both Dataframe read and using hive context
> to read the data using hive SQL but for the both cases, it throws me below
> error with no  further description on error.
>
> hive_context.sql("select * from test.base_table where
> date='{0}'".format(part_dt))
> sqlcontext.read.parquet("/path/to/partion/")
>
> #
> # java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> # -XX:OnOutOfMemoryError="kill -9 %p"
> #   Executing /bin/sh -c "kill -9 16953"...
>
>
> What could be wrong over here since I think increasing memory only will
> not help in this case since it reached the array size limit.
>
> Thanks,
> Bijay
>



-- 
--
Cheers,
Praj


Re: SqlContext parquet read OutOfMemoryError: Requested array size exceeds VM limit error

2016-05-04 Thread Bijay Kumar Pathak
Thanks for the suggestions and links. The problem arises when I use the
DataFrame API to write, but it works fine when doing an insert overwrite into
the Hive table.

# Works good
hive_context.sql("insert overwrite table {0} partiton (e_dt, c_dt) select *
from temp_table".format(table_name))
# Doesn't work, throws java.lang.OutOfMemoryError: Requested array size
exceeds VM limit
df.write.mode('overwrite').partitionBy('e_dt','c_dt').parquet("/path/to/file/")

Thanks,
Bijay

On Wed, May 4, 2016 at 3:02 PM, Prajwal Tuladhar  wrote:

> If you are running on 64-bit JVM with less than 32G heap, you might want
> to enable -XX:+UseCompressedOops[1]. And if your dataframe is somehow
> generating more than 2^31-1 number of arrays, you might have to rethink
> your options.
>
> [1] https://spark.apache.org/docs/latest/tuning.html
>
> On Wed, May 4, 2016 at 9:44 PM, Bijay Kumar Pathak 
> wrote:
>
>> Hi,
>>
>> I am reading the parquet file around 50+ G which has 4013 partitions with
>> 240 columns. Below is my configuration
>>
>> driver : 20G memory with 4 cores
>> executors: 45 executors with 15G memory and 4 cores.
>>
>> I tried to read the data using both Dataframe read and using hive context
>> to read the data using hive SQL but for the both cases, it throws me below
>> error with no  further description on error.
>>
>> hive_context.sql("select * from test.base_table where
>> date='{0}'".format(part_dt))
>> sqlcontext.read.parquet("/path/to/partion/")
>>
>> #
>> # java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>> # -XX:OnOutOfMemoryError="kill -9 %p"
>> #   Executing /bin/sh -c "kill -9 16953"...
>>
>>
>> What could be wrong over here since I think increasing memory only will
>> not help in this case since it reached the array size limit.
>>
>> Thanks,
>> Bijay
>>
>
>
>
> --
> --
> Cheers,
> Praj
>


How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Dmitry Goldenberg
We're getting the below error. We tried increasing spark.executor.memory, e.g.
from 1g to 2g, but the below error still happens.

Any recommendations? Something to do with specifying -Xmx in the submit job
scripts?

Thanks.

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit
exceeded
at java.util.Arrays.copyOf(Arrays.java:3332)
at
java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
at
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
at
org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
at
org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
at
org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
at
org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
at org.apache.spark.rdd.RDD.<init>(RDD.scala:1365)
at org.apache.spark.streaming.kafka.KafkaRDD.<init>(KafkaRDD.scala:46)
at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
at
org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)


[GraphX] how to set memory configurations to avoid OutOfMemoryError "GC overhead limit exceeded"

2014-08-18 Thread Yifan LI
Hi,

I am testing our application (similar to "personalised page rank" using Pregel;
note that each vertex property needs considerably more space to store after
each new iteration). It works correctly on a small graph. (We have one single
machine, 8 cores, 16G memory.)

But when we ran it on a larger graph (e.g. LiveJournal), it always ends with
the error "GC overhead limit exceeded", even when the number of partitions is
increased from 8 to 48.

The existing memory setting in spark-env.sh:
SPARK_DAEMON_JAVA_OPTS='-Xms1g -Xmx10g -XX:MaxPermSize=1g'
SPARK_MEM=12g
export SPARK_DAEMON_JAVA_OPTS
export SPARK_MEM


Are there any improvements I could make?


Thanks in advance!


Best,
Yifan

Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Cody Koeninger
That looks like it's during recovery from a checkpoint, so it'd be driver
memory not executor memory.

How big is the checkpoint directory that you're trying to restore from?
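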

On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <
dgoldenberg...@gmail.com> wrote:

> We're getting the below error.  Tried increasing spark.executor.memory
> e.g. from 1g to 2g but the below error still happens.
>
> Any recommendations? Something to do with specifying -Xmx in the submit
> job scripts?
>
> Thanks.
>
> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit
> exceeded
> at java.util.Arrays.copyOf(Arrays.java:3332)
> at
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
> at
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
> at java.lang.StringBuilder.append(StringBuilder.java:136)
> at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
> at
> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
> at
> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
> at
> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
> at
> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
> at org.apache.spark.rdd.RDD.<init>(RDD.scala:1365)
> at org.apache.spark.streaming.kafka.KafkaRDD.<init>(KafkaRDD.scala:46)
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
> at
> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
>
>
>
>


Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Dmitry Goldenberg
Thanks, Cody, will try that. Unfortunately due to a reinstall I don't have
the original checkpointing directory :(  Thanks for the clarification on
spark.driver.memory, I'll keep testing (at 2g things seem OK for now).

On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger  wrote:

> That looks like it's during recovery from a checkpoint, so it'd be driver
> memory not executor memory.
>
> How big is the checkpoint directory that you're trying to restore from?
>
> On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> We're getting the below error.  Tried increasing spark.executor.memory
>> e.g. from 1g to 2g but the below error still happens.
>>
>> Any recommendations? Something to do with specifying -Xmx in the submit
>> job scripts?
>>
>> Thanks.
>>
>> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit
>> exceeded
>> at java.util.Arrays.copyOf(Arrays.java:3332)
>> at
>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>> at
>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>> at java.lang.StringBuilder.append(StringBuilder.java:136)
>> at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
>> at
>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
>> at
>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
>> at
>> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>> at
>> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
>> at org.apache.spark.rdd.RDD.<init>(RDD.scala:1365)
>> at org.apache.spark.streaming.kafka.KafkaRDD.<init>(KafkaRDD.scala:46)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
>> at
>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
>>
>>
>>
>>
>


Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Ted Yu
I wonder whether, during recovery from a checkpoint, we could estimate the size
of the checkpoint and compare it with Runtime.getRuntime().freeMemory().

If the size of the checkpoint is much bigger than free memory, log a warning, etc.
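
For what it's worth, a rough user-side approximation of that check (local
filesystem only; an HDFS checkpoint directory would need
FileSystem.getContentSummary instead, and the path and message are placeholders):

import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._

def warnIfCheckpointTooLarge(checkpointDir: String): Unit = {
  val root: Path = Paths.get(checkpointDir)
  // Sum the sizes of all regular files under the checkpoint directory.
  val checkpointBytes: Long = Files.walk(root).iterator().asScala
    .filter(p => Files.isRegularFile(p))
    .map(p => Files.size(p))
    .sum

  val freeHeap = Runtime.getRuntime.freeMemory()
  if (checkpointBytes > freeHeap) {
    println(s"WARNING: checkpoint ($checkpointBytes bytes) is larger than free heap " +
      s"($freeHeap bytes); recovery may fail with OutOfMemoryError")
  }
}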

Cheers

On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg  wrote:

> Thanks, Cody, will try that. Unfortunately due to a reinstall I don't have
> the original checkpointing directory :(  Thanks for the clarification on
> spark.driver.memory, I'll keep testing (at 2g things seem OK for now).
>
> On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger 
> wrote:
>
>> That looks like it's during recovery from a checkpoint, so it'd be driver
>> memory not executor memory.
>>
>> How big is the checkpoint directory that you're trying to restore from?
>>
>> On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> We're getting the below error.  Tried increasing spark.executor.memory
>>> e.g. from 1g to 2g but the below error still happens.
>>>
>>> Any recommendations? Something to do with specifying -Xmx in the submit
>>> job scripts?
>>>
>>> Thanks.
>>>
>>> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit
>>> exceeded
>>> at java.util.Arrays.copyOf(Arrays.java:3332)
>>> at
>>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>>> at
>>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>>> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>>> at java.lang.StringBuilder.append(StringBuilder.java:136)
>>> at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
>>> at
>>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
>>> at
>>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>> at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
>>> at
>>> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>>> at
>>> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
>>> at org.apache.spark.rdd.RDD.<init>(RDD.scala:1365)
>>> at org.apache.spark.streaming.kafka.KafkaRDD.<init>(KafkaRDD.scala:46)
>>> at
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
>>> at
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
>>> at
>>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at
>>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at
>>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
>>>
>>>
>>>
>>>
>>
>


Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Dmitry Goldenberg
Would there be a way to chunk up/batch up the contents of the checkpointing
directories as they're being processed by Spark Streaming?  Is it mandatory
to load the whole thing in one go?

On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu  wrote:

> I wonder during recovery from a checkpoint whether we can estimate the
> size of the checkpoint and compare with Runtime.getRuntime().freeMemory().
>
> If the size of checkpoint is much bigger than free memory, log warning, etc
>
> Cheers
>
> On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Thanks, Cody, will try that. Unfortunately due to a reinstall I don't
>> have the original checkpointing directory :(  Thanks for the clarification
>> on spark.driver.memory, I'll keep testing (at 2g things seem OK for now).
>>
>> On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger 
>> wrote:
>>
>>> That looks like it's during recovery from a checkpoint, so it'd be
>>> driver memory not executor memory.
>>>
>>> How big is the checkpoint directory that you're trying to restore from?
>>>
>>> On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
 We're getting the below error.  Tried increasing spark.executor.memory
 e.g. from 1g to 2g but the below error still happens.

 Any recommendations? Something to do with specifying -Xmx in the submit
 job scripts?

 Thanks.

 Exception in thread "main" java.lang.OutOfMemoryError: GC overhead
 limit exceeded
 at java.util.Arrays.copyOf(Arrays.java:3332)
 at
 java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
 at
 java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
 at
 java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
 at java.lang.StringBuilder.append(StringBuilder.java:136)
 at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
 at
 org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
 at
 org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
 at
 org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
 at
 org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
 at org.apache.spark.rdd.RDD.<init>(RDD.scala:1365)
 at org.apache.spark.streaming.kafka.KafkaRDD.<init>(KafkaRDD.scala:46)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
 at
 org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
 at
 org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)




>>>
>>
>


Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Cody Koeninger
You need to keep a certain number of rdds around for checkpointing, based
on e.g. the window size.  Those would all need to be loaded at once.
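
For illustration only (this is an added sketch, not code from anyone in the
thread), here is a minimal direct-Kafka streaming job with checkpointing and a
window; the application name, broker, topic and checkpoint path are made-up
placeholders. The point is that every batch RDD still covered by the window is
part of the checkpointed lineage, so all of them have to be reconstructed on
recovery.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("checkpoint-window-sketch")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///tmp/checkpoints")   // hypothetical checkpoint dir

// hypothetical broker and topic
val kafkaParams = Map("metadata.broker.list" -> "broker:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))

// A 10-minute window over 10-second batches: on recovery from a checkpoint,
// every batch RDD the window still covers has to be restored at once.
stream.window(Minutes(10), Seconds(30)).count().print()

ssc.start()
ssc.awaitTermination()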

On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg <
dgoldenberg...@gmail.com> wrote:

> Would there be a way to chunk up/batch up the contents of the
> checkpointing directories as they're being processed by Spark Streaming?
> Is it mandatory to load the whole thing in one go?
>
> On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu  wrote:
>
>> I wonder during recovery from a checkpoint whether we can estimate the
>> size of the checkpoint and compare with Runtime.getRuntime().freeMemory
>> ().
>>
>> If the size of checkpoint is much bigger than free memory, log warning,
>> etc
>>
>> Cheers
>>
>> On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> Thanks, Cody, will try that. Unfortunately due to a reinstall I don't
>>> have the original checkpointing directory :(  Thanks for the clarification
>>> on spark.driver.memory, I'll keep testing (at 2g things seem OK for now).
>>>
>>> On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger 
>>> wrote:
>>>
 That looks like it's during recovery from a checkpoint, so it'd be
 driver memory not executor memory.

 How big is the checkpoint directory that you're trying to restore from?

 On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <
 dgoldenberg...@gmail.com> wrote:

> We're getting the below error.  Tried increasing spark.executor.memory
> e.g. from 1g to 2g but the below error still happens.
>
> Any recommendations? Something to do with specifying -Xmx in the
> submit job scripts?
>
> Thanks.
>
> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead
> limit exceeded
> at java.util.Arrays.copyOf(Arrays.java:3332)
> at
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
> at
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
> at
> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
> at java.lang.StringBuilder.append(StringBuilder.java:136)
> at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
> at
> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
> at
> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
> at
> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
> at
> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
> at org.apache.spark.rdd.RDD.(RDD.scala:1365)
> at org.apache.spark.streaming.kafka.KafkaRDD.(KafkaRDD.scala:46)
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
> at
> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
>
>
>
>

>>>
>>
>


Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Ted Yu
Looks like the workaround is to reduce the *window length*.

*Cheers*
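
(Continuing the illustrative sketch added earlier in this thread -- again not
code from the original posters: a shorter window means fewer batch RDDs in the
checkpointed lineage, so there is less to restore on recovery.)

stream.window(Minutes(2), Seconds(30)).count().print()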

On Mon, Aug 10, 2015 at 10:07 AM, Cody Koeninger  wrote:

> You need to keep a certain number of rdds around for checkpointing, based
> on e.g. the window size.  Those would all need to be loaded at once.
>
> On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Would there be a way to chunk up/batch up the contents of the
>> checkpointing directories as they're being processed by Spark Streaming?
>> Is it mandatory to load the whole thing in one go?
>>
>> On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu  wrote:
>>
>>> I wonder during recovery from a checkpoint whether we can estimate the
>>> size of the checkpoint and compare with Runtime.getRuntime().freeMemory
>>> ().
>>>
>>> If the size of checkpoint is much bigger than free memory, log warning,
>>> etc
>>>
>>> Cheers
>>>
>>> On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
 Thanks, Cody, will try that. Unfortunately due to a reinstall I don't
 have the original checkpointing directory :(  Thanks for the clarification
 on spark.driver.memory, I'll keep testing (at 2g things seem OK for now).

 On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger 
 wrote:

> That looks like it's during recovery from a checkpoint, so it'd be
> driver memory not executor memory.
>
> How big is the checkpoint directory that you're trying to restore from?
>
> On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> We're getting the below error.  Tried increasing
>> spark.executor.memory e.g. from 1g to 2g but the below error still 
>> happens.
>>
>> Any recommendations? Something to do with specifying -Xmx in the
>> submit job scripts?
>>
>> Thanks.
>>
>> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead
>> limit exceeded
>> at java.util.Arrays.copyOf(Arrays.java:3332)
>> at
>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>> at
>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>> at
>> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>> at java.lang.StringBuilder.append(StringBuilder.java:136)
>> at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
>> at
>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
>> at
>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
>> at
>> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>> at
>> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
>> at org.apache.spark.rdd.RDD.(RDD.scala:1365)
>> at org.apache.spark.streaming.kafka.KafkaRDD.(KafkaRDD.scala:46)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
>> at
>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckp

Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Dmitry Goldenberg
"You need to keep a certain number of rdds around for checkpointing" --
that seems like a hefty expense to pay in order to achieve fault
tolerance.  Why does Spark persist whole RDD's of data?  Shouldn't it be
sufficient to just persist the offsets, to know where to resume from?

Thanks.

On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger  wrote:

> You need to keep a certain number of rdds around for checkpointing, based
> on e.g. the window size.  Those would all need to be loaded at once.
>
> On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Would there be a way to chunk up/batch up the contents of the
>> checkpointing directories as they're being processed by Spark Streaming?
>> Is it mandatory to load the whole thing in one go?
>>
>> On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu  wrote:
>>
>>> I wonder during recovery from a checkpoint whether we can estimate the
>>> size of the checkpoint and compare with Runtime.getRuntime().freeMemory
>>> ().
>>>
>>> If the size of checkpoint is much bigger than free memory, log warning,
>>> etc
>>>
>>> Cheers
>>>
>>> On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
 Thanks, Cody, will try that. Unfortunately due to a reinstall I don't
 have the original checkpointing directory :(  Thanks for the clarification
 on spark.driver.memory, I'll keep testing (at 2g things seem OK for now).

 On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger 
 wrote:

> That looks like it's during recovery from a checkpoint, so it'd be
> driver memory not executor memory.
>
> How big is the checkpoint directory that you're trying to restore from?
>
> On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> We're getting the below error.  Tried increasing
>> spark.executor.memory e.g. from 1g to 2g but the below error still 
>> happens.
>>
>> Any recommendations? Something to do with specifying -Xmx in the
>> submit job scripts?
>>
>> Thanks.
>>
>> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead
>> limit exceeded
>> at java.util.Arrays.copyOf(Arrays.java:3332)
>> at
>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>> at
>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>> at
>> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>> at java.lang.StringBuilder.append(StringBuilder.java:136)
>> at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
>> at
>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
>> at
>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
>> at
>> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>> at
>> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
>> at org.apache.spark.rdd.RDD.(RDD.scala:1365)
>> at org.apache.spark.streaming.kafka.KafkaRDD.(KafkaRDD.scala:46)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
>> at
>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>> at scala.coll

Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Cody Koeninger
The RDD is indeed defined mostly by just the offsets / topic partitions.
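
As a hedged illustration of that point (added here; not code from the thread),
the per-batch metadata of the direct stream is just a set of per-partition
offset ranges, which you can inspect with the documented HasOffsetRanges API --
"stream" below is the hypothetical direct stream from the earlier sketch.

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // The offset ranges below are essentially all the state that defines
  // the batch RDD; the messages themselves are not stored in it.
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    println(s"${r.topic} ${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
  }
}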

On Mon, Aug 10, 2015 at 3:24 PM, Dmitry Goldenberg  wrote:

> "You need to keep a certain number of rdds around for checkpointing" --
> that seems like a hefty expense to pay in order to achieve fault
> tolerance.  Why does Spark persist whole RDD's of data?  Shouldn't it be
> sufficient to just persist the offsets, to know where to resume from?
>
> Thanks.
>
>
> On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger 
> wrote:
>
>> You need to keep a certain number of rdds around for checkpointing, based
>> on e.g. the window size.  Those would all need to be loaded at once.
>>
>> On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> Would there be a way to chunk up/batch up the contents of the
>>> checkpointing directories as they're being processed by Spark Streaming?
>>> Is it mandatory to load the whole thing in one go?
>>>
>>> On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu  wrote:
>>>
 I wonder during recovery from a checkpoint whether we can estimate the
 size of the checkpoint and compare with Runtime.getRuntime().freeMemory
 ().

 If the size of checkpoint is much bigger than free memory, log warning,
 etc

 Cheers

 On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg <
 dgoldenberg...@gmail.com> wrote:

> Thanks, Cody, will try that. Unfortunately due to a reinstall I don't
> have the original checkpointing directory :(  Thanks for the clarification
> on spark.driver.memory, I'll keep testing (at 2g things seem OK for now).
>
> On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger 
> wrote:
>
>> That looks like it's during recovery from a checkpoint, so it'd be
>> driver memory not executor memory.
>>
>> How big is the checkpoint directory that you're trying to restore
>> from?
>>
>> On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> We're getting the below error.  Tried increasing
>>> spark.executor.memory e.g. from 1g to 2g but the below error still 
>>> happens.
>>>
>>> Any recommendations? Something to do with specifying -Xmx in the
>>> submit job scripts?
>>>
>>> Thanks.
>>>
>>> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead
>>> limit exceeded
>>> at java.util.Arrays.copyOf(Arrays.java:3332)
>>> at
>>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>>> at
>>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>>> at
>>> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>>> at java.lang.StringBuilder.append(StringBuilder.java:136)
>>> at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
>>> at
>>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
>>> at
>>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>> at
>>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>> at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
>>> at
>>> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>>> at
>>> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
>>> at org.apache.spark.rdd.RDD.(RDD.scala:1365)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaRDD.(KafkaRDD.scala:46)
>>> at
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
>>> at
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
>>> at
>>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at
>>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStrea

Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Dmitry Goldenberg
Well, RDD"s also contain data, don't they?

The question is, what can be so hefty in the checkpointing directory to
cause Spark driver to run out of memory?  It seems that it makes
checkpointing expensive, in terms of I/O and memory consumption.  Two
network hops -- to driver, then to workers.  Hefty file system usage, hefty
memory consumption...   What can we do to offset some of these costs?



On Mon, Aug 10, 2015 at 4:27 PM, Cody Koeninger  wrote:

> The rdd is indeed defined by mostly just the offsets / topic partitions.
>
> On Mon, Aug 10, 2015 at 3:24 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> "You need to keep a certain number of rdds around for checkpointing" --
>> that seems like a hefty expense to pay in order to achieve fault
>> tolerance.  Why does Spark persist whole RDD's of data?  Shouldn't it be
>> sufficient to just persist the offsets, to know where to resume from?
>>
>> Thanks.
>>
>>
>> On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger 
>> wrote:
>>
>>> You need to keep a certain number of rdds around for checkpointing,
>>> based on e.g. the window size.  Those would all need to be loaded at once.
>>>
>>> On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
 Would there be a way to chunk up/batch up the contents of the
 checkpointing directories as they're being processed by Spark Streaming?
 Is it mandatory to load the whole thing in one go?

 On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu  wrote:

> I wonder during recovery from a checkpoint whether we can estimate
> the size of the checkpoint and compare with Runtime.getRuntime().
> freeMemory().
>
> If the size of checkpoint is much bigger than free memory, log
> warning, etc
>
> Cheers
>
> On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Thanks, Cody, will try that. Unfortunately due to a reinstall I don't
>> have the original checkpointing directory :(  Thanks for the 
>> clarification
>> on spark.driver.memory, I'll keep testing (at 2g things seem OK for now).
>>
>> On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger 
>> wrote:
>>
>>> That looks like it's during recovery from a checkpoint, so it'd be
>>> driver memory not executor memory.
>>>
>>> How big is the checkpoint directory that you're trying to restore
>>> from?
>>>
>>> On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
 We're getting the below error.  Tried increasing
 spark.executor.memory e.g. from 1g to 2g but the below error still 
 happens.

 Any recommendations? Something to do with specifying -Xmx in the
 submit job scripts?

 Thanks.

 Exception in thread "main" java.lang.OutOfMemoryError: GC overhead
 limit exceeded
 at java.util.Arrays.copyOf(Arrays.java:3332)
 at
 java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
 at
 java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
 at
 java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
 at java.lang.StringBuilder.append(StringBuilder.java:136)
 at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
 at
 org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
 at
 org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
 at
 org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
 at
 org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
 at scala.Option.getOrElse(Option.scala:120)
 at
 org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
 at org.apache.spark.rdd.RDD.(RDD.scala:1365)
 at
 org.apache.spark.streaming.kafka.KafkaRDD.(KafkaRDD.scala:46)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.strea

Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Cody Koeninger
No, it's not as if a given KafkaRDD object contains an array of messages
that gets serialized with the object.  Its compute method generates an
iterator of messages as needed, by connecting to Kafka.

I don't know what was so hefty in your checkpoint directory, because you
deleted it.  My checkpoint directories are usually pretty reasonable in
size.

How many topic-partitions did you have, and how long was your window?
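
(A greatly simplified, purely illustrative sketch of that idea -- not the real
KafkaRDD code: the "partition" carries only offset bounds, and messages are
produced lazily when the iterator is consumed, so nothing bulky is serialized
with the RDD itself. The fetch helper is hypothetical.)

// Toy model: a partition that stores nothing but offset bounds.
case class OffsetOnlyPartition(topic: String, partition: Int,
                               fromOffset: Long, untilOffset: Long)

// Hypothetical stand-in for a Kafka consumer fetch.
def fetchMessage(p: OffsetOnlyPartition, offset: Long): String =
  s"${p.topic}/${p.partition}@$offset"

// compute() just returns an iterator; messages are only materialized
// when the iterator is consumed by the task.
def compute(p: OffsetOnlyPartition): Iterator[String] =
  (p.fromOffset until p.untilOffset).iterator.map(o => fetchMessage(p, o))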

On Mon, Aug 10, 2015 at 3:33 PM, Dmitry Goldenberg  wrote:

> Well, RDD"s also contain data, don't they?
>
> The question is, what can be so hefty in the checkpointing directory to
> cause Spark driver to run out of memory?  It seems that it makes
> checkpointing expensive, in terms of I/O and memory consumption.  Two
> network hops -- to driver, then to workers.  Hefty file system usage, hefty
> memory consumption...   What can we do to offset some of these costs?
>
>
>
> On Mon, Aug 10, 2015 at 4:27 PM, Cody Koeninger 
> wrote:
>
>> The rdd is indeed defined by mostly just the offsets / topic partitions.
>>
>> On Mon, Aug 10, 2015 at 3:24 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> "You need to keep a certain number of rdds around for checkpointing" --
>>> that seems like a hefty expense to pay in order to achieve fault
>>> tolerance.  Why does Spark persist whole RDD's of data?  Shouldn't it be
>>> sufficient to just persist the offsets, to know where to resume from?
>>>
>>> Thanks.
>>>
>>>
>>> On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger 
>>> wrote:
>>>
 You need to keep a certain number of rdds around for checkpointing,
 based on e.g. the window size.  Those would all need to be loaded at once.

 On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg <
 dgoldenberg...@gmail.com> wrote:

> Would there be a way to chunk up/batch up the contents of the
> checkpointing directories as they're being processed by Spark Streaming?
> Is it mandatory to load the whole thing in one go?
>
> On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu  wrote:
>
>> I wonder during recovery from a checkpoint whether we can estimate
>> the size of the checkpoint and compare with Runtime.getRuntime().
>> freeMemory().
>>
>> If the size of checkpoint is much bigger than free memory, log
>> warning, etc
>>
>> Cheers
>>
>> On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> Thanks, Cody, will try that. Unfortunately due to a reinstall I
>>> don't have the original checkpointing directory :(  Thanks for the
>>> clarification on spark.driver.memory, I'll keep testing (at 2g things 
>>> seem
>>> OK for now).
>>>
>>> On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger wrote:
>>>
 That looks like it's during recovery from a checkpoint, so it'd be
 driver memory not executor memory.

 How big is the checkpoint directory that you're trying to restore
 from?

 On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <
 dgoldenberg...@gmail.com> wrote:

> We're getting the below error.  Tried increasing
> spark.executor.memory e.g. from 1g to 2g but the below error still 
> happens.
>
> Any recommendations? Something to do with specifying -Xmx in the
> submit job scripts?
>
> Thanks.
>
> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead
> limit exceeded
> at java.util.Arrays.copyOf(Arrays.java:3332)
> at
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
> at
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
> at
> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
> at java.lang.StringBuilder.append(StringBuilder.java:136)
> at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
> at
> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
> at
> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
> at
> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
> at
> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
> at scala.Option.getOrElse(Option.scala:120)
> at
> org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
> at org.apache.spark.rdd.RDD.(RDD.scala:1365)
> at
> org.apache.spark.streaming.kafka.Kafka

Managed memory leak detected + OutOfMemoryError: Unable to acquire X bytes of memory, got 0

2016-08-03 Thread Rychnovsky, Dusan
Hi,


I have a Spark workflow that works fine when run on a relatively small portion
of data, but fails with strange errors when run on big data. In the log files
of the failed executors I found the following errors:


Firstly


> Managed memory leak detected; size = 263403077 bytes, TID = 6524

And then a series of

> java.lang.OutOfMemoryError: Unable to acquire 241 bytes of memory, got 0

> at 
> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)

> at 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:346)

> at 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:367)

> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)

> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)

> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)

> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

> at org.apache.spark.scheduler.Task.run(Task.scala:89)

> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

> at java.lang.Thread.run(Thread.java:745)


The job keeps failing in the same way (I tried a few times).


What could be causing such an error?

I have a feeling I'm not providing all the context necessary to understand
the issue. Please ask for any other information you need.


Thank you,

Dusan



Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError "GC overhead limit exceeded"

2014-08-18 Thread Ankur Dave
On Mon, Aug 18, 2014 at 6:29 AM, Yifan LI  wrote:

> I am testing our application (similar to "personalised page rank" using
> Pregel), and note that each vertex property will need considerably more
> space to store after each new iteration.

[...]

> But when we ran it on a larger graph (e.g. LiveJournal), it always ends with
> the error "GC overhead limit exceeded", even when the number of partitions
> is increased from 8 to 48.


If the graph (including vertex properties) is too large to fit in memory,
you might try allowing it to spill to disk. When constructing the graph,
you can set vertexStorageLevel and edgeStorageLevel to
StorageLevel.MEMORY_AND_DISK. This should allow the algorithm to finish.

Ankur 


Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError "GC overhead limit exceeded"

2014-09-03 Thread Yifan LI
Hi Ankur,

Thanks so much for your advice.

But it failed when I tried to set the storage level while constructing a graph.

val graph = GraphLoader.edgeListFile(sc, edgesFile, minEdgePartitions =
numPartitions).partitionBy(PartitionStrategy.EdgePartition2D).persist(StorageLevel.MEMORY_AND_DISK)

Error: java.lang.UnsupportedOperationException: Cannot change storage level
of an RDD after it was already assigned a level


Could anyone help me with this?

Best,
Yifan




2014-08-18 23:52 GMT+02:00 Ankur Dave :

> On Mon, Aug 18, 2014 at 6:29 AM, Yifan LI  wrote:
>
>> I am testing our application (similar to "personalised page rank" using
>> Pregel), and note that each vertex property will need considerably more
>> space to store after each new iteration.
>
> [...]
>
>> But when we ran it on a larger graph (e.g. LiveJournal), it always ends with
>> the error "GC overhead limit exceeded", even when the number of partitions
>> is increased from 8 to 48.
>
>
> If the graph (including vertex properties) is too large to fit in memory,
> you might try allowing it to spill to disk. When constructing the graph,
> you can set vertexStorageLevel and edgeStorageLevel to
> StorageLevel.MEMORY_AND_DISK. This should allow the algorithm to finish.
>
> Ankur 
>


Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError "GC overhead limit exceeded"

2014-09-03 Thread Ankur Dave
At 2014-09-03 17:58:09 +0200, Yifan LI  wrote:
> val graph = GraphLoader.edgeListFile(sc, edgesFile, minEdgePartitions = 
> numPartitions).partitionBy(PartitionStrategy.EdgePartition2D).persist(StorageLevel.MEMORY_AND_DISK)
>
> Error: java.lang.UnsupportedOperationException: Cannot change storage level
> of an RDD after it was already assigned a level

You have to pass the StorageLevel to GraphLoader.edgeListFile:

val graph = GraphLoader.edgeListFile(
  sc, edgesFile, minEdgePartitions = numPartitions,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
  .partitionBy(PartitionStrategy.EdgePartition2D)

Ankur




Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError "GC overhead limit exceeded"

2014-09-05 Thread Yifan LI
Thank you, Ankur! :)

But how do I assign the storage level to a new vertex RDD that is mapped from
an existing vertex RDD?
e.g.
*val newVertexRDD =
graph.collectNeighborIds(EdgeDirection.Out).map{case(id:VertexId,
a:Array[VertexId]) => (id, initialHashMap(a))}*

The new one will be combined with the existing edge RDD (MEMORY_AND_DISK)
to construct a new graph,
e.g.
val newGraph = Graph(newVertexRDD, graph.edges)


BTW, newVertexRDD.getStorageLevel returns StorageLevel(true, true,
false, true, 1) -- what does that mean?

Thanks in advance!

Best,
Yifan



2014-09-03 22:42 GMT+02:00 Ankur Dave :

> At 2014-09-03 17:58:09 +0200, Yifan LI  wrote:
> > val graph = GraphLoader.edgeListFile(sc, edgesFile, minEdgePartitions =
> numPartitions).partitionBy(PartitionStrategy.EdgePartition2D).persist(StorageLevel.MEMORY_AND_DISK)
> >
> > Error: java.lang.UnsupportedOperationException: Cannot change storage
> level
> > of an RDD after it was already assigned a level
>
> You have to pass the StorageLevel to GraphLoader.edgeListFile:
>
> val graph = GraphLoader.edgeListFile(
>   sc, edgesFile, minEdgePartitions = numPartitions,
>   edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
>   vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
>   .partitionBy(PartitionStrategy.EdgePartition2D)
>
> Ankur
>


Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError "GC overhead limit exceeded"

2014-09-08 Thread Ankur Dave
At 2014-09-05 12:13:18 +0200, Yifan LI  wrote:
> But how to assign the storage level to a new vertices RDD that mapped from
> an existing vertices RDD,
> e.g.
> *val newVertexRDD =
> graph.collectNeighborIds(EdgeDirection.Out).map{case(id:VertexId,
> a:Array[VertexId]) => (id, initialHashMap(a))}*
>
> the new one will be combined with that existing edges RDD(MEMORY_AND_DISK)
> to construct a new graph.
> e.g.
> val newGraph = Graph(newVertexRDD, graph.edges)

Sorry for the late reply. If you are constructing a graph from the derived 
VertexRDD, you can pass a desired storage level to the Graph constructor:

val newVertexRDD = graph.collectNeighborIds(EdgeDirection.Out).map {
  case (id: VertexId, a: Array[VertexId]) => (id, initialHashMap(a))
}
val newGraph = Graph(
  newVertexRDD,
  graph.edges,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)

For others reading, the reason why GraphX needs to be told the desired storage 
level is that it internally constructs temporary vertex or edge RDDs and uses 
them more than once, so it has to cache them to avoid recomputation.

> BTW, the return of newVertexRDD.getStorageLevel is StorageLevel(true, true,
> false, true, 1), what does it mean?

See the StorageLevel object [1]. This particular storage level corresponds to 
StorageLevel.MEMORY_AND_DISK.
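
(A small sketch added for reference, not part of the original reply: the five
values printed by getStorageLevel line up with StorageLevel's fields.)

import org.apache.spark.storage.StorageLevel

val level = StorageLevel.MEMORY_AND_DISK
// Prints (useDisk, useMemory, useOffHeap, deserialized, replication):
// (true, true, false, true, 1)
println((level.useDisk, level.useMemory, level.useOffHeap,
         level.deserialized, level.replication))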

Ankur

[1] 
https://github.com/apache/spark/blob/092e2f152fb674e7200cc8a2cb99a8fe0a9b2b33/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala#L147




Re: Managed memory leak detected + OutOfMemoryError: Unable to acquire X bytes of memory, got 0

2016-08-03 Thread Ted Yu
Are you using Spark 1.6+ ?

See SPARK-11293

On Wed, Aug 3, 2016 at 5:03 AM, Rychnovsky, Dusan <
dusan.rychnov...@firma.seznam.cz> wrote:

> Hi,
>
>
> I have a Spark workflow that when run on a relatively small portion of
> data works fine, but when run on big data fails with strange errors. In the
> log files of failed executors I found the following errors:
>
>
> Firstly
>
>
> > Managed memory leak detected; size = 263403077 bytes, TID = 6524
>
> And then a series of
>
> > java.lang.OutOfMemoryError: Unable to acquire 241 bytes of memory, got 0
>
> > at
> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
>
>
> > at
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:346)
>
>
> > at
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:367)
>
>
> > at
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
>
>
> > at
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
>
>
> > at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>
> > at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
> > at org.apache.spark.scheduler.Task.run(Task.scala:89)
>
> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>
> > at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
>
> > at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
>
> > at java.lang.Thread.run(Thread.java:745)
>
>
> The job keeps failing in the same way (I tried a few times).
>
>
> What could be causing such error?
>
> I have a feeling that I'm not providing enough context necessary to
> understand the issue. Please ask for any other information needed.
>
>
> Thank you,
>
> Dusan
>
>
>


Re: Managed memory leak detected + OutOfMemoryError: Unable to acquire X bytes of memory, got 0

2016-08-03 Thread Rychnovsky, Dusan
Yes, I believe I'm using Spark 1.6.0.


> spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/


I don't understand the ticket. It says "Fixed in 1.6.0". I have 1.6.0 and 
therefore should have it fixed, right? Or what do I do to fix it?


Thanks,

Dusan



From: Ted Yu 
Sent: Wednesday, August 3, 2016 3:52 PM
To: Rychnovsky, Dusan
Cc: user@spark.apache.org
Subject: Re: Managed memory leak detected + OutOfMemoryError: Unable to acquire 
X bytes of memory, got 0

Are you using Spark 1.6+ ?

See SPARK-11293

On Wed, Aug 3, 2016 at 5:03 AM, Rychnovsky, Dusan 
mailto:dusan.rychnov...@firma.seznam.cz>> 
wrote:

Hi,


I have a Spark workflow that when run on a relatively small portion of data 
works fine, but when run on big data fails with strange errors. In the log 
files of failed executors I found the following errors:


Firstly


> Managed memory leak detected; size = 263403077 bytes, TID = 6524

And then a series of

> java.lang.OutOfMemoryError: Unable to acquire 241 bytes of memory, got 0

> at 
> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)

> at 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:346)

> at 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:367)

> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)

> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)

> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)

> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

> at org.apache.spark.scheduler.Task.run(Task.scala:89)

> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

> at java.lang.Thread.run(Thread.java:745)


The job keeps failing in the same way (I tried a few times).


What could be causing such error?

I have a feeling that I'm not providing enough context necessary to understand 
the issue. Please ask for any other information needed.


Thank you,

Dusan




Re: Managed memory leak detected + OutOfMemoryError: Unable to acquire X bytes of memory, got 0

2016-08-03 Thread Rychnovsky, Dusan
I am confused.


I tried to look for a Spark build that would have this issue fixed, i.e. with
https://github.com/apache/spark/pull/13027/ merged in, but it looks like the
patch has not been merged into 1.6.


How do I get a fixed 1.6 version?


Thanks,

Dusan


[SPARK-4452][SPARK-11293][Core][BRANCH-1.6] Shuffle data structures can starve
others on the same thread for memory -- Pull Request #13027, apache/spark on GitHub





From: Rychnovsky, Dusan
Sent: Wednesday, August 3, 2016 3:58 PM
To: Ted Yu
Cc: user@spark.apache.org
Subject: Re: Managed memory leak detected + OutOfMemoryError: Unable to acquire 
X bytes of memory, got 0


Yes, I believe I'm using Spark 1.6.0.


> spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/


I don't understand the ticket. It says "Fixed in 1.6.0". I have 1.6.0 and 
therefore should have it fixed, right? Or what do I do to fix it?


Thanks,

Dusan



From: Ted Yu 
Sent: Wednesday, August 3, 2016 3:52 PM
To: Rychnovsky, Dusan
Cc: user@spark.apache.org
Subject: Re: Managed memory leak detected + OutOfMemoryError: Unable to acquire 
X bytes of memory, got 0

Are you using Spark 1.6+ ?

See SPARK-11293

On Wed, Aug 3, 2016 at 5:03 AM, Rychnovsky, Dusan 
mailto:dusan.rychnov...@firma.seznam.cz>> 
wrote:

Hi,


I have a Spark workflow that when run on a relatively small portion of data 
works fine, but when run on big data fails with strange errors. In the log 
files of failed executors I found the following errors:


Firstly


> Managed memory leak detected; size = 263403077 bytes, TID = 6524

And then a series of

> java.lang.OutOfMemoryError: Unable to acquire 241 bytes of memory, got 0

> at 
> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)

> at 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:346)

> at 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:367)

> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)

> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)

> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)

> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

> at org.apache.spark.scheduler.Task.run(Task.scala:89)

> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

> at java.lang.Thread.run(Thread.java:745)


The job keeps failing in the same way (I tried a few times).


What could be causing such error?

I have a feeling that I'm not providing enough context necessary to understand 
the issue. Please ask for any other information needed.


Thank you,

Dusan




Re: Managed memory leak detected + OutOfMemoryError: Unable to acquire X bytes of memory, got 0

2016-08-03 Thread Ted Yu
The latest QA run was no longer accessible (error 404):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59141/consoleFull

Looking at the comments on the PR, there is not enough confidence to pull
the fix into 1.6.

On Wed, Aug 3, 2016 at 9:05 AM, Rychnovsky, Dusan <
dusan.rychnov...@firma.seznam.cz> wrote:

> I am confused.
>
>
> I tried to look for Spark that would have this issue fixed, i.e.
> https://github.com/apache/spark/pull/13027/ merged in, but it looks like
> the patch has not been merged for 1.6.
>
>
> How do I get a fixed 1.6 version?
>
>
> Thanks,
>
> Dusan
>
>
> <https://github.com/apache/spark/pull/13027/>
> [SPARK-4452][SPARK-11293][Core][BRANCH-1.6] Shuffle data structures can
> starve others on the same thread for memory by lianhuiwang · Pull Request
> #13027 · apache/spark · GitHub
> What changes were proposed in this pull request? This PR is for the
> branch-1.6 version of the commits PR #10024. In #9241 It implemented a
> mechanism to call spill() on those SQL operators that sup...
> Read more... <https://github.com/apache/spark/pull/13027/>
>
>
>
> --
> *From:* Rychnovsky, Dusan
> *Sent:* Wednesday, August 3, 2016 3:58 PM
> *To:* Ted Yu
>
> *Cc:* user@spark.apache.org
> *Subject:* Re: Managed memory leak detected + OutOfMemoryError: Unable to
> acquire X bytes of memory, got 0
>
>
> Yes, I believe I'm using Spark 1.6.0.
>
>
> > spark-submit --version
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 1.6.0
>   /_/
>
> I don't understand the ticket. It says "Fixed in 1.6.0". I have 1.6.0 and
> therefore should have it fixed, right? Or what do I do to fix it?
>
>
> Thanks,
>
> Dusan
>
>
> --
> *From:* Ted Yu 
> *Sent:* Wednesday, August 3, 2016 3:52 PM
> *To:* Rychnovsky, Dusan
> *Cc:* user@spark.apache.org
> *Subject:* Re: Managed memory leak detected + OutOfMemoryError: Unable to
> acquire X bytes of memory, got 0
>
> Are you using Spark 1.6+ ?
>
> See SPARK-11293
>
> On Wed, Aug 3, 2016 at 5:03 AM, Rychnovsky, Dusan <
> dusan.rychnov...@firma.seznam.cz> wrote:
>
>> Hi,
>>
>>
>> I have a Spark workflow that when run on a relatively small portion of
>> data works fine, but when run on big data fails with strange errors. In the
>> log files of failed executors I found the following errors:
>>
>>
>> Firstly
>>
>>
>> > Managed memory leak detected; size = 263403077 bytes, TID = 6524
>>
>> And then a series of
>>
>> > java.lang.OutOfMemoryError: Unable to acquire 241 bytes of memory, got
>> 0
>>
>> > at
>> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
>>
>>
>> > at
>> org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:346)
>>
>>
>> > at
>> org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:367)
>>
>>
>> > at
>> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
>>
>>
>> > at
>> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
>>
>>
>> > at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>
>> > at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>
>> > at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>
>> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>
>> > at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>
>>
>> > at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>
>>
>> > at java.lang.Thread.run(Thread.java:745)
>>
>>
>> The job keeps failing in the same way (I tried a few times).
>>
>>
>> What could be causing such error?
>>
>> I have a feeling that I'm not providing enough context necessary to
>> understand the issue. Please ask for any other information needed.
>>
>>
>> Thank you,
>>
>> Dusan
>>
>>
>>
>


Re: Managed memory leak detected + OutOfMemoryError: Unable to acquire X bytes of memory, got 0

2016-08-03 Thread Rychnovsky, Dusan
OK, thank you. What do you suggest I do to get rid of the error?



From: Ted Yu 
Sent: Wednesday, August 3, 2016 6:10 PM
To: Rychnovsky, Dusan
Cc: user@spark.apache.org
Subject: Re: Managed memory leak detected + OutOfMemoryError: Unable to acquire 
X bytes of memory, got 0

The latest QA run was no longer accessible (error 404):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59141/consoleFull

Looking at the comments on the PR, there is not enough confidence in pulling in 
the fix into 1.6

On Wed, Aug 3, 2016 at 9:05 AM, Rychnovsky, Dusan 
mailto:dusan.rychnov...@firma.seznam.cz>> 
wrote:

I am confused.


I tried to look for Spark that would have this issue fixed, i.e. 
https://github.com/apache/spark/pull/13027/ merged in, but it looks like the 
patch has not been merged for 1.6.


How do I get a fixed 1.6 version?


Thanks,

Dusan


[https://avatars2.githubusercontent.com/u/545478?v=3&s=400]<https://github.com/apache/spark/pull/13027/>

[SPARK-4452][SPARK-11293][Core][BRANCH-1.6] Shuffle data structures can starve 
others on the same thread for memory by lianhuiwang · Pull Request #13027 · 
apache/spark · GitHub
What changes were proposed in this pull request? This PR is for the branch-1.6 
version of the commits PR #10024. In #9241 It implemented a mechanism to call 
spill() on those SQL operators that sup...
Read more...<https://github.com/apache/spark/pull/13027/>





From: Rychnovsky, Dusan
Sent: Wednesday, August 3, 2016 3:58 PM
To: Ted Yu

Cc: user@spark.apache.org
Subject: Re: Managed memory leak detected + OutOfMemoryError: Unable to acquire 
X bytes of memory, got 0


Yes, I believe I'm using Spark 1.6.0.


> spark-submit --version
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
  /_/


I don't understand the ticket. It says "Fixed in 1.6.0". I have 1.6.0 and 
therefore should have it fixed, right? Or what do I do to fix it?


Thanks,

Dusan



From: Ted Yu <yuzhih...@gmail.com>
Sent: Wednesday, August 3, 2016 3:52 PM
To: Rychnovsky, Dusan
Cc: user@spark.apache.org
Subject: Re: Managed memory leak detected + OutOfMemoryError: Unable to acquire 
X bytes of memory, got 0

Are you using Spark 1.6+ ?

See SPARK-11293

On Wed, Aug 3, 2016 at 5:03 AM, Rychnovsky, Dusan 
mailto:dusan.rychnov...@firma.seznam.cz>> 
wrote:

Hi,


I have a Spark workflow that when run on a relatively small portion of data 
works fine, but when run on big data fails with strange errors. In the log 
files of failed executors I found the following errors:


Firstly


> Managed memory leak detected; size = 263403077 bytes, TID = 6524

And then a series of

> java.lang.OutOfMemoryError: Unable to acquire 241 bytes of memory, got 0

> at 
> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)

> at 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:346)

> at 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:367)

> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)

> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)

> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)

> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

> at org.apache.spark.scheduler.Task.run(Task.scala:89)

> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

> at java.lang.Thread.run(Thread.java:745)


The job keeps failing in the same way (I tried a few times).


What could be causing such error?

I have a feeling that I'm not providing enough context necessary to understand 
the issue. Please ask for any other information needed.


Thank you,

Dusan





Re: Managed memory leak detected + OutOfMemoryError: Unable to acquire X bytes of memory, got 0

2016-08-03 Thread Ted Yu
Spark 2.0 has been released.

Mind giving it a try :-) ?

On Wed, Aug 3, 2016 at 9:11 AM, Rychnovsky, Dusan <
dusan.rychnov...@firma.seznam.cz> wrote:

> OK, thank you. What do you suggest I do to get rid of the error?
>
>
> --
> *From:* Ted Yu 
> *Sent:* Wednesday, August 3, 2016 6:10 PM
> *To:* Rychnovsky, Dusan
> *Cc:* user@spark.apache.org
> *Subject:* Re: Managed memory leak detected + OutOfMemoryError: Unable to
> acquire X bytes of memory, got 0
>
> The latest QA run was no longer accessible (error 404):
>
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59141/consoleFull
>
> Looking at the comments on the PR, there is not enough confidence in
> pulling in the fix into 1.6
>
> On Wed, Aug 3, 2016 at 9:05 AM, Rychnovsky, Dusan <
> dusan.rychnov...@firma.seznam.cz> wrote:
>
>> I am confused.
>>
>>
>> I tried to look for Spark that would have this issue fixed, i.e.
>> https://github.com/apache/spark/pull/13027/ merged in, but it looks like
>> the patch has not been merged for 1.6.
>>
>>
>> How do I get a fixed 1.6 version?
>>
>>
>> Thanks,
>>
>> Dusan
>>
>>
>> <https://github.com/apache/spark/pull/13027/>
>> [SPARK-4452][SPARK-11293][Core][BRANCH-1.6] Shuffle data structures can
>> starve others on the same thread for memory by lianhuiwang · Pull Request
>> #13027 · apache/spark · GitHub
>> What changes were proposed in this pull request? This PR is for the
>> branch-1.6 version of the commits PR #10024. In #9241 It implemented a
>> mechanism to call spill() on those SQL operators that sup...
>> Read more... <https://github.com/apache/spark/pull/13027/>
>>
>>
>>
>> --
>> *From:* Rychnovsky, Dusan
>> *Sent:* Wednesday, August 3, 2016 3:58 PM
>> *To:* Ted Yu
>>
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Managed memory leak detected + OutOfMemoryError: Unable
>> to acquire X bytes of memory, got 0
>>
>>
>> Yes, I believe I'm using Spark 1.6.0.
>>
>>
>> > spark-submit --version
>> Welcome to
>>     __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/___/ .__/\_,_/_/ /_/\_\   version 1.6.0
>>   /_/
>>
>> I don't understand the ticket. It says "Fixed in 1.6.0". I have 1.6.0 and
>> therefore should have it fixed, right? Or what do I do to fix it?
>>
>>
>> Thanks,
>>
>> Dusan
>>
>>
>> --
>> *From:* Ted Yu 
>> *Sent:* Wednesday, August 3, 2016 3:52 PM
>> *To:* Rychnovsky, Dusan
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Managed memory leak detected + OutOfMemoryError: Unable
>> to acquire X bytes of memory, got 0
>>
>> Are you using Spark 1.6+ ?
>>
>> See SPARK-11293
>>
>> On Wed, Aug 3, 2016 at 5:03 AM, Rychnovsky, Dusan <
>> dusan.rychnov...@firma.seznam.cz> wrote:
>>
>>> Hi,
>>>
>>>
>>> I have a Spark workflow that when run on a relatively small portion of
>>> data works fine, but when run on big data fails with strange errors. In the
>>> log files of failed executors I found the following errors:
>>>
>>>
>>> Firstly
>>>
>>>
>>> > Managed memory leak detected; size = 263403077 bytes, TID = 6524
>>>
>>> And then a series of
>>>
>>> > java.lang.OutOfMemoryError: Unable to acquire 241 bytes of memory, got
>>> 0
>>>
>>> > at
>>> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
>>>
>>>
>>> > at
>>> org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:346)
>>>
>>>
>>> > at
>>> org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:367)
>>>
>>>
>>> > at
>>> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
>>>
>>>
>>> > at
>>> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
>>>
>>>
>>> > at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>
>>> > at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>
>>> > at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>
>>> > at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>
>>> > at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>
>>>
>>> > at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>
>>>
>>> > at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> The job keeps failing in the same way (I tried a few times).
>>>
>>>
>>> What could be causing such error?
>>>
>>> I have a feeling that I'm not providing enough context necessary to
>>> understand the issue. Please ask for any other information needed.
>>>
>>>
>>> Thank you,
>>>
>>> Dusan
>>>
>>>
>>>
>>
>

