Re: Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232

2017-07-28 Thread jeff saremi
asking this on a tangent:

Is there any way for the shuffle data to be replicated to more than one server?

thanks


From: jeff saremi 
Sent: Friday, July 28, 2017 4:38:08 PM
To: Juan Rodríguez Hortalá
Cc: user@spark.apache.org
Subject: Re: Job keeps aborting because of 
org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
server/ip:39232


Thanks Juan for taking the time

Here's more info:
- This is running on YARN in cluster mode (yarn-cluster)

- See config params below

- This is a corporate environment. In general, nodes should not be added to or
removed from the cluster that often. Even if that were the case, I would expect it
to affect one or two servers. In my case I get hundreds of these errors before the
job fails.


  --master yarn-cluster ^
  --driver-memory 96G ^
  --executor-memory 48G ^
  --num-executors 150 ^
  --executor-cores 8 ^
  --driver-cores 8 ^
  --conf spark.yarn.executor.memoryOverhead=36000 ^
  --conf spark.shuffle.service.enabled=true ^
  --conf spark.yarn.submit.waitAppCompletion=false ^
  --conf spark.yarn.submit.file.replication=64 ^
  --conf spark.yarn.maxAppAttempts=1 ^
  --conf spark.speculation=true ^
  --conf spark.speculation.quantile=0.9 ^
  --conf spark.yarn.executor.nodeLabelExpression="prod" ^
  --conf spark.yarn.am.nodeLabelExpression="prod" ^
  --conf spark.stage.maxConsecutiveAttempts=1000 ^
  --conf spark.yarn.scheduler.heartbeat.interval-ms=15000 ^
  --conf spark.yarn.launchContainer.count.simultaneously=50 ^
  --conf spark.driver.maxResultSize=16G ^
  --conf spark.network.timeout=1000s ^



From: Juan Rodríguez Hortalá 
Sent: Friday, July 28, 2017 4:20:40 PM
To: jeff saremi
Cc: user@spark.apache.org
Subject: Re: Job keeps aborting because of 
org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
server/ip:39232

Hi Jeff,

Can you provide more information about how you are running your job? In
particular:
  - which cluster manager are you using? Is it YARN, Mesos, or Spark Standalone?
  - which configuration options are you using to submit the job? In particular,
are you using dynamic allocation or external shuffle? You should be able to see
this in the Environment tab of the Spark UI, looking for
spark.dynamicAllocation.enabled and spark.shuffle.service.enabled.
  - in which environment are you running the jobs? Is this an on-premises
cluster or some cloud provider? Are you adding or removing nodes from the
cluster during the job execution?

FetchFailedException errors happen during execution when an executor is not
able to read the shuffle blocks for a previous stage that are served by another
executor. That might happen if the executor that has to serve the files dies
and internal shuffle is used, although there can be other reasons like network
errors. If you are using dynamic allocation then you should also enable the
external shuffle service so shuffle blocks can be served by the node manager
after the executor that created the blocks is terminated; see
https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
for more details.



On Fri, Jul 28, 2017 at 9:57 AM, jeff saremi 
> wrote:

We have a not too complex and not too large spark job that keeps dying with 
this error

I have researched it and I have not seen any convincing explanation on why

I am not using a shuffle service. Which server is the one that is refusing the 
connection?
If I go to the server that is being reported in the error message, I see a lot 
of these errors towards the end:


java.io.FileNotFoundException: 
D:\data\yarnnm\local\usercache\hadoop\appcache\application_1500970459432_1024\blockmgr-7f3a1abc-2b8b-4e51-9072-8c12495ec563\0e\shuffle_0_4107_0.index

(may or may not be related to the problem at all)


and if you examine further on this machine, there are FetchFailedExceptions
resulting from other machines, and so on and so forth.

This is Spark 1.6 with the yarn-cluster master.

Could anyone provide some insight or solution to this?

thanks




Re: Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232

2017-07-28 Thread jeff saremi
Thanks Juan for taking the time

Here's more info:
- This is running on YARN in cluster mode (yarn-cluster)

- See config params below

- This is a corporate environment. In general, nodes should not be added to or
removed from the cluster that often. Even if that were the case, I would expect it
to affect one or two servers. In my case I get hundreds of these errors before the
job fails.


  --master yarn-cluster ^
  --driver-memory 96G ^
  --executor-memory 48G ^
  --num-executors 150 ^
  --executor-cores 8 ^
  --driver-cores 8 ^
  --conf spark.yarn.executor.memoryOverhead=36000 ^
  --conf spark.shuffle.service.enabled=true ^
  --conf spark.yarn.submit.waitAppCompletion=false ^
  --conf spark.yarn.submit.file.replication=64 ^
  --conf spark.yarn.maxAppAttempts=1 ^
  --conf spark.speculation=true ^
  --conf spark.speculation.quantile=0.9 ^
  --conf spark.yarn.executor.nodeLabelExpression="prod" ^
  --conf spark.yarn.am.nodeLabelExpression="prod" ^
  --conf spark.stage.maxConsecutiveAttempts=1000 ^
  --conf spark.yarn.scheduler.heartbeat.interval-ms=15000 ^
  --conf spark.yarn.launchContainer.count.simultaneously=50 ^
  --conf spark.driver.maxResultSize=16G ^
  --conf spark.network.timeout=1000s ^



From: Juan Rodríguez Hortalá 
Sent: Friday, July 28, 2017 4:20:40 PM
To: jeff saremi
Cc: user@spark.apache.org
Subject: Re: Job keeps aborting because of 
org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
server/ip:39232

Hi Jeff,

Can you provide more information about how you are running your job? In
particular:
  - which cluster manager are you using? Is it YARN, Mesos, or Spark Standalone?
  - which configuration options are you using to submit the job? In particular,
are you using dynamic allocation or external shuffle? You should be able to see
this in the Environment tab of the Spark UI, looking for
spark.dynamicAllocation.enabled and spark.shuffle.service.enabled.
  - in which environment are you running the jobs? Is this an on-premises
cluster or some cloud provider? Are you adding or removing nodes from the
cluster during the job execution?

FetchFailedException errors happen during execution when an executor is not
able to read the shuffle blocks for a previous stage that are served by another
executor. That might happen if the executor that has to serve the files dies
and internal shuffle is used, although there can be other reasons like network
errors. If you are using dynamic allocation then you should also enable the
external shuffle service so shuffle blocks can be served by the node manager
after the executor that created the blocks is terminated; see
https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
for more details.



On Fri, Jul 28, 2017 at 9:57 AM, jeff saremi 
> wrote:

We have a not too complex and not too large spark job that keeps dying with 
this error

I have researched it and I have not seen any convincing explanation on why

I am not using a shuffle service. Which server is the one that is refusing the 
connection?
If I go to the server that is being reported in the error message, I see a lot 
of these errors towards the end:


java.io.FileNotFoundException: 
D:\data\yarnnm\local\usercache\hadoop\appcache\application_1500970459432_1024\blockmgr-7f3a1abc-2b8b-4e51-9072-8c12495ec563\0e\shuffle_0_4107_0.index

(may or may not be related to the problem at all)


and if you examine further on this machine, there are FetchFailedExceptions
resulting from other machines, and so on and so forth.

This is Spark 1.6 with the yarn-cluster master.

Could anyone provide some insight or solution to this?

thanks




Re: Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232

2017-07-28 Thread Juan Rodríguez Hortalá
Hi Jeff,

Can you provide more information about how you are running your job? In
particular:
  - which cluster manager are you using? Is it YARN, Mesos, or Spark
Standalone?
  - which configuration options are you using to submit the job? In
particular, are you using dynamic allocation or external shuffle? You should
be able to see this in the Environment tab of the Spark UI, looking
for spark.dynamicAllocation.enabled and spark.shuffle.service.enabled.
  - in which environment are you running the jobs? Is this an on-premises
cluster or some cloud provider? Are you adding or removing nodes from the
cluster during the job execution?

FetchFailedException errors happen during execution when an executor is not
able to read the shuffle blocks for a previous stage that are served by
another executor. That might happen if the executor that has to serve the
files dies and internal shuffle is used, although there can be other
reasons like network errors. If you are using dynamic allocation then you
should also enable the external shuffle service so shuffle blocks can be served
by the node manager after the executor that created the blocks is
terminated; see
https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
for more details.
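
For reference, a minimal sketch of the submit-time flags this implies (the YARN
NodeManagers also need the spark_shuffle auxiliary service configured in
yarn-site.xml; the executor counts below are illustrative, not taken from this
thread):

spark-submit \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=10 \
  --conf spark.dynamicAllocation.maxExecutors=150 \
  ...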



On Fri, Jul 28, 2017 at 9:57 AM, jeff saremi  wrote:

> We have a not too complex and not too large spark job that keeps dying
> with this error
>
> I have researched it and I have not seen any convincing explanation on why
>
> I am not using a shuffle service. Which server is the one that is refusing
> the connection?
> If I go to the server that is being reported in the error message, I see a
> lot of these errors towards the end:
>
> java.io.FileNotFoundException: 
> D:\data\yarnnm\local\usercache\hadoop\appcache\application_1500970459432_1024\blockmgr-7f3a1abc-2b8b-4e51-9072-8c12495ec563\0e\shuffle_0_4107_0.index
>
> (may or may not be related to the problem at all)
>
> and if you examine further on this machine, there are FetchFailedExceptions
> resulting from other machines, and so on and so forth.
>
>
> This is Spark 1.6 with the yarn-cluster master.
>
>
> Could anyone provide some insight or solution to this?
>
> thanks
>
>
>


Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-28 Thread Priyank Shrivastava
Also, in your example doesn't the temp view need to be accessed using the
same SparkSession on the Scala side?  Since I am not using a notebook, how
can I get access to the same SparkSession in Scala?

On Fri, Jul 28, 2017 at 3:17 PM, Priyank Shrivastava  wrote:

> Thanks Burak.
>
> In a streaming context would I need to do any state management for the
> temp views? for example across sliding windows.
>
> Priyank
>
> On Fri, Jul 28, 2017 at 3:13 PM, Burak Yavuz  wrote:
>
>> Hi Priyank,
>>
>> You may register them as temporary tables to use across language
>> boundaries.
>>
>> Python:
>> df = spark.readStream...
>> # Python logic
>> df.createOrReplaceTempView("tmp1")
>>
>> Scala:
>> val df = spark.table("tmp1")
>> df.writeStream
>>   .foreach(...)
>>
>>
>> On Fri, Jul 28, 2017 at 3:06 PM, Priyank Shrivastava <
>> priy...@asperasoft.com> wrote:
>>
>>> TD,
>>>
>>> For a hybrid python-scala approach, what's the recommended way of
>>> handing off a dataframe from python to scala.  I would like to know
>>> especially in a streaming context.
>>>
>>> I am not using notebooks/databricks.  We are running it on our own spark
>>> 2.1 cluster.
>>>
>>> Priyank
>>>
>>> On Wed, Jul 26, 2017 at 12:49 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 We see that all the time. For example, in SQL, people can write their
 user-defined function in Scala/Java and use it from SQL/python/anywhere.
 That is the recommended way to get the best combo of performance and
 ease-of-use from non-jvm languages.

 On Wed, Jul 26, 2017 at 11:49 AM, Priyank Shrivastava <
 priy...@asperasoft.com> wrote:

> Thanks TD.  I am going to try the python-scala hybrid approach by
> using scala only for custom redis sink and python for the rest of the app
> .  I understand it might not be as efficient as purely writing the app in
> scala but unfortunately I am constrained on scala resources.  Have you come
> across other use cases where people have resorted to such a python-scala
> hybrid approach?
>
> Regards,
> Priyank
>
>
>
> On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Hello Priyank
>>
>> Writing something purely in Scala/Java would be the most efficient.
>> Even if we expose python APIs that allow writing custom sinks in pure
>> Python, it won't be as efficient as Scala/Java foreach as the data would
>> have to go through JVM / PVM boundary which has significant overheads. So
>> Scala/Java foreach is always going to be the best option.
>>
>> TD
>>
>> On Tue, Jul 25, 2017 at 6:05 PM, Priyank Shrivastava <
>> priy...@asperasoft.com> wrote:
>>
>>> I am trying to write key-values to redis using a DataStreamWriter
>>> object using pyspark structured streaming APIs. I am using Spark 2.2
>>>
>>> Since the Foreach Sink is not supported for python; here
>>> ,
>>> I am trying to find out some alternatives.
>>>
>>> One alternative is to write a separate Scala module only to push
>>> data into redis using foreach; ForeachWriter
>>> 
>>>  is
>>> supported in Scala. BUT this doesn't seem like an efficient approach and
>>> adds deployment overhead because now I will have to support Scala in my 
>>> app.
>>>
>>> Another approach is obviously to use Scala instead of python, which
>>> is fine but I want to make sure that I absolutely cannot use python for
>>> this problem before I take this path.
>>>
>>> Would appreciate some feedback and alternative design approaches for
>>> this problem.
>>>
>>> Thanks.
>>>
>>>
>>>
>>>
>>
>

>>>
>>
>


Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-28 Thread Priyank Shrivastava
Thanks Burak.

In a streaming context would I need to do any state management for the temp
views? for example across sliding windows.

Priyank

On Fri, Jul 28, 2017 at 3:13 PM, Burak Yavuz  wrote:

> Hi Priyank,
>
> You may register them as temporary tables to use across language
> boundaries.
>
> Python:
> df = spark.readStream...
> # Python logic
> df.createOrReplaceTempView("tmp1")
>
> Scala:
> val df = spark.table("tmp1")
> df.writeStream
>   .foreach(...)
>
>
> On Fri, Jul 28, 2017 at 3:06 PM, Priyank Shrivastava <
> priy...@asperasoft.com> wrote:
>
>> TD,
>>
>> For a hybrid python-scala approach, what's the recommended way of handing
>> off a dataframe from python to scala.  I would like to know especially in a
>> streaming context.
>>
>> I am not using notebooks/databricks.  We are running it on our own spark
>> 2.1 cluster.
>>
>> Priyank
>>
>> On Wed, Jul 26, 2017 at 12:49 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> We see that all the time. For example, in SQL, people can write their
>>> user-defined function in Scala/Java and use it from SQL/python/anywhere.
>>> That is the recommended way to get the best combo of performance and
>>> ease-of-use from non-jvm languages.
>>>
>>> On Wed, Jul 26, 2017 at 11:49 AM, Priyank Shrivastava <
>>> priy...@asperasoft.com> wrote:
>>>
 Thanks TD.  I am going to try the python-scala hybrid approach by using
 scala only for custom redis sink and python for the rest of the app .  I
 understand it might not be as efficient as purely writing the app in scala
 but unfortunately I am constrained on scala resources.  Have you come
 across other use cases where people have resorted to such a python-scala
 hybrid approach?

 Regards,
 Priyank



 On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das <
 tathagata.das1...@gmail.com> wrote:

> Hello Priyank
>
> Writing something purely in Scala/Java would be the most efficient.
> Even if we expose python APIs that allow writing custom sinks in pure
> Python, it won't be as efficient as Scala/Java foreach as the data would
> have to go through JVM / PVM boundary which has significant overheads. So
> Scala/Java foreach is always going to be the best option.
>
> TD
>
> On Tue, Jul 25, 2017 at 6:05 PM, Priyank Shrivastava <
> priy...@asperasoft.com> wrote:
>
>> I am trying to write key-values to redis using a DataStreamWriter
>> object using pyspark structured streaming APIs. I am using Spark 2.2
>>
>> Since the Foreach Sink is not supported for python; here
>> ,
>> I am trying to find out some alternatives.
>>
>> One alternative is to write a separate Scala module only to push data
>> into redis using foreach; ForeachWriter
>> 
>>  is
>> supported in Scala. BUT this doesn't seem like an efficient approach and
>> adds deployment overhead because now I will have to support Scala in my 
>> app.
>>
>> Another approach is obviously to use Scala instead of python, which
>> is fine but I want to make sure that I absolutely cannot use python for
>> this problem before I take this path.
>>
>> Would appreciate some feedback and alternative design approaches for
>> this problem.
>>
>> Thanks.
>>
>>
>>
>>
>

>>>
>>
>


Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-28 Thread Burak Yavuz
Hi Priyank,

You may register them as temporary tables to use across language boundaries.

Python:
df = spark.readStream...
# Python logic
df.createOrReplaceTempView("tmp1")

Scala:
val df = spark.table("tmp1")
df.writeStream
  .foreach(...)
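
A minimal sketch of the Scala side of that foreach, as a Redis sink (this
assumes the Jedis client and a Dataset of (key, value) string pairs; the class
and the key/value mapping are illustrative):

import org.apache.spark.sql.ForeachWriter
import redis.clients.jedis.Jedis

class RedisWriter(host: String, port: Int) extends ForeachWriter[(String, String)] {
  private var jedis: Jedis = _

  // One connection per partition/epoch
  override def open(partitionId: Long, version: Long): Boolean = {
    jedis = new Jedis(host, port)
    true
  }

  // Write each record as a plain Redis key/value pair
  override def process(record: (String, String)): Unit =
    jedis.set(record._1, record._2)

  override def close(errorOrNull: Throwable): Unit =
    if (jedis != null) jedis.close()
}

// Usage, continuing the snippet above:
// spark.table("tmp1").as[(String, String)]
//   .writeStream
//   .foreach(new RedisWriter("localhost", 6379))
//   .start()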


On Fri, Jul 28, 2017 at 3:06 PM, Priyank Shrivastava  wrote:

> TD,
>
> For a hybrid python-scala approach, what's the recommended way of handing
> off a dataframe from python to scala.  I would like to know especially in a
> streaming context.
>
> I am not using notebooks/databricks.  We are running it on our own spark
> 2.1 cluster.
>
> Priyank
>
> On Wed, Jul 26, 2017 at 12:49 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> We see that all the time. For example, in SQL, people can write their
>> user-defined function in Scala/Java and use it from SQL/python/anywhere.
>> That is the recommended way to get the best combo of performance and
>> ease-of-use from non-jvm languages.
>>
>> On Wed, Jul 26, 2017 at 11:49 AM, Priyank Shrivastava <
>> priy...@asperasoft.com> wrote:
>>
>>> Thanks TD.  I am going to try the python-scala hybrid approach by using
>>> scala only for custom redis sink and python for the rest of the app .  I
>>> understand it might not be as efficient as purely writing the app in scala
>>> but unfortunately I am constrained on scala resources.  Have you come
>>> across other use cases where people have resorted to such a python-scala
>>> hybrid approach?
>>>
>>> Regards,
>>> Priyank
>>>
>>>
>>>
>>> On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 Hello Priyank

 Writing something purely in Scala/Java would be the most efficient.
 Even if we expose python APIs that allow writing custom sinks in pure
 Python, it won't be as efficient as Scala/Java foreach as the data would
 have to go through JVM / PVM boundary which has significant overheads. So
 Scala/Java foreach is always going to be the best option.

 TD

 On Tue, Jul 25, 2017 at 6:05 PM, Priyank Shrivastava <
 priy...@asperasoft.com> wrote:

> I am trying to write key-values to redis using a DataStreamWriter
> object using pyspark structured streaming APIs. I am using Spark 2.2
>
> Since the Foreach Sink is not supported for python; here
> ,
> I am trying to find out some alternatives.
>
> One alternative is to write a separate Scala module only to push data
> into redis using foreach; ForeachWriter
> 
>  is
> supported in Scala. BUT this doesn't seem like an efficient approach and
> adds deployment overhead because now I will have to support Scala in my 
> app.
>
> Another approach is obviously to use Scala instead of python, which is
> fine but I want to make sure that I absolutely cannot use python for this
> problem before I take this path.
>
> Would appreciate some feedback and alternative design approaches for
> this problem.
>
> Thanks.
>
>
>
>

>>>
>>
>


Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-28 Thread Priyank Shrivastava
TD,

For a hybrid python-scala approach, what's the recommended way of handing
off a dataframe from python to scala.  I would like to know especially in a
streaming context.

I am not using notebooks/databricks.  We are running it on our own spark
2.1 cluster.

Priyank

On Wed, Jul 26, 2017 at 12:49 PM, Tathagata Das  wrote:

> We see that all the time. For example, in SQL, people can write their
> user-defined function in Scala/Java and use it from SQL/python/anywhere.
> That is the recommended way to get the best combo of performance and
> ease-of-use from non-jvm languages.
>
> On Wed, Jul 26, 2017 at 11:49 AM, Priyank Shrivastava <
> priy...@asperasoft.com> wrote:
>
>> Thanks TD.  I am going to try the python-scala hybrid approach by using
>> scala only for custom redis sink and python for the rest of the app .  I
>> understand it might not be as efficient as purely writing the app in scala
>> but unfortunately I am constrained on scala resources.  Have you come
>> across other use cases where people have resorted to such a python-scala
>> hybrid approach?
>>
>> Regards,
>> Priyank
>>
>>
>>
>> On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Hello Priyank
>>>
>>> Writing something purely in Scala/Java would be the most efficient. Even
>>> if we expose python APIs that allow writing custom sinks in pure Python, it
>>> won't be as efficient as Scala/Java foreach as the data would have to go
>>> through JVM / PVM boundary which has significant overheads. So Scala/Java
>>> foreach is always going to be the best option.
>>>
>>> TD
>>>
>>> On Tue, Jul 25, 2017 at 6:05 PM, Priyank Shrivastava <
>>> priy...@asperasoft.com> wrote:
>>>
 I am trying to write key-values to redis using a DataStreamWriter
 object using pyspark structured streaming APIs. I am using Spark 2.2

 Since the Foreach Sink is not supported for python; here
 ,
 I am trying to find out some alternatives.

 One alternative is to write a separate Scala module only to push data
 into redis using foreach; ForeachWriter
 
  is
 supported in Scala. BUT this doesn't seem like an efficient approach and
 adds deployment overhead because now I will have to support Scala in my 
 app.

 Another approach is obviously to use Scala instead of python, which is
 fine but I want to make sure that I absolutely cannot use python for this
 problem before I take this path.

 Would appreciate some feedback and alternative design approaches for
 this problem.

 Thanks.




>>>
>>
>


can I do spark-submit --jars [s3://bucket/folder/jar_file]? or --jars

2017-07-28 Thread Richard Xin
Can we add extra libraries (jars on S3) to spark-submit? If yes, how? Such as
--jars, extraClassPath, extraLibPath?

Thanks,
Richard

Persisting RDD: Low Percentage with a lot of memory available

2017-07-28 Thread pedroT
Hi,
This problem is very annoying for me and I'm tired of surfing the network
without any good advice to follow.

I have a complex job. It had been working fine until I needed to save
partial results (RDDs) to files.
So I tried to cache the RDDs, then call a saveAsTextFile method and follow
the workflow as usual.
The first problem I noticed was that the RDD was not totally cached.
So I changed the cache() call to
 persist(StorageLevel.MEMORY_AND_DISK_SER())
hoping this would persist 100% of the RDD. But it didn't at all.
That does not make any sense to me. Isn't it supposed that with that storage
level, the partitions which don't fit in memory will be persisted to disk?

[image: embedded image 3]

Even insignificant RDDs of about ~5 MB were cached only at 82%.

The last one in the previous image, which had 6628 cached partitions, is
distributed in the following way:


[image: embedded image 4]

The executors' Storage Memory was far from being filled:
[image: embedded image 5]


[image: embedded image 6]


The only thing I noticed that is close to exhaustion is "Memory" in the Hadoop
cluster memory view:

[image: embedded image 7]

I don't know the relation between this "memory used" column and the memory
described in the Spark UI (Storage Memory was almost empty).
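
A minimal sketch (Scala, assuming access to the SparkContext as sc) of checking
per-RDD cache coverage programmatically instead of reading it off the UI:

// Prints, for every persisted RDD, how many partitions are actually cached
// and how much memory/disk space they occupy.
sc.getRDDStorageInfo.foreach { info =>
  println(s"RDD ${info.id} '${info.name}': " +
    s"${info.numCachedPartitions}/${info.numPartitions} partitions cached, " +
    s"mem=${info.memSize} bytes, disk=${info.diskSize} bytes, " +
    s"level=${info.storageLevel}")
}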


Finally, the job accumulated a lot of stages (~100) for recalculation of
RDDs not cached, and the cluster failed with an enigmatic and apparently
known error:

16/12/14 21:03:54 ERROR LiveListenerBus: Dropping SparkListenerEvent
because no remaining room in event queue. This likely means one of the
SparkListeners is too slow and cannot keep up with the rate at which
tasks are being started by the scheduler.
16/12/14 21:03:54 WARN LiveListenerBus: Dropped 1 SparkListenerEvents
since Thu Jan 01 01:00:00 CET 1970


mentioned here.


Please, any clue, any comment indeed, will be much appreciated.

Thanks,
Pedro







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Persisting-RDD-Low-Percentage-with-a-lot-of-memory-available-tp29006.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RE: changing directories in Spark Streming

2017-07-28 Thread Siddhartha Singh Sandhu
Hi,

I am saving the output of my streaming process to S3.

I want to be able to change the output directory of the stream as each hour passes by.

Will this work:

parsed_kf_frame.saveAsTextFiles((s3_location).format(
datetime.datetime.today().strftime("%Y%m%d"),
datetime.datetime.today().strftime("%H"), "prefix_key"), "csv")
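
Note that the strftime calls above are evaluated once, when saveAsTextFiles is
called, not once per batch. A minimal sketch (assuming the same parsed_kf_frame
DStream and an s3_location format string with three placeholders, as above) that
recomputes the path for every batch via foreachRDD:

import datetime

def save_batch(rdd):
    # Runs once per batch interval, so the directory follows the current hour.
    if not rdd.isEmpty():
        now = datetime.datetime.today()
        path = s3_location.format(now.strftime("%Y%m%d"),
                                  now.strftime("%H"),
                                  "prefix_key")
        rdd.saveAsTextFile(path)

parsed_kf_frame.foreachRDD(save_batch)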

Thank You!

Sid.


Re: How to configure spark on Yarn cluster

2017-07-28 Thread yohann jardin
For yarn, I'm speaking about the file fairscheduler.xml (if you kept the 
default scheduling of Yarn): 
https://hadoop.apache.org/docs/r2.7.3/hadoop-yarn/hadoop-yarn-site/FairScheduler.html#Allocation_file_format


Yohann Jardin

On 7/28/2017 at 8:00 PM, jeff saremi wrote:

The only relevant setting i see in Yarn is this:

  
  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>120726</value>
  </property>
which is 120GB and we are well below that. I don't see a total limit.

I haven't played with spark.memory.fraction. I'm not sure if it makes a 
difference. Note that there are no errors coming from Spark with respect to 
memory being an issue. Yarn kills the JVM and just prints out one line: Out of 
memory in the stdout of the container. After that Spark complains about the 
ExecutorLostFailure. So the memory fractions are not a factor here.
I just looked at the link you included. Thank you. Yes, this is the same problem;
however, it looks like no one has come up with a solution for it yet.



From: yohann jardin 
Sent: Friday, July 28, 2017 10:47:40 AM
To: jeff saremi; user@spark.apache.org
Subject: Re: How to configure spark on Yarn cluster


Not sure that we are OK on one thing: Yarn limitations are for the sum of all 
nodes, while you only specify the memory for a single node through Spark.


By the way, the memory displayed in the UI is only a part of the total memory 
allocation: 
https://spark.apache.org/docs/latest/configuration.html#memory-management

It corresponds to “spark.memory.fraction”, so it will mainly be filled by the 
rdd you’re trying to persist. The memory left by this parameter will be used to 
read the input file and compute. When the fail comes from this, the Out Of 
Memory exception is quite explicit in the driver logs.

Testing sampled files of 1 GB, 1 TB, 10 TB should help investigate what goes 
right and what goes wrong at least a bit.


Also, did you check for similar issues on stackoverflow? Like 
https://stackoverflow.com/questions/40781354/container-killed-by-yarn-for-exceeding-memory-limits-10-4-gb-of-10-4-gb-physic


Regards,

Yohann Jardin

On 7/28/2017 at 6:05 PM, jeff saremi wrote:

Thanks so much Yohann

I checked the Storage/Memory column in Executors status page. Well below where 
I wanted to be.
I will try the suggestion on smaller data sets.
I am also well within the Yarn limitations (128GB). In my last try I asked for
48+32 (overhead). So somehow I am exceeding that, or I should say Spark is
exceeding it, since I am trusting it to manage the memory I provided.
Is there anything in Shuffle Write Size, Shuffle Spill, or anything in the logs 
that I should be looking for to come up with the recommended memory size or 
partition count?

thanks


From: yohann jardin 
Sent: Thursday, July 27, 2017 11:15:39 PM
To: jeff saremi; user@spark.apache.org
Subject: Re: How to configure spark on Yarn cluster


Check the executor page of the Spark UI, to check if your storage level is 
limiting.


Also, instead of starting with 100 TB of data, sample it, make it work, and 
grow it little by little until you reached 100 TB. This will validate the 
workflow and let you see how much data is shuffled, etc.


And just in case, check the limits you set on your Yarn queue. If you try to 
allocate more memory to your job than what is set on the queue, there might be 
cases of failure.
Though there are some limitations, it’s possible to allocate more ram to your 
job than available on your Yarn queue.


Yohann Jardin

On 7/28/2017 at 8:03 AM, jeff saremi wrote:

I have the simplest job, which I'm running against 100 TB of data. The job keeps
failing with ExecutorLostFailure's on containers killed by Yarn for exceeding 
memory limits

I have varied the executor-memory from 32GB to 96GB, the 
spark.yarn.executor.memoryOverhead from 8192 to 36000 and similar changes to 
the number of cores, and driver size. It looks like nothing stops this error 
(running out of memory) from happening. Looking at metrics reported by Spark 
status page, is there anything I can use to configure my job properly? Is 
repartitioning more or less going to help at all? The current number of 
partitions is around 40,000 currently.

Here's the gist of the code:


val input = sc.textFile(path)

val t0 = input.map(s => s.split("\t").map(a => ((a(0),a(1)), a)))

t0.persist(StorageLevel.DISK_ONLY)


I have changed storagelevel from MEMORY_ONLY to MEMORY_AND_DISK to DISK_ONLY to 
no avail.







Re: How to configure spark on Yarn cluster

2017-07-28 Thread jeff saremi
The only relevant setting i see in Yarn is this:

  
  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>120726</value>
  </property>
which is 120GB and we are well below that. I don't see a total limit.

I haven't played with spark.memory.fraction. I'm not sure if it makes a 
difference. Note that there are no errors coming from Spark with respect to 
memory being an issue. Yarn kills the JVM and just prints out one line: Out of 
memory in the stdout of the container. After that Spark complains about the 
ExecutorLostFailure. So the memory fractions are not a factor here.
I just looked at the link you included. Thank you. Yes, this is the same problem;
however, it looks like no one has come up with a solution for it yet.



From: yohann jardin 
Sent: Friday, July 28, 2017 10:47:40 AM
To: jeff saremi; user@spark.apache.org
Subject: Re: How to configure spark on Yarn cluster


Not sure that we are OK on one thing: Yarn limitations are for the sum of all 
nodes, while you only specify the memory for a single node through Spark.


By the way, the memory displayed in the UI is only a part of the total memory 
allocation: 
https://spark.apache.org/docs/latest/configuration.html#memory-management

It corresponds to “spark.memory.fraction”, so it will mainly be filled by the 
rdd you’re trying to persist. The memory left by this parameter will be used to 
read the input file and compute. When the fail comes from this, the Out Of 
Memory exception is quite explicit in the driver logs.

Testing sampled files of 1 GB, 1 TB, 10 TB should help investigate what goes 
right and what goes wrong at least a bit.


Also, did you check for similar issues on stackoverflow? Like 
https://stackoverflow.com/questions/40781354/container-killed-by-yarn-for-exceeding-memory-limits-10-4-gb-of-10-4-gb-physic


Regards,

Yohann Jardin

On 7/28/2017 at 6:05 PM, jeff saremi wrote:

Thanks so much Yohann

I checked the Storage/Memory column in Executors status page. Well below where 
I wanted to be.
I will try the suggestion on smaller data sets.
I am also well within the Yarn limitations (128GB). In my last try I asked for
48+32 (overhead). So somehow I am exceeding that, or I should say Spark is
exceeding it, since I am trusting it to manage the memory I provided.
Is there anything in Shuffle Write Size, Shuffle Spill, or anything in the logs 
that I should be looking for to come up with the recommended memory size or 
partition count?

thanks


From: yohann jardin 
Sent: Thursday, July 27, 2017 11:15:39 PM
To: jeff saremi; user@spark.apache.org
Subject: Re: How to configure spark on Yarn cluster


Check the executor page of the Spark UI, to check if your storage level is 
limiting.


Also, instead of starting with 100 TB of data, sample it, make it work, and 
grow it little by little until you reached 100 TB. This will validate the 
workflow and let you see how much data is shuffled, etc.


And just in case, check the limits you set on your Yarn queue. If you try to 
allocate more memory to your job than what is set on the queue, there might be 
cases of failure.
Though there are some limitations, it’s possible to allocate more ram to your 
job than available on your Yarn queue.


Yohann Jardin

On 7/28/2017 at 8:03 AM, jeff saremi wrote:

I have the simplest job, which I'm running against 100 TB of data. The job keeps
failing with ExecutorLostFailure's on containers killed by Yarn for exceeding 
memory limits

I have varied the executor-memory from 32GB to 96GB, the 
spark.yarn.executor.memoryOverhead from 8192 to 36000 and similar changes to 
the number of cores, and driver size. It looks like nothing stops this error 
(running out of memory) from happening. Looking at metrics reported by Spark 
status page, is there anything I can use to configure my job properly? Is 
repartitioning more or less going to help at all? The current number of 
partitions is around 40,000 currently.

Here's the gist of the code:


val input = sc.textFile(path)

val t0 = input.map(s => s.split("\t").map(a => ((a(0),a(1)), a)))

t0.persist(StorageLevel.DISK_ONLY)


I have changed storagelevel from MEMORY_ONLY to MEMORY_AND_DISK to DISK_ONLY to 
no avail.






Re: How to configure spark on Yarn cluster

2017-07-28 Thread yohann jardin
Not sure that we are OK on one thing: Yarn limitations are for the sum of all 
nodes, while you only specify the memory for a single node through Spark.


By the way, the memory displayed in the UI is only a part of the total memory 
allocation: 
https://spark.apache.org/docs/latest/configuration.html#memory-management

It corresponds to “spark.memory.fraction”, so it will mainly be filled by the 
rdd you’re trying to persist. The memory left by this parameter will be used to 
read the input file and compute. When the fail comes from this, the Out Of 
Memory exception is quite explicit in the driver logs.

Testing sampled files of 1 GB, 1 TB, 10 TB should help investigate what goes 
right and what goes wrong at least a bit.


Also, did you check for similar issues on stackoverflow? Like 
https://stackoverflow.com/questions/40781354/container-killed-by-yarn-for-exceeding-memory-limits-10-4-gb-of-10-4-gb-physic


Regards,

Yohann Jardin

On 7/28/2017 at 6:05 PM, jeff saremi wrote:

Thanks so much Yohann

I checked the Storage/Memory column in Executors status page. Well below where 
I wanted to be.
I will try the suggestion on smaller data sets.
I am also well within the Yarn limitations (128GB). In my last try I asked for 
48+32 (overhead). So somehow I am exceeding that or I should say Spark is 
exceeding since I am trusting to manage the memory I provided for it.
Is there anything in Shuffle Write Size, Shuffle Spill, or anything in the logs 
that I should be looking for to come up with the recommended memory size or 
partition count?

thanks


From: yohann jardin 
Sent: Thursday, July 27, 2017 11:15:39 PM
To: jeff saremi; user@spark.apache.org
Subject: Re: How to configure spark on Yarn cluster


Check the executor page of the Spark UI, to check if your storage level is 
limiting.


Also, instead of starting with 100 TB of data, sample it, make it work, and 
grow it little by little until you reached 100 TB. This will validate the 
workflow and let you see how much data is shuffled, etc.


And just in case, check the limits you set on your Yarn queue. If you try to 
allocate more memory to your job than what is set on the queue, there might be 
cases of failure.
Though there are some limitations, it’s possible to allocate more ram to your 
job than available on your Yarn queue.


Yohann Jardin

On 7/28/2017 at 8:03 AM, jeff saremi wrote:

I have the simplest job, which I'm running against 100 TB of data. The job keeps
failing with ExecutorLostFailure's on containers killed by Yarn for exceeding 
memory limits

I have varied the executor-memory from 32GB to 96GB, the 
spark.yarn.executor.memoryOverhead from 8192 to 36000 and similar changes to 
the number of cores, and driver size. It looks like nothing stops this error 
(running out of memory) from happening. Looking at metrics reported by Spark 
status page, is there anything I can use to configure my job properly? Is 
repartitioning more or less going to help at all? The current number of 
partitions is around 40,000 currently.

Here's the gist of the code:


val input = sc.textFile(path)

val t0 = input.map(s => s.split("\t").map(a => ((a(0),a(1)), a)))

t0.persist(StorageLevel.DISK_ONLY)


I have changed storagelevel from MEMORY_ONLY to MEMORY_AND_DISK to DISK_ONLY to 
no avail.






Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232

2017-07-28 Thread jeff saremi
We have a not too complex and not too large spark job that keeps dying with 
this error

I have researched it and I have not seen any convincing explanation on why

I am not using a shuffle service. Which server is the one that is refusing the 
connection?
If I go to the server that is being reported in the error message, I see a lot 
of these errors towards the end:


java.io.FileNotFoundException: 
D:\data\yarnnm\local\usercache\hadoop\appcache\application_1500970459432_1024\blockmgr-7f3a1abc-2b8b-4e51-9072-8c12495ec563\0e\shuffle_0_4107_0.index

(may or may not be related to the problem at all)


and if you examine further on this machine, there are FetchFailedExceptions
resulting from other machines, and so on and so forth.

This is Spark 1.6 with the yarn-cluster master.

Could anyone provide some insight or solution to this?

thanks



Re: How to configure spark on Yarn cluster

2017-07-28 Thread jeff saremi
Thanks so much Yohann

I checked the Storage/Memory column in Executors status page. Well below where 
I wanted to be.
I will try the suggestion on smaller data sets.
I am also well within the Yarn limitations (128GB). In my last try I asked for
48+32 (overhead). So somehow I am exceeding that, or I should say Spark is
exceeding it, since I am trusting it to manage the memory I provided.
Is there anything in Shuffle Write Size, Shuffle Spill, or anything in the logs 
that I should be looking for to come up with the recommended memory size or 
partition count?

thanks


From: yohann jardin 
Sent: Thursday, July 27, 2017 11:15:39 PM
To: jeff saremi; user@spark.apache.org
Subject: Re: How to configure spark on Yarn cluster


Check the executor page of the Spark UI, to check if your storage level is 
limiting.


Also, instead of starting with 100 TB of data, sample it, make it work, and 
grow it little by little until you reached 100 TB. This will validate the 
workflow and let you see how much data is shuffled, etc.


And just in case, check the limits you set on your Yarn queue. If you try to 
allocate more memory to your job than what is set on the queue, there might be 
cases of failure.
Though there are some limitations, it’s possible to allocate more ram to your 
job than available on your Yarn queue.


Yohann Jardin

On 7/28/2017 at 8:03 AM, jeff saremi wrote:

I have the simplest job, which I'm running against 100 TB of data. The job keeps
failing with ExecutorLostFailure's on containers killed by Yarn for exceeding 
memory limits

I have varied the executor-memory from 32GB to 96GB, the 
spark.yarn.executor.memoryOverhead from 8192 to 36000 and similar changes to 
the number of cores, and driver size. It looks like nothing stops this error 
(running out of memory) from happening. Looking at metrics reported by Spark 
status page, is there anything I can use to configure my job properly? Is 
repartitioning more or less going to help at all? The current number of 
partitions is around 40,000 currently.

Here's the gist of the code:


val input = sc.textFile(path)

val t0 = input.map(s => s.split("\t").map(a => ((a(0),a(1)), a)))

t0.persist(StorageLevel.DISK_ONLY)


I have changed storagelevel from MEMORY_ONLY to MEMORY_AND_DISK to DISK_ONLY to 
no avail.





Re: Spark Streaming with long batch / window duration

2017-07-28 Thread emceemouli
Thanks. If I do not use Window and choose to stream the data onto HDFS,
could you suggest how to store only 1 week's worth of data? Should I create a
cron job to delete HDFS files older than a week? Please let me know if you
have any other suggestions.
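
If the output directories are named by date (a common layout, e.g.
/data/events/20170728/; the path below is illustrative), one minimal approach is
a daily cron entry that drops the directory that just fell out of the one-week
window (this assumes GNU date; note the escaped % signs, which cron would
otherwise treat specially):

# Runs at 01:00 every day; deletes the directory that is now 8 days old.
0 1 * * * hdfs dfs -rm -r -f -skipTrash /data/events/$(date -d '8 days ago' +\%Y\%m\%d)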



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-long-batch-window-duration-tp10191p29005.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Spark Streaming as a Service

2017-07-28 Thread ajit roshen
We have few Spark Streaming Apps running on our AWS Spark 2.1 Yarn cluster.
We currently log on to the Master Node of the cluster and start the App
using "spark-submit", calling the jar.

We would like to open up this to our users, so that they can submit their
own Apps, but we would not be able to give Users access to the Master or
any other Nodes of the cluster.  I have the below questions.

   - What would be a good interface to invoke spark-submit in this
   case? I read about Livy & jobserver but couldn't find out whether they provide
   a UI.


   - Is there a way to specify the start-date and end-date for the
   streaming Apps? Option to know the current App status and option to Kill
   App. I know Resource Manager provides this but not sure if it is a good
   idea to open up RM to users.


   - Is there a way to control Access to the App such that User-A can only
   execute App-1 and App-3?


   - Is there a way to control Resources usage such that User-A can only
   use 2 Executors, 2 Cores/Executor, 10GB/Executor etc:?

Thank you.
Ajit
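
On the Livy option above, a minimal sketch of submitting and controlling a batch
through its REST API (the endpoint, jar path, class name and resource sizes are
illustrative, not taken from this thread):

import requests

LIVY = "http://livy-host:8998"   # assumed Livy server endpoint

# Submit an application as a Livy batch.
payload = {
    "file": "hdfs:///apps/streaming-app.jar",
    "className": "com.example.StreamingApp",
    "numExecutors": 2,
    "executorCores": 2,
    "executorMemory": "10g",
}
batch = requests.post(LIVY + "/batches", json=payload).json()

# Check the current state, or kill the application.
state = requests.get("{}/batches/{}".format(LIVY, batch["id"])).json()["state"]
# requests.delete("{}/batches/{}".format(LIVY, batch["id"]))   # kills the app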


subscribe

2017-07-28 Thread ajit roshen



Re: Support Dynamic Partition Inserts params with SET command in Spark 2.0.1

2017-07-28 Thread Chetan Khatri
I think it will be the same, but let me try that.

FYR - https://issues.apache.org/jira/browse/SPARK-19881

On Fri, Jul 28, 2017 at 4:44 PM, ayan guha  wrote:

> Try running spark.sql("set yourconf=val")
>
> On Fri, 28 Jul 2017 at 8:51 pm, Chetan Khatri 
> wrote:
>
>> Jorn, Both are same.
>>
>> On Fri, Jul 28, 2017 at 4:18 PM, Jörn Franke 
>> wrote:
>>
>>> Try sparksession.conf().set
>>>
>>> On 28. Jul 2017, at 12:19, Chetan Khatri 
>>> wrote:
>>>
>>> Hey Dev/User,
>>>
>>> I am working with Spark 2.0.1 and with dynamic partitioning with Hive
>>> facing below issue:
>>>
>>> org.apache.hadoop.hive.ql.metadata.HiveException:
>>> Number of dynamic partitions created is 1344, which is more than 1000.
>>> To solve this try to set hive.exec.max.dynamic.partitions to at least
>>> 1344.
>>>
>>> I tried below options, but failed:
>>>
>>> val spark = sparkSession.builder().enableHiveSupport().getOrCreate()
>>>
>>> *spark.sqlContext.setConf("hive.exec.max.dynamic.partitions", "2000")*
>>>
>>> Please help with an alternate workaround!
>>>
>>> Thanks
>>>
>>>
>> --
> Best Regards,
> Ayan Guha
>


Re: Support Dynamic Partition Inserts params with SET command in Spark 2.0.1

2017-07-28 Thread ayan guha
Try running spark.sql("set yourconf=val")
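
For example, a minimal sketch in the terms of this thread (assuming the same
Hive-enabled session and the 2000 value from the quoted message below; whether
the value actually reaches the Hive client in 2.0.1 is the open question in the
thread):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Route the Hive property through SQL instead of sqlContext.setConf
spark.sql("SET hive.exec.max.dynamic.partitions=2000")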

On Fri, 28 Jul 2017 at 8:51 pm, Chetan Khatri 
wrote:

> Jorn, Both are same.
>
> On Fri, Jul 28, 2017 at 4:18 PM, Jörn Franke  wrote:
>
>> Try sparksession.conf().set
>>
>> On 28. Jul 2017, at 12:19, Chetan Khatri 
>> wrote:
>>
>> Hey Dev/User,
>>
>> I am working with Spark 2.0.1 and with dynamic partitioning with Hive
>> facing below issue:
>>
>> org.apache.hadoop.hive.ql.metadata.HiveException:
>> Number of dynamic partitions created is 1344, which is more than 1000.
>> To solve this try to set hive.exec.max.dynamic.partitions to at least
>> 1344.
>>
>> I tried below options, but failed:
>>
>> val spark = sparkSession.builder().enableHiveSupport().getOrCreate()
>>
>> *spark.sqlContext.setConf("hive.exec.max.dynamic.partitions", "2000")*
>>
>> Please help with an alternate workaround!
>>
>> Thanks
>>
>>
> --
Best Regards,
Ayan Guha


Re: Support Dynamic Partition Inserts params with SET command in Spark 2.0.1

2017-07-28 Thread Chetan Khatri
Jorn, Both are same.

On Fri, Jul 28, 2017 at 4:18 PM, Jörn Franke  wrote:

> Try sparksession.conf().set
>
> On 28. Jul 2017, at 12:19, Chetan Khatri 
> wrote:
>
> Hey Dev/User,
>
> I am working with Spark 2.0.1 and with dynamic partitioning with Hive
> facing below issue:
>
> org.apache.hadoop.hive.ql.metadata.HiveException:
> Number of dynamic partitions created is 1344, which is more than 1000.
> To solve this try to set hive.exec.max.dynamic.partitions to at least
> 1344.
>
> I tried below options, but failed:
>
> val spark = sparkSession.builder().enableHiveSupport().getOrCreate()
>
> *spark.sqlContext.setConf("hive.exec.max.dynamic.partitions", "2000")*
>
> Please help with an alternate workaround!
>
> Thanks
>
>


Re: Support Dynamic Partition Inserts params with SET command in Spark 2.0.1

2017-07-28 Thread Jörn Franke
Try sparksession.conf().set

> On 28. Jul 2017, at 12:19, Chetan Khatri  wrote:
> 
> Hey Dev/User,
> 
> I am working with Spark 2.0.1 and with dynamic partitioning with Hive facing 
> below issue:
> 
> org.apache.hadoop.hive.ql.metadata.HiveException:
> Number of dynamic partitions created is 1344, which is more than 1000.
> To solve this try to set hive.exec.max.dynamic.partitions to at least 1344.
> 
> I tried below options, but failed:
> 
> val spark = sparkSession.builder().enableHiveSupport().getOrCreate()
> 
> spark.sqlContext.setConf("hive.exec.max.dynamic.partitions", "2000")
> 
> Please help with an alternate workaround!
> 
> Thanks


Support Dynamic Partition Inserts params with SET command in Spark 2.0.1

2017-07-28 Thread Chetan Khatri
Hey Dev/User,

I am working with Spark 2.0.1 with dynamic partitioning in Hive, and I am
facing the issue below:

org.apache.hadoop.hive.ql.metadata.HiveException:
Number of dynamic partitions created is 1344, which is more than 1000.
To solve this try to set hive.exec.max.dynamic.partitions to at least 1344.

I tried below options, but failed:

val spark = sparkSession.builder().enableHiveSupport().getOrCreate()

*spark.sqlContext.setConf("hive.exec.max.dynamic.partitions", "2000")*

Please help with an alternate workaround!

Thanks


Re: SPARK Storagelevel issues

2017-07-28 Thread 周康
All right, I did not catch the point, sorry for that.
But you can take a snapshot of the heap, and then analyze the heap dump with MAT
or other tools.
From the code I cannot find any clue.
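
For instance, a minimal way to capture such a snapshot (assuming JDK tools are
available on the node and <pid> is the driver or executor JVM in question):

jmap -dump:live,format=b,file=/tmp/executor-heap.hprof <pid>

Alternatively, adding -XX:+HeapDumpOnOutOfMemoryError to
spark.executor.extraJavaOptions makes the JVM write the .hprof automatically on
OOM, which can then be opened in MAT.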

2017-07-28 17:09 GMT+08:00 Gourav Sengupta :

> Hi,
>
> I have done all of that, but my question is "why should 62 MB of data give a
> memory error when we have over 2 GB of memory available?".
>
> Therefore all that is mentioned by Zhoukang is not pertinent at all.
>
>
> Regards,
> Gourav Sengupta
>
> On Fri, Jul 28, 2017 at 4:43 AM, 周康  wrote:
>
>> testdf.persist(pyspark.storagelevel.StorageLevel.MEMORY_ONLY_SER) maybe the
>> StorageLevel should change. And check your config
>> "spark.memory.storageFraction", whose default value is 0.5.
>>
>> 2017-07-28 3:04 GMT+08:00 Gourav Sengupta :
>>
>>> Hi,
>>>
>>> I cached a table in a large EMR cluster and it has a size of 62 MB.
>>> Therefore I know the size of the table while cached.
>>>
>>> But when I am trying to cache the table in a smaller cluster, which
>>> still has a total of 3 GB driver memory and two executors with close to 2.5
>>> GB memory, the job still keeps failing with JVM out-of-memory errors.
>>>
>>> Is there something that I am missing?
>>>
>>> CODE:
>>> =
>>> sparkSession =  spark.builder \
>>> .config("spark.rdd.compress", "true") \
>>> .config("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer") \
>>> .config("spark.executor.extraJ
>>> avaOptions","-XX:+UseCompressedOops -XX:+PrintGCDetails
>>> -XX:+PrintGCTimeStamps") \
>>> .appName("test").enableHiveSupport().getOrCreate()
>>>
>>> testdf = sparkSession.sql("select * from tablename")
>>> testdf.persist(pyspark.storagelevel.StorageLevel.MEMORY_ONLY_SER)
>>> =
>>>
>>> This causes JVM out of memory error.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>
>>
>


Re: SPARK Storagelevel issues

2017-07-28 Thread Gourav Sengupta
Hi,

I have done all of that, but my question is "why should 62 MB of data give a
memory error when we have over 2 GB of memory available?".

Therefore all that is mentioned by Zhoukang is not pertinent at all.


Regards,
Gourav Sengupta

On Fri, Jul 28, 2017 at 4:43 AM, 周康  wrote:

> testdf.persist(pyspark.storagelevel.StorageLevel.MEMORY_ONLY_SER) maybe the
> StorageLevel should change. And check your config
> "spark.memory.storageFraction", whose default value is 0.5.
>
> 2017-07-28 3:04 GMT+08:00 Gourav Sengupta :
>
>> Hi,
>>
>> I cached a table in a large EMR cluster and it has a size of 62 MB.
>> Therefore I know the size of the table while cached.
>>
>> But when I am trying to cache the table in a smaller cluster, which still
>> has a total of 3 GB driver memory and two executors with close to 2.5 GB
>> memory, the job still keeps failing with JVM out-of-memory errors.
>>
>> Is there something that I am missing?
>>
>> CODE:
>> =
>> sparkSession =  spark.builder \
>> .config("spark.rdd.compress", "true") \
>> .config("spark.serializer", 
>> "org.apache.spark.serializer.KryoSerializer")
>> \
>> .config("spark.executor.extraJ
>> avaOptions","-XX:+UseCompressedOops -XX:+PrintGCDetails
>> -XX:+PrintGCTimeStamps") \
>> .appName("test").enableHiveSupport().getOrCreate()
>>
>> testdf = sparkSession.sql("select * from tablename")
>> testdf.persist(pyspark.storagelevel.StorageLevel.MEMORY_ONLY_SER)
>> =
>>
>> This causes JVM out of memory error.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>
>


Re: How to configure spark on Yarn cluster

2017-07-28 Thread yohann jardin
Check the executor page of the Spark UI, to check if your storage level is 
limiting.


Also, instead of starting with 100 TB of data, sample it, make it work, and 
grow it little by little until you reached 100 TB. This will validate the 
workflow and let you see how much data is shuffled, etc.
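
A minimal sketch of that, reusing the code quoted below (the 0.01 fraction is
just an example; it keeps roughly 1% of the input):

import org.apache.spark.storage.StorageLevel

// Sample ~1% of the input without replacement, then run the same pipeline.
val input = sc.textFile(path).sample(withReplacement = false, fraction = 0.01)

val t0 = input.map(s => s.split("\t").map(a => ((a(0), a(1)), a)))

t0.persist(StorageLevel.DISK_ONLY)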


And just in case, check the limits you set on your Yarn queue. If you try to 
allocate more memory to your job than what is set on the queue, there might be 
cases of failure.
Though there are some limitations, it’s possible to allocate more ram to your 
job than available on your Yarn queue.


Yohann Jardin

On 7/28/2017 at 8:03 AM, jeff saremi wrote:

I have the simplest job, which I'm running against 100 TB of data. The job keeps
failing with ExecutorLostFailure's on containers killed by Yarn for exceeding 
memory limits

I have varied the executor-memory from 32GB to 96GB, the 
spark.yarn.executor.memoryOverhead from 8192 to 36000 and similar changes to 
the number of cores, and driver size. It looks like nothing stops this error 
(running out of memory) from happening. Looking at metrics reported by Spark 
status page, is there anything I can use to configure my job properly? Is 
repartitioning more or less going to help at all? The current number of 
partitions is around 40,000 currently.

Here's the gist of the code:


val input = sc.textFile(path)

val t0 = input.map(s => s.split("\t").map(a => ((a(0),a(1)), a)))

t0.persist(StorageLevel.DISK_ONLY)


I have changed storagelevel from MEMORY_ONLY to MEMORY_AND_DISK to DISK_ONLY to 
no avail.





How to configure spark on Yarn cluster

2017-07-28 Thread jeff saremi
I have the simplest job, which I'm running against 100 TB of data. The job keeps
failing with ExecutorLostFailure's on containers killed by Yarn for exceeding 
memory limits

I have varied the executor-memory from 32GB to 96GB, the 
spark.yarn.executor.memoryOverhead from 8192 to 36000 and similar changes to 
the number of cores, and driver size. It looks like nothing stops this error 
(running out of memory) from happening. Looking at metrics reported by Spark 
status page, is there anything I can use to configure my job properly? Is 
repartitioning more or less going to help at all? The current number of 
partitions is around 40,000 currently.

Here's the gist of the code:


val input = sc.textFile(path)

val t0 = input.map(s => s.split("\t").map(a => ((a(0),a(1)), a)))

t0.persist(StorageLevel.DISK_ONLY)


I have changed storagelevel from MEMORY_ONLY to MEMORY_AND_DISK to DISK_ONLY to 
no avail.