Re: Debugging Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

2017-09-26 Thread Sathishkumar Manimoorthy
@Ayan

It seems to be running on Spark standalone, not on YARN, I guess.

Thanks,
Sathish

On Tue, Sep 26, 2017 at 9:09 PM, ayan guha  wrote:

> I would check the queue you are submitting the job to, assuming it is yarn...
>
> On Tue, Sep 26, 2017 at 11:40 PM, JG Perrin  wrote:
>
>> Hi,
>>
>> I get the infamous:
>>
>> Initial job has not accepted any resources; check your cluster UI to
>> ensure that workers are registered and have sufficient resources
>>
>> I run the app via Eclipse, connecting:
>>
>> SparkSession spark = SparkSession.builder()
>>     .appName("Converter - Benchmark")
>>     .master(ConfigurationManager.getMaster())
>>     .config("spark.cores.max", "4")
>>     .config("spark.executor.memory", "16g")
>>     .getOrCreate();
>>
>> Everything seems ok on the cluster side:
>>
>> I probably missed something super obvious, but can’t find it…
>>
>> Any help/hint is welcome! - TIA
>>
>> jg
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


RE: Debugging Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

2017-09-26 Thread JG Perrin
Not using YARN, just a standalone cluster with 2 nodes here (physical, not even
VMs). The network seems good between the nodes.

From: ayan guha [mailto:guha.a...@gmail.com]
Sent: Tuesday, September 26, 2017 10:39 AM
To: JG Perrin 
Cc: user@spark.apache.org
Subject: Re: Debugging Initial job has not accepted any resources; check your 
cluster UI to ensure that workers are registered and have sufficient resources

I would check the queue you are submitting the job to, assuming it is yarn...

On Tue, Sep 26, 2017 at 11:40 PM, JG Perrin wrote:
Hi,

I get the infamous:
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources

I run the app via Eclipse, connecting:
SparkSession spark = SparkSession.builder()
    .appName("Converter - Benchmark")
    .master(ConfigurationManager.getMaster())
    .config("spark.cores.max", "4")
    .config("spark.executor.memory", "16g")
    .getOrCreate();


Everything seems ok on the cluster side:
[screenshot of the cluster UI]


I probably missed something super obvious, but can’t find it…

Any help/hint is welcome! - TIA

jg






--
Best Regards,
Ayan Guha


Re: Debugging Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

2017-09-26 Thread ayan guha
I would check the queue you are submitting the job to, assuming it is yarn...
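
(For reference, if this were running on YARN, the target queue could be pinned explicitly when building the session. A minimal PySpark sketch under that assumption; the queue name "myqueue" is a hypothetical placeholder:)

# Minimal sketch, only relevant on a YARN cluster; "myqueue" is a hypothetical queue name.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("Converter - Benchmark")
         .master("yarn")
         .config("spark.yarn.queue", "myqueue")  # submit the job to a specific YARN queue
         .getOrCreate())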

On Tue, Sep 26, 2017 at 11:40 PM, JG Perrin  wrote:

> Hi,
>
> I get the infamous:
>
> Initial job has not accepted any resources; check your cluster UI to
> ensure that workers are registered and have sufficient resources
>
> I run the app via Eclipse, connecting:
>
> SparkSession spark = SparkSession.builder()
>     .appName("Converter - Benchmark")
>     .master(ConfigurationManager.getMaster())
>     .config("spark.cores.max", "4")
>     .config("spark.executor.memory", "16g")
>     .getOrCreate();
>
> Everything seems ok on the cluster side:
>
> I probably missed something super obvious, but can’t find it…
>
> Any help/hint is welcome! - TIA
>
> jg
>



-- 
Best Regards,
Ayan Guha


Re: partitionBy causing OOM

2017-09-26 Thread Amit Sela
Thanks for all the answers!
It looks like increasing the heap a little, and setting spark.sql.shuffle.partitions
to a much lower number (I used the recommended input_size_mb/128 formula), did the
trick.
As for partitionBy: unless I use repartition("dt") before the writer, it actually
writes more than one output file per "dt" partition, so I guess the same "dt" value
is spread across multiple partitions, right?

On Mon, Sep 25, 2017 at 11:07 PM ayan guha  wrote:

> Another possible option would be creating a partitioned table in Hive and
> using dynamic partitioning while inserting. This would not require Spark to do
> an explicit partitionBy.
>
> On Tue, 26 Sep 2017 at 12:39 pm, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
>> Hi Amit,
>>
>> Spark keeps the partition that it is working on in memory (and does not
>> spill to disk even if it is running OOM). Also, since you are getting OOM
>> when using partitionBy (and not when you just use flatMap), there should be
>> one (or a few) dates on which your partition size is bigger than the heap.
>> You can do a count by date to check if there is skewness in your data.
>>
>> The way out would be to increase the heap size or to use more columns in
>> partitionBy (like date + hour) to distribute the data better.
>>
>> Hope this helps!
>>
>> Thanks
>> Ankur
>>
>> On Mon, Sep 25, 2017 at 7:30 PM, 孫澤恩  wrote:
>>
>>> Hi Amit,
>>>
>>> Maybe you can change the configuration spark.sql.shuffle.partitions.
>>> The default is 200; changing this property changes the number of tasks
>>> when you are using the DataFrame API.
>>>
>>> On 26 Sep 2017, at 1:25 AM, Amit Sela  wrote:
>>>
>>> I'm trying to run a simple pyspark application that reads from file
>>> (json), flattens it (explode) and writes back to file (json) partitioned by
>>> date using DataFrameWriter.partitionBy(*cols).
>>>
>>> I keep getting OOMEs like:
>>> java.lang.OutOfMemoryError: Java heap space
>>> at
>>> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:46)
>>> at
>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
>>> at
>>> org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
>>> ...
>>>
>>> Explode could make the underlying RDD grow a lot, and maybe in an
>>> unbalanced way sometimes,
>>> adding to that partitioning by date (in daily ETLs for instance) would
>>> probably cause a data skew (right?), but why am I getting OOMs? Isn't Spark
>>> supposed to spill to disk if the underlying RDD is too big to fit in memory?
>>>
>>> If I'm not using "partitionBy" with the writer (still exploding)
>>> everything works fine.
>>>
>>> This happens both in EMR and in local (mac) pyspark/spark shell (tried
>>> both in python and scala).
>>>
>>> Thanks!
>>>
>>>
>>>
>> --
> Best Regards,
> Ayan Guha
>


Debugging Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

2017-09-26 Thread JG Perrin
Hi,

I get the infamous:
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources

I run the app via Eclipse, connecting:
SparkSession spark = SparkSession.builder()
    .appName("Converter - Benchmark")
    .master(ConfigurationManager.getMaster())
    .config("spark.cores.max", "4")
    .config("spark.executor.memory", "16g")
    .getOrCreate();


Everything seems ok on the cluster side:
[screenshot of the cluster UI]


I probably missed something super obvious, but can't find it...

Any help/hint is welcome! - TIA

jg





PySpark: Overusing allocated cores / too many processes

2017-09-26 Thread Fabian Böhnlein
Hi all,

The topic above has been mentioned on this list before, between March and June 2016,
again in July 2016, and a similar question was asked in early September 2017 - none
of which reached a conclusion on how to effectively limit the number of Python
processes spawned by PySpark, or the number of actual cores used per executor.

Does anyone have tips or solutions at hand? Thanks!

Bolding for the skim-readers, I'm not shouting ;)

Problem on my side, example setup:
Mesos 1.3.1, Spark 2.1.1,
Coarse mode, dynamicAllocation off, shuffle service off
spark.cores.max=112
spark.executor.cores=8 (machines have 32)
spark.executor.memory=50G (machines have 250G)

Stage 1 goes okayish, after setting spark.task.cpus=2. Without this setting,
there were 8 Python processes per executor (using 8 CPUs) *plus 2-4 CPUs for
the Java processes*, ending up with 10-14 cores used per executor instead of
8. This JVM overhead is OK to handle with this setting, I believe.
df = spark.read.parquet(path)
grpd = df.rdd.map(lambda x: (x[0], list(x[1:]))).groupByKey()
This stage runs 3 hours, writes 990G of shuffle.

Stage 2 is, roughly speaking, a
grpd.mapValues(lambda vs: sklearn.cluster.DBSCAN(n_jobs=1).fit_predict(vs)).write.parquet(path)
which runs *many more* (sometimes dozens!) *than 4 Python processes per executor*,
4 being the expected number given 8 executor cores with task.cpus=2. It runs
for about 15 hours.
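
(For concreteness, a minimal end-to-end PySpark sketch of the pipeline described above, with the configuration from the setup section folded in; the paths, the key/feature column layout, and the output schema are assumptions, not taken from the real job:)

# Hedged sketch of the two stages described above; paths and column layout are assumptions.
from pyspark.sql import SparkSession
from sklearn.cluster import DBSCAN

spark = (SparkSession.builder
         .appName("dbscan-per-key")
         .config("spark.cores.max", "112")
         .config("spark.executor.cores", "8")
         .config("spark.executor.memory", "50g")
         .config("spark.task.cpus", "2")
         .getOrCreate())

# Stage 1: group all rows by the key in column 0 (this produces the large shuffle)
df = spark.read.parquet("/data/input")
grpd = df.rdd.map(lambda row: (row[0], list(row[1:]))).groupByKey()

# Stage 2: one single-threaded DBSCAN fit per key
def cluster(values):
    # `values` is an iterable of feature lists for a single key
    return DBSCAN(n_jobs=1).fit_predict(list(values)).tolist()

labels = grpd.mapValues(cluster)
labels.toDF(["key", "labels"]).write.mode("overwrite").parquet("/data/labels")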

We are fairly sure that the mapValues function doesn't apply
multi-processing. Actually, that would probably result in single Python
processes using more than 100% CPU - something we never observe.

Unfortunately these Spark tasks then overuse their allocated Mesos
resources by 100-150% (hitting the physical limit of the machine).

Any tips much appreciated!

Best,
Fabian