Re: Spark with External Shuffle Service - using saved shuffle files in the event of executor failure

2021-05-12 Thread Attila Zsolt Piros
Hello,

I have answered it on Stack Overflow.

Best Regards,
Attila


On Wed, May 12, 2021 at 4:57 PM Chris Thomas 
wrote:

> Hi,
>
> I am pretty confident I have observed Spark configured with the Shuffle
> Service continuing to fetch shuffle files on a node in the event of
> executor failure, rather than recompute the shuffle files as happens
> without the Shuffle Service. Can anyone confirm this?
>
> (I have a SO question
> open
> on the same if you would rather answer directly there).
>
> Kind regards,
>
> Chris
>
>
>


Re: [Spark in Kubernetes] Question about running in client mode

2021-04-26 Thread Attila Zsolt Piros
Hi Shiqi,

In client mode the driver runs locally: on the same machine, and possibly even
in the same process, as the spark-submit.

So if the application was submitted from inside a running pod then the driver
will be running in that pod, and when it was submitted from outside of K8s then
it will be running outside.
This is why there is no dedicated config for this.

You can read about deploy modes in general here:
https://spark.apache.org/docs/latest/submitting-applications.html
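
For illustration only, here is a minimal Scala sketch of a client-mode session
on K8s (the image name, namespace and the headless-service host below are just
placeholders, not taken from this thread). In client mode this very JVM is the
driver, so the executors must be able to reach it; that is what
spark.driver.host/port are for when the driver itself runs in a pod:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("k8s://https://kubernetes.default.svc:443")
  .appName("client-mode-example")
  .config("spark.kubernetes.container.image", "my-spark:3.1.1")      // placeholder image
  .config("spark.kubernetes.namespace", "default")
  .config("spark.driver.host", "my-driver-headless-svc.default.svc") // placeholder headless service
  .config("spark.driver.port", "7078")
  .config("spark.executor.instances", "2")
  .getOrCreate()                                                     // no deploy-mode config: this process is the driver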

Best Regards,
Attila

On Tue, Apr 27, 2021 at 12:03 AM Shiqi Sun  wrote:

> Hi Spark User group,
>
> I have a couple of quick questions about running Spark in Kubernetes
> between different deploy modes.
>
> As specified in
> https://spark.apache.org/docs/latest/running-on-kubernetes.html#client-mode,
> since Spark 2.4, client mode support is available when running in
> Kubernetes, and it says "when your application runs in client mode, the
> driver can run inside a pod or on a physical host". Then here come the
> questions.
>
> 1. If I understand correctly, in cluster mode, the driver is also running
> inside a k8s pod. Then, what's the difference between running it in cluster
> mode, versus running it in client mode when I choose to run my driver in a
> pod?
>
> 2. What does it mean by "running driver on a physical host"? Does it mean
> that it runs outside of the k8s cluster? What config should I pass to spark
> submit so that it runs this way, instead of running my driver into a k8s
> pod?
>
> Thanks!
>
> Best,
> Shiqi
>


Re: Dynamic Allocation Backlog Property in Spark on Kubernetes

2021-04-10 Thread Attila Zsolt Piros
Hi Ranju!

> But if there are no extra resources available, then go for static
allocation rather dynamic. Is it correct ?

I think there is no such rule. If there are no new resources available
for Spark then the existing ones will be used (even the min executors count is
not guaranteed to be reached if no resources are available).

But I suggest always setting the max executors to a meaningful value (the
default is far too high: Int.MaxValue).
This way you can avoid too high costs for a small/medium sized job where
the number of tasks is high but the tasks themselves are small.
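
As a rough sketch (the numbers below are only illustrative, not a
recommendation for your workload), in Scala the relevant settings could look
like this; the shuffle-tracking line is my assumption for a K8s setup without
an external shuffle service:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true") // track shuffles instead of an external shuffle service
  .set("spark.dynamicAllocation.minExecutors", "3")
  .set("spark.dynamicAllocation.maxExecutors", "8")               // the default is Int.MaxValue, so cap it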

Regarding your questions: as I see it, in both cases the extra resources
help and the jobs will finish faster.

Best Regards,
Attila


On Sat, Apr 10, 2021 at 7:01 PM ranju goel  wrote:

> Hi Attila,
>
>
> I understood what you mean that Use the extra resources if available for
> running spark job, using schedulerbacklogtimeout (dynamic allocation).
>
> This will speeds up the job. But if there are no extra resources
> available, then go for static allocation rather dynamic. Is it correct ?
>
>
> Please validate below few scenarios for effective use of dynamic allocation
>
>
> 1.  Below screenshot shows, the Tasks are tiny, each task is executing
> fast, but number of total tasks count is high (3241).
>
> *Dynamic Allocation Advantage for this scenario*
>
> If reserved spark quota has more resources available when min Executors
> running, setting schedulerbacklogtimeout to few secs [say 15 min], those
> available quota resources can be used and  (3241) number of tasks can be
> finished fast. Is this understanding correct?
>
> [image: image.png]
>
>
>
> 2. Below report has less total number of tasks count (192) and parallel
> running task count (24), but each task took around 7 min to complete.
>
> So here again, if resources are available in quota, more parallelism can
> be achieved using schedulerbacklogtimeout (say 15 mins) and speeds up the
> job.
>
>
> [image: image.png]
>
> Best Regards
>
>
>
>
>
> *From:* Attila Zsolt Piros 
> *Sent:* Friday, April 9, 2021 11:11 AM
> *To:* Ranju Jain 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Dynamic Allocation Backlog Property in Spark on Kubernetes
>
>
>
> You should not set "spark.dynamicAllocation.schedulerBacklogTimeout" so
> high and the purpose of this config is very different from the one you
> would like to use it for.
>
>
> The confusion I guess comes from the fact that you are still thinking in
> multiple Spark jobs.
>
>
> *But Dynamic Allocation is useful in the case of a single Spark job, too.* With
> dynamic allocation, if there are pending tasks then new resources should be
> allocated to speed up the calculation.
> If you do not have enough partitions then you do not have enough tasks to
> run in parallel; that is what my earlier comment was about.
>
> So let's focus on your first job:
> - With 3 executors it takes 2 hours to complete, right?
> - And what about 8 executors?  I hope significantly less time.
>
> So if you have more than 3 partitions and the tasks are meaningfully long
> enough to request some extra resources (schedulerBacklogTimeout) and the
> number of running executors is lower than the maximum number of executors
> you set (maxExecutors) then why wouldn't you want to use those extra
> resources?
>
>
>
>
>
>
> On Fri, Apr 9, 2021 at 6:03 AM Ranju Jain  wrote:
>
> Hi Attila,
>
>
>
> Thanks for your reply.
>
>
>
> If I talk about single job which started to run with minExecutors as *3*.
> And Suppose this job [*which reads the full data from backend and process
> and writes it to a location*]
>
> takes around 2 hour to complete.
>
>
>
> What I understood is, as the default value of
> spark.dynamicAllocation.schedulerBacklogTimeout is 1 sec, so executors will
> scale from *3* to *4* and then *8* after every second if tasks are
> pending at scheduler backend. So If I don’t want  it 1 sec and I might set
> it to 1 hour [3600 sec] in 2 hour of spark job.
>
>
>
> So this is all about when I want to scale executors dynamically for spark
> job. Is that understanding correct?
>
>
>
> In the below statement I don’t understand much about available partitions
> :-(
>
> *pending tasks (which kinda related to the available partitions)*
>
>
>
>
>
> Regards
>
> Ranju
>
>
>
>
>
> *From:* Attila Zsolt Piros 
> *Sent:* Friday, April 9, 2021 12:13 AM
> *To:* Ranju Jain 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Dynamic Allocation Backlog Property in Spark on Kubernetes
>
>
>
> Hi!
>
> For dynamic allocation you do not need to run the Spark jobs in parallel.
> Dyna

Re: possible bug

2021-04-09 Thread Attila Zsolt Piros
Hi Sean!

So the "coalesce" without shuffle will create a CoalescedRDD which during
its computation delegates to the parent RDD partitions.
As the CoalescedRDD contains only 1 partition so we talk about 1 task and 1
task context.

The next stop is PythonRunner.

Here at least the Python workers are reused (when
"spark.python.worker.reuse" is true, which is the default) but the
MonitorThreads are not reused, and what is worse, all the MonitorThreads are
created for the same worker and the same TaskContext.
This means the CoalescedRDD's single task has to be completed to stop the first
monitor thread; relevant code:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L570

So this will lead to creating 7 extra threads when 1 would be enough.

The jira is: https://issues.apache.org/jira/browse/SPARK-35009
The PR will come next week maybe (I am a bit uncertain as I have many other
things to do right now).

Best Regards,
Attila

On Fri, Apr 9, 2021 at 5:54 PM Sean Owen  wrote:

> Yeah I figured it's not something fundamental to the task or Spark. The
> error is very odd, never seen that. Do you have a theory on what's going on
> there? I don't!
>
> On Fri, Apr 9, 2021 at 10:43 AM Attila Zsolt Piros <
> piros.attila.zs...@gmail.com> wrote:
>
>> Hi!
>>
>> I looked into the code and found a way to improve it.
>>
>> With the improvement your test runs just fine:
>>
>> Welcome to
>>     __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/__ / .__/\_,_/_/ /_/\_\   version 3.2.0-SNAPSHOT
>>   /_/
>>
>> Using Python version 3.8.1 (default, Dec 30 2020 22:53:18)
>> Spark context Web UI available at http://192.168.0.199:4040
>> Spark context available as 'sc' (master = local, app id =
>> local-1617982367872).
>> SparkSession available as 'spark'.
>>
>> In [1]: import pyspark
>>
>> In [2]:
>> conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")
>>
>> In [3]: sc=pyspark.SparkContext.getOrCreate(conf)
>>
>> In [4]: rows=7
>>
>> In [5]: data=list(range(rows))
>>
>> In [6]: rdd=sc.parallelize(data,rows)
>>
>> In [7]: assert rdd.getNumPartitions()==rows
>>
>> In [8]: rdd0=rdd.filter(lambda x:False)
>>
>> In [9]: assert rdd0.getNumPartitions()==rows
>>
>> In [10]: rdd00=rdd0.coalesce(1)
>>
>> In [11]: data=rdd00.collect()
>> 21/04/09 17:32:54 WARN TaskSetManager: Stage 0 contains a task of very
>> large size (4729 KiB). The maximum recommended task size is 1000 KiB.
>>
>> In [12]: assert data==[]
>>
>> In [13]:
>>
>>
>> I will create a jira and need to add some unittest before opening the PR.
>>
>> Best Regards,
>> Attila
>>
>>>


Re: possible bug

2021-04-09 Thread Attila Zsolt Piros
Hi!

I looked into the code and found a way to improve it.

With the improvement your test runs just fine:

Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.0-SNAPSHOT
  /_/

Using Python version 3.8.1 (default, Dec 30 2020 22:53:18)
Spark context Web UI available at http://192.168.0.199:4040
Spark context available as 'sc' (master = local, app id =
local-1617982367872).
SparkSession available as 'spark'.

In [1]: import pyspark

In [2]:
conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")

In [3]: sc=pyspark.SparkContext.getOrCreate(conf)

In [4]: rows=7

In [5]: data=list(range(rows))

In [6]: rdd=sc.parallelize(data,rows)

In [7]: assert rdd.getNumPartitions()==rows

In [8]: rdd0=rdd.filter(lambda x:False)

In [9]: assert rdd0.getNumPartitions()==rows

In [10]: rdd00=rdd0.coalesce(1)

In [11]: data=rdd00.collect()
21/04/09 17:32:54 WARN TaskSetManager: Stage 0 contains a task of very
large size (4729 KiB). The maximum recommended task size is 1000 KiB.

In [12]: assert data==[]

In [13]:


I will create a jira and need to add some unit tests before opening the PR.

Best Regards,
Attila

On Fri, Apr 9, 2021 at 7:04 AM Weiand, Markus, NMA-CFD <
markus.wei...@bertelsmann.de> wrote:

> I’ve changed the code to set driver memory to 100g, changed python code:
>
> import pyspark
>
>
> conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1").set(key="spark.driver.memory",
> value="100g")
>
> sc=pyspark.SparkContext.getOrCreate(conf)
>
> rows=7
>
> data=list(range(rows))
>
> rdd=sc.parallelize(data,rows)
>
> assert rdd.getNumPartitions()==rows
>
> rdd0=rdd.filter(lambda x:False)
>
> assert rdd0.getNumPartitions()==rows
>
> rdd00=rdd0.coalesce(1)
>
> data=rdd00.collect()
>
> assert data==[]
>
>
>
> Still the same error happens:
>
>
>
> 21/04/09 04:48:38 WARN TaskSetManager: Stage 0 contains a task of very
> large size (4732 KiB). The maximum recommended task size is 1000 KiB.
>
> OpenJDK 64-Bit Server VM warning: INFO:
> os::commit_memory(0x7f464355, 16384, 0) failed; error='Not enough
> space' (errno=12)
>
> [423.701s][warning][os,thread] Attempt to protect stack guard pages failed
> (0x7f4640d28000-0x7f4640d2c000).
>
> [423.701s][warning][os,thread] Attempt to deallocate stack guard pages
> failed.
>
> [423.704s][warning][os,thread] Failed to start thread - pthread_create
> failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
>
> #
>
> # There is insufficient memory for the Java Runtime Environment to
> continue.
>
> # Native memory allocation (mmap) failed to map 16384 bytes for committing
> reserved memory.
>
>
>
> A function which needs 423 seconds to crash with excessive memory
> consumption when trying to coalesce 7 empty partitions is not very
> practical. As I do not know the limits in which coalesce without shuffling
> can be used safely and with performance, I will now always use coalesce
> with shuffling, even though in theory this will come with quite a
> performance decrease.
>
>
>
> Markus
>
>
>
> *Von:* Russell Spitzer 
> *Gesendet:* Donnerstag, 8. April 2021 15:24
> *An:* Weiand, Markus, NMA-CFD 
> *Cc:* user@spark.apache.org
> *Betreff:* Re: possible bug
>
>
>
> Could be that the driver JVM cannot handle the metadata required to store
> the partition information of a 70k partition RDD. I see you say you have a
> 100GB driver but i'm not sure where you configured that?
>
> Did you set --driver-memory 100G ?
>
>
>
> On Thu, Apr 8, 2021 at 8:08 AM Weiand, Markus, NMA-CFD <
> markus.wei...@bertelsmann.de> wrote:
>
> This is the reduction of an error in a complex program where allocated 100
> GB driver (=worker=executor as local mode) memory. In the example I used
> the default size, as the puny example shouldn’t need more anyway.
>
> And without the coalesce or with coalesce(1,True) everything works fine.
>
> I’m trying to coalesce an empty rdd with 7 partitions in an empty rdd
> with 1 partition, why is this a problem without shuffling?
>
>
>
> *Von:* Sean Owen 
> *Gesendet:* Donnerstag, 8. April 2021 15:00
> *An:* Weiand, Markus, NMA-CFD 
> *Cc:* user@spark.apache.org
> *Betreff:* Re: possible bug
>
>
>
> That's a very low level error from the JVM. Any chance you are
> misconfiguring the executor size? like to 10MB instead of 10GB, that kind
> of thing. Trying to think of why the JVM would have very little memory to
> operate.
>
> An app running out of mem would not look like this.
>
>
>
> On Thu, Apr 8, 2021 at 7:53 AM Weiand, Markus, NMA-CFD <
> markus.wei...@bertelsmann.de> wrote:
>
> Hi all,
>
>
>
> I'm using spark on a c5a.16xlarge machine in amazon cloud (so having  64
> cores and 128 GB RAM). I'm using spark 3.01.
>
>
>
> The following python code leads to an exception, is this a bug or is my
> understanding of the API incorrect?
>
>
>
>   

Re: Dynamic Allocation Backlog Property in Spark on Kubernetes

2021-04-08 Thread Attila Zsolt Piros
You should not set "spark.dynamicAllocation.schedulerBacklogTimeout" so
high, and the purpose of this config is very different from the one you
would like to use it for.

The confusion I guess comes from the fact that you are still thinking in
multiple Spark jobs.


*But Dynamic Allocation is useful in the case of a single Spark job, too.* With
dynamic allocation, if there are pending tasks then new resources should be
allocated to speed up the calculation.
If you do not have enough partitions then you do not have enough tasks to
run in parallel; that is what my earlier comment was about.

So let's focus on your first job:
- With 3 executors it takes 2 hours to complete, right?
- And what about 8 executors?  I hope significantly less time.

So if you have more than 3 partitions and the tasks are meaningfully long
enough to request some extra resources (schedulerBacklogTimeout) and the
number of running executors is lower than the maximum number of executors
you set (maxExecutors) then why wouldn't you want to use those extra
resources?
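
Just to illustrate the knobs mentioned above (the values are the defaults as
far as I remember, please double check them in the configuration docs):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")          // first request after 1s of pending tasks
  .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1s") // interval of the follow-up requests
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")             // release an executor idle for 60s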



On Fri, Apr 9, 2021 at 6:03 AM Ranju Jain  wrote:

> Hi Attila,
>
>
>
> Thanks for your reply.
>
>
>
> If I talk about single job which started to run with minExecutors as *3*.
> And Suppose this job [*which reads the full data from backend and process
> and writes it to a location*]
>
> takes around 2 hour to complete.
>
>
>
> What I understood is, as the default value of
> spark.dynamicAllocation.schedulerBacklogTimeout is 1 sec, so executors will
> scale from *3* to *4* and then *8* after every second if tasks are
> pending at scheduler backend. So If I don’t want  it 1 sec and I might set
> it to 1 hour [3600 sec] in 2 hour of spark job.
>
>
>
> So this is all about when I want to scale executors dynamically for spark
> job. Is that understanding correct?
>
>
>
> In the below statement I don’t understand much about available partitions
> :-(
>
> *pending tasks (which kinda related to the available partitions)*
>
>
>
>
>
> Regards
>
> Ranju
>
>
>
>
>
> *From:* Attila Zsolt Piros 
> *Sent:* Friday, April 9, 2021 12:13 AM
> *To:* Ranju Jain 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Dynamic Allocation Backlog Property in Spark on Kubernetes
>
>
>
> Hi!
>
> For dynamic allocation you do not need to run the Spark jobs in parallel.
> Dynamic allocation simply means Spark scales up by requesting more
> executors when there are pending tasks (which is kinda related to the
> available partitions) and scales down when the executor is idle (as within
> one job the number of partitions can fluctuate).
>
> But if you optimize for run time you can start those jobs in parallel at
> the beginning.
>
> In this case you will use higher number of executors even from the
> beginning.
>
> The "spark.dynamicAllocation.schedulerBacklogTimeout" is not for to
> schedule/synchronize different Spark jobs but it is about tasks.
>
> Best regards,
> Attila
>
>
>
> On Tue, Apr 6, 2021 at 1:59 PM Ranju Jain 
> wrote:
>
> Hi All,
>
>
>
> I have set dynamic allocation enabled while running spark on Kubernetes .
> But new executors are requested if pending tasks are backlogged for more
> than configured duration in property
> *“spark.dynamicAllocation.schedulerBacklogTimeout”*.
>
>
>
> My Use Case is:
>
>
>
> There are number of parallel jobs which might or might not run together at
> a particular point of time. E.g Only One Spark Job may run at a point of
> time or two spark jobs may run at a single point of time depending upon the
> need.
>
> I configured spark.dynamicAllocation.minExecutors as 3 and
> spark.dynamicAllocation.maxExecutors as 8 .
>
>
>
> Steps:
>
>1. SparkContext initialized with 3 executors and First Job requested.
>2. Now, if second job requested after few mins  (e.g 15 mins) , I am
>thinking if I can use the benefit of dynamic allocation and executor should
>scale up to handle second job tasks.
>
> For this I think *“spark.dynamicAllocation.schedulerBacklogTimeout”*
> needs to set after which new executors would be requested.
>
> *Problem: *Problem is there are chances that second job is not requested
> at all or may be requested after 10 mins or after 20 mins. How can I set a
> constant value for
>
> property *“spark.dynamicAllocation.schedulerBacklogTimeout” *to scale the
> executors , when tasks backlog is dependent upon the number of jobs
> requested.
>
>
>
> Regards
>
> Ranju
>
>


Re: Dynamic Allocation Backlog Property in Spark on Kubernetes

2021-04-08 Thread Attila Zsolt Piros
Hi!

For dynamic allocation you do not need to run the Spark jobs in parallel.
Dynamic allocation simply means Spark scales up by requesting more
executors when there are pending tasks (which is kinda related to the
available partitions) and scales down when the executor is idle (as within
one job the number of partitions can fluctuate).

But if you optimize for run time you can start those jobs in parallel at
the beginning.
In this case you will use a higher number of executors right from the
beginning.

The "spark.dynamicAllocation.schedulerBacklogTimeout" is not for to
schedule/synchronize different Spark jobs but it is about tasks.

Best regards,
Attila

On Tue, Apr 6, 2021 at 1:59 PM Ranju Jain 
wrote:

> Hi All,
>
>
>
> I have set dynamic allocation enabled while running spark on Kubernetes .
> But new executors are requested if pending tasks are backlogged for more
> than configured duration in property
> *“spark.dynamicAllocation.schedulerBacklogTimeout”*.
>
>
>
> My Use Case is:
>
>
>
> There are number of parallel jobs which might or might not run together at
> a particular point of time. E.g Only One Spark Job may run at a point of
> time or two spark jobs may run at a single point of time depending upon the
> need.
>
> I configured spark.dynamicAllocation.minExecutors as 3 and
> spark.dynamicAllocation.maxExecutors as 8 .
>
>
>
> Steps:
>
>1. SparkContext initialized with 3 executors and First Job requested.
>2. Now, if second job requested after few mins  (e.g 15 mins) , I am
>thinking if I can use the benefit of dynamic allocation and executor should
>scale up to handle second job tasks.
>
> For this I think *“spark.dynamicAllocation.schedulerBacklogTimeout”*
> needs to set after which new executors would be requested.
>
> *Problem: *Problem is there are chances that second job is not requested
> at all or may be requested after 10 mins or after 20 mins. How can I set a
> constant value for
>
> property *“spark.dynamicAllocation.schedulerBacklogTimeout” *to scale the
> executors , when tasks backlog is dependent upon the number of jobs
> requested.
>
>
>
> Regards
>
> Ranju
>


Re: unit testing for spark code

2021-03-22 Thread Attila Zsolt Piros
Hi!

Let me draw your attention to Holden's *spark-testing-base* project.
The documentation is at  https://github.com/holdenk/spark-testing-base/wiki.

As I usually write tests for Spark internal features I haven't needed to
test at such a high level.
But I am interested in your experiences.
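
For example, a minimal ScalaTest suite with spark-testing-base could look like
the sketch below (the exact trait and suite names depend on your ScalaTest and
spark-testing-base versions, so treat this only as a starting point):

import com.holdenkarau.spark.testing.SharedSparkContext
import org.scalatest.funsuite.AnyFunSuite

class WordCountSpec extends AnyFunSuite with SharedSparkContext {
  test("reduceByKey counts words") {
    // sc is provided by SharedSparkContext and shared across the tests of the suite
    val counts = sc.parallelize(Seq("a", "b", "a"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collectAsMap()
    assert(counts("a") == 2)
  }
}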

Best regards,
Attila

On Mon, Mar 22, 2021 at 4:34 PM Nicholas Gustafson 
wrote:

> I've found pytest works well if you're using PySpark. Though if you have a
> lot of tests, running them all can be pretty slow.
>
> On Mon, Mar 22, 2021 at 6:32 AM Amit Sharma  wrote:
>
>> Hi, can we write unit tests for spark code. Is there any specific
>> framework?
>>
>>
>> Thanks
>> Amit
>>
>


Re: Can JVisual VM monitoring tool be used to Monitor Spark Executor Memory and CPU

2021-03-21 Thread Attila Zsolt Piros
Hi Ranju!

I am quite sure that for your requirement ("monitor every component and isolate
the resources consumed individually by every component") Spark metrics is
the right direction to go.

> Why only UsedstorageMemory should be checked?

Right, for you storage memory alone won't be enough; you need the system and
the execution memory too.
I expect ".JVMHeapMemory" and ".JVMOffHeapMemory" are what you are looking for.

> Also I noticed cpuTime provides cpu time spent by an executor. But there
is no metric by which I can calculate the number of cores.


The number of cores is specified at Spark submit. IIRC if you pass 3 it
means that each executor can run a maximum of 3 tasks at the same time.
So all these cores will be used if there are enough tasks. I know this is
not a perfect solution but I hope it helps.

> Also I see Grafana, a very good visualization tool where I see all the
metrics can be viewed , but I have less idea for steps to install on
virtual server and integrate.

I cannot help with specifics here, but a monitoring system is a good idea,
either Grafana or Prometheus.
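
If you go the metrics route, one low-effort option is the built-in JMX sink,
which makes the executor metrics visible in JVisualVM/JConsole. A hedged
sketch (the same settings can also go into metrics.properties; please verify
the property names against the monitoring docs of your Spark version):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.metrics.conf.*.sink.jmx.class", "org.apache.spark.metrics.sink.JmxSink") // register the JMX sink for all instances
  .set("spark.executor.processTreeMetrics.enabled", "true")                            // optional: extra process-tree memory metrics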

Best regards,
Attila

On Sun, Mar 21, 2021 at 3:01 PM Ranju Jain  wrote:

> Hi Mich/Attila,
>
>
>
> @Mich Talebzadeh : I considered spark GUI ,
> but I have a confusion first at memory level.
>
>
>
> App Configuration: spark.executor.memory= 4g for running spark job.
>
>
>
> In spark GUI I see running spark job has Peak Execution Memory is 1 KB as
> highlighted below:
>
> I do not have Storage Memory screenshot. So  I calculated Total Memory
> consumption at that point of time was:
>
>
>
> Spark UI shows :  spark.executor.memory= Peak Execution Memory + Storage
> Mem + Reserved Mem + User Memory
>
>
>   = 1 Kb + Storage Mem + 300 Mb + (4g *0.25)
>
>
>= 1 Kb + Storage Mem + 300 Mb + 1g
>
>
>   = Approx 1.5 g
>
>
>
>
>
>
>
> And if I see Executor 0,1,2 actual memory consumption on virtual server
> using *top * commnd , it shows below reading:
>
>
>
> Executor – 2:   *top*
>
>
>
>
>
> Executor-0 :*top*
>
>
>
> Please suggest On Spark GUI, Can I go with below formula to isolate that
> how much spark component is consuming  memory out of several other
> components of a Web application.
>
>   spark.executor.memory= Peak Execution Memory + Storage Mem + Reserved
> Mem + User Memory
>
>   = 1 Kb + Storage Mem +
> 300 Mb + (4g *0.25)
>
>
>
>
>
> @Attila Zsolt Piros : I checked the
> *memoryMetrics.** of executor-metrics
> <https://spark.apache.org/docs/3.0.0-preview/monitoring.html#executor-metrics>,
> but here I have a confusion about
>
> usedOnHeapStorageMemory
>
> usedOffHeapStorageMemory
>
> totalOnHeapStorageMemory
>
> totalOffHeapStorageMemory
>
>
>
> *Why only UsedstorageMemory should be checked?*
>
>
>
> To isolate spark.executor.memory, Should I check *memoryMetrics**.**
> where *only storageMemory* is given  or Should I check *peakMemoryMetrics*.*
> where all Peaks are specified
>
>1. Execution
>2. Storage
>3. JVM Heap
>
>
>
> Also I noticed cpuTime provides cpu time spent by an executor. But there
> is no metric by which I can calculate the number of cores.
>
>
>
> As suggested, I checked Luca Canali’s presentation, there I see JMXSink
> which Registers metrics for viewing in JMX Console. I think exposing this
> metric via JMXSink take it to visualize
>
> spark.executor.memory and number of cores by an executor on Java
> Monitoring tool.
>
> Also I see Grafana, a very good visualization tool where I see all the
> metrics can be viewed , but I have less idea for steps to install on
> virtual server and integrate. I need to go through in detail the Grafana.
>
>
>
> Kindly suggest your views.
>
>
>
> Regards
>
> Ranju
>
>
>
> *From:* Attila Zsolt Piros 
> *Sent:* Sunday, March 21, 2021 3:42 AM
> *To:* Mich Talebzadeh 
> *Cc:* Ranju Jain ; user@spark.apache.org
> *Subject:* Re: Can JVisual VM monitoring tool be used to Monitor Spark
> Executor Memory and CPU
>
>
>
> Hi Ranju!
>
> You can configure Spark's metric system.
>
> Check the *memoryMetrics.** of executor-metrics
> <https://spark.apache.org/docs/3.0.0-preview/monitoring.html#executor-metrics>
>  and
> in the component-instance-executor
> <https://spark.apache.org/docs/3.0.0-preview/monitoring.html#component-instance--executor>
>  the
> CPU times.
>
> Regarding the details I suggest to check Luca Canali's presentations about
> Spark's metric system and maybe his github repo
> <https://protect

Re: Spark version verification

2021-03-21 Thread Attila Zsolt Piros
Hi!

Thanks Sean and Kent! By reading your answers I have also learnt something
new.

@Mich Talebzadeh: see the commit content by
prefixing it with *https://github.com/apache/spark/commit/*.
So in your case
https://github.com/apache/spark/commit/1d550c4e90275ab418b9161925049239227f3dc9

Best Regards,
Attila

On Sun, Mar 21, 2021 at 5:02 PM Mich Talebzadeh 
wrote:

>
> Hi Kent,
>
> Thanks for the links.
>
> You have to excuse my ignorance, what are the correlations among these
> links and the ability to establish a spark build version?
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 21 Mar 2021 at 15:55, Kent Yao  wrote:
>
>> Please refer to
>> http://spark.apache.org/docs/latest/api/sql/index.html#version
>>
>> *Kent Yao *
>> @ Data Science Center, Hangzhou Research Institute, NetEase Corp.
>> *a spark enthusiast*
>> *kyuubi <https://github.com/yaooqinn/kyuubi>is a
>> unified multi-tenant JDBC interface for large-scale data processing and
>> analytics, built on top of Apache Spark <http://spark.apache.org/>.*
>> *spark-authorizer <https://github.com/yaooqinn/spark-authorizer>A Spark
>> SQL extension which provides SQL Standard Authorization for **Apache
>> Spark <http://spark.apache.org/>.*
>> *spark-postgres <https://github.com/yaooqinn/spark-postgres> A library
>> for reading data from and transferring data to Postgres / Greenplum with
>> Spark SQL and DataFrames, 10~100x faster.*
>> *spark-func-extras <https://github.com/yaooqinn/spark-func-extras>A
>> library that brings excellent and useful functions from various modern
>> database management systems to Apache Spark <http://spark.apache.org/>.*
>>
>>
>>
>> On 03/21/2021 23:28,Mich Talebzadeh
>>  wrote:
>>
>> Many thanks
>>
>> spark-sql> SELECT version();
>> 3.1.1 1d550c4e90275ab418b9161925049239227f3dc9
>>
>> What does 1d550c4e90275ab418b9161925049239227f3dc9 signify please?
>>
>>
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sun, 21 Mar 2021 at 15:14, Sean Owen  wrote:
>>
>>> I believe you can "SELECT version()" in Spark SQL to see the build
>>> version.
>>>
>>> On Sun, Mar 21, 2021 at 4:41 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Thanks for the detailed info.
>>>>
>>>> I was hoping that one can find a simpler answer to the Spark version
>>>> than doing forensic examination on base code so to speak.
>>>>
>>>> The primer for this verification is that on GCP dataprocs originally
>>>> built on 3.11-rc2, there was an issue with running Spark Structured
>>>> Streaming (SSS) which I reported to this forum before.
>>>>
>>>> After a while and me reporting to Google, they have now upgraded the
>>>> base to Spark 3.1.1 itself. I am not privy to how they did the upgrade
>>>> itself.
>>>>
>>>> In the meantime we installed 3.1.1 on-premise and ran it with the same
>>>> Python code for SSS. It worked fine.
>>>>
>>>> However, when I run the same code on GCP dataproc upgraded to 3.1.1,
>>>> occasionally I see this error
>>>>
>>>> 21/03/18 16:53:38 ERROR org.apache.spark.scheduler.AsyncEventQueue:
>>>> Listener EventLoggingListener threw an exception
>>>>
>>>> java.util.ConcurrentModificationException
>>>>
>>>> at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
>>>>
>>>> This may be for other reasons or the consequence of upgradin

Re: Spark saveAsTextFile Disk Recommendation

2021-03-21 Thread Attila Zsolt Piros
Hi!

I would like to reflect only on the first part of your mail:

I have a large RDD dataset of around 60-70 GB which I cannot send to driver
> using *collect* so first writing that to disk using  *saveAsTextFile* and
> then this data gets saved in the form of multiple part files on each node
> of the cluster and after that driver reads the data from that storage.


What is your use case here?

As you mention *collect()*, I assume you have to process the data
outside of Spark, maybe with a 3rd party tool, is that right?

If you have 60-70 GB of data and you write it to a text file and then read it
back within the same application, you still cannot call *collect()* on
it as it is still 60-70 GB of data, right?

On the other hand, is your data really just a collection of strings without
any repetitions? I ask this because of the file format you are using: text
file. Even for a text file you can at least pass a compression codec as the
2nd argument of *saveAsTextFile()*
(when you use this link you might need to scroll up a little bit.. at least my
Chrome displays the *saveAsTextFile* method without the 2nd arg codec).
As IO is slow, compressed data can be read back quicker: there will
be less data on the disk. Check the Snappy codec for example.

But if your data has structure and you plan to process this
data further within Spark then please consider something way better: a columnar
storage format, namely ORC or Parquet.
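
A small sketch of both options (assuming an RDD[String] called rdd, a
SparkSession called spark, and placeholder output paths; the Snappy codec class
can be passed the same way as Gzip, but it needs the Hadoop native libraries):

import org.apache.hadoop.io.compress.GzipCodec

rdd.saveAsTextFile("/tmp/out-text-gz", classOf[GzipCodec])  // compressed text part files

import spark.implicits._
rdd.toDF("value").write.parquet("/tmp/out-parquet")         // columnar format, better for further processing in Spark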

Best Regards,
Attila


On Sun, Mar 21, 2021 at 3:40 AM Ranju Jain 
wrote:

> Hi All,
>
>
>
> I have a large RDD dataset of around 60-70 GB which I cannot send to
> driver using *collect* so first writing that to disk using
> *saveAsTextFile* and then this data gets saved in the form of multiple
> part files on each node of the cluster and after that driver reads the data
> from that storage.
>
>
>
> I have a question like *spark.local.dir* is the directory which is used
> as a scratch space where mapoutputs files and RDDs might need to write by
> spark for shuffle operations etc.
>
> And there it is strongly recommended to use *local and fast disk *to
> avoid any failure or performance impact.
>
>
>
> *Do we have any such recommendation for storing multiple part files of
> large dataset [ or Big RDD ] in fast disk?*
>
> This will help me to configure the write type of disk for resulting part
> files.
>
>
>
> Regards
>
> Ranju
>


Re: Spark version verification

2021-03-20 Thread Attila Zsolt Piros
Hi!

I would check out the Spark source and then diff those two RCs (first just take
a look at the list of the changed files):

$ git diff v3.1.1-rc1..v3.1.1-rc2 --stat
...

The shell scripts in the release can be checked very easily:

$ git diff v3.1.1-rc1..v3.1.1-rc2 --stat | grep ".sh "
 bin/docker-image-tool.sh   |   6 +-
 dev/create-release/release-build.sh|   2 +-

We are lucky as *docker-image-tool.sh* is part of the released version.
Is it from v3.1.1-rc2 or v3.1.1-rc1?

Of course this only works if docker-image-tool.sh was not changed back from
v3.1.1-rc2 to v3.1.1-rc1.
So let's continue with the Python (and later the R) files:

$ git diff v3.1.1-rc1..v3.1.1-rc2 --stat | grep ".py "
 python/pyspark/sql/avro/functions.py   |   4 +-
 python/pyspark/sql/dataframe.py|   1 +
 python/pyspark/sql/functions.py| 285 +--
 .../pyspark/sql/tests/test_pandas_cogrouped_map.py |  12 +
 python/pyspark/sql/tests/test_pandas_map.py|   8 +
...

After you have enough proof you can stop (what counts as enough here
is for you to decide).
Finally you can use javap / scalap on the classes from the jars and check
some code changes, which is harder to analyze than a simple text
file.

Best Regards,
Attila


On Thu, Mar 18, 2021 at 4:09 PM Mich Talebzadeh 
wrote:

> Hi
>
> What would be a signature in Spark version or binaries that confirms the
> release is built on Spark built on 3.1.1 as opposed to 3.1.1-RC-1 or RC-2?
>
> Thanks
>
> Mich
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Can JVisual VM monitoring tool be used to Monitor Spark Executor Memory and CPU

2021-03-20 Thread Attila Zsolt Piros
Hi Ranju!

You can configure Spark's metric system.

Check the *memoryMetrics.** of executor-metrics and, in
the component-instance-executor, the CPU times.

Regarding the details I suggest checking Luca Canali's presentations about
Spark's metric system and maybe his GitHub repo.

Best Regards,
Attila

On Sat, Mar 20, 2021 at 5:41 PM Mich Talebzadeh 
wrote:

> Hi,
>
> Have you considered spark GUI first?
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 20 Mar 2021 at 16:06, Ranju Jain 
> wrote:
>
>> Hi All,
>>
>>
>>
>> Virtual Machine running an application, this application is having
>> various other 3PPs components running such as spark, database etc .
>>
>>
>>
>> *My requirement is to monitor every component and isolate the resources
>> consuming individually by every component.*
>>
>>
>>
>> I am thinking of using a common tool such as Java Visual VM , where I
>> specify the JMX URL of every component and monitor every component.
>>
>>
>>
>> For other components I am able to view their resources.
>>
>>
>>
>> *Is there a possibility of Viewing the Spark Executor CPU/Memory via Java
>> Visual VM Tool?*
>>
>>
>>
>> Please guide.
>>
>>
>>
>> Regards
>>
>> Ranju
>>
>


Re: Coalesce vs reduce operation parameter

2021-03-20 Thread Attila Zsolt Piros
Hi!

Actually *coalesce()* is usually a cheap operation as it moves some
existing partitions from one node to another. So it is not a (full) shuffle.

See the documentation, especially:

This results in a narrow dependency, e.g. if you go from 1000 partitions to
> 100 partitions,* there will not be a shuffle*, instead each of the 100
> new partitions will claim 10 of the current partitions.


The *repartition()* is the expensive method.
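
A tiny sketch of the difference (placeholder RDD and partition counts):

val rdd = sc.parallelize(1 to 1000, 1000)
val narrowed   = rdd.coalesce(100)      // narrow dependency: each new partition claims ~10 old ones, no shuffle
val reshuffled = rdd.repartition(100)   // same as coalesce(100, shuffle = true): a full shuffle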

Regarding Pedro's problem: for sure *RDD.reduceByKey(func,
number).saveAsTextFile()* is expected to be better, but hours vs 2
minutes sounds really bad.
What is the number of partitions you are going from and what is the target
number of partitions (the *number* in your example)?

Probably you should compare the stages tab and the stage details on the UI. So
if you need the community's help please share the event logs of the two runs;
the application logs might be needed too (the event log and
application log must be from the same run in both cases).

Best Regards,
Attila

On Sat, Mar 20, 2021 at 12:46 PM vaquar khan  wrote:

> HI Pedro,
>
> What is your usecase ,why you used coqlesce ,coalesce() is very expensive
> operations as they shuffle the data across many partitions hence try to
> minimize repartition as much as possible.
>
> Regards,
> Vaquar khan
>
>
> On Thu, Mar 18, 2021, 5:47 PM Pedro Tuero  wrote:
>
>> I was reviewing a spark java application running on aws emr.
>>
>> The code was like:
>> RDD.reduceByKey(func).coalesce(number).saveAsTextFile()
>>
>> That stage took hours to complete.
>> I changed to:
>> RDD.reduceByKey(func, number).saveAsTextFile()
>> And it now takes less than 2 minutes, and the final output is the same.
>>
>> So, is it a bug or a feature?
>> Why spark doesn't treat a coalesce after a reduce like a reduce with
>> output partitions parameterized?
>>
>> Just for understanding,
>> Thanks,
>> Pedro.
>>
>>
>>
>>


Re: How default partitioning in spark is deployed

2021-03-16 Thread Attila Zsolt Piros
Oh, sure that was the reason. You can keep using the `foreachPartition` and
get the partition ID from the `TaskContext`:

scala> import org.apache.spark.TaskContext
import org.apache.spark.TaskContext

scala> myRDD.foreachPartition( e => {  println(TaskContext.getPartitionId +
":" + e.mkString(",")) } )
0:
1:
2:Animal(1,Lion)
3:
4:Animal(2,Elephant)
5:
6:
7:Animal(3,Jaguar)
8:
9:Animal(4,Tiger)
10:
11:Animal(5,Chetah)

scala>




On Tue, Mar 16, 2021 at 2:38 PM German Schiavon 
wrote:

> Hi all,
>
> I guess you could do something like this too:
>
> [image: Captura de pantalla 2021-03-16 a las 14.35.46.png]
>
> On Tue, 16 Mar 2021 at 13:31, Renganathan Mutthiah <
> renganatha...@gmail.com> wrote:
>
>> Hi Attila,
>>
>> Thanks for looking into this!
>>
>> I actually found the issue and it turned out to be that the print
>> statements misled me. The records are indeed stored in different partitions.
>> What happened is since the foreachpartition method is run parallelly by
>> different threads, they all printed the first line almost at the same time
>> and followed by data which is also printed at almost the same time. This
>> has given an appearance that all the data is stored in a single partition.
>> When I run the below code, I can see that the objects are stored in
>> different partitions of course!
>>
>> *myRDD.mapPartitionsWithIndex( (index,itr) => { itr.foreach( e =>
>> println("Index : " +index +" " + e)); itr}, true).collect()*
>>
>> Prints the below... (index: ?  the ? is actually the partition number)
>> *Index : 9 Animal(4,Tiger) Index : 4 Animal(2,Elephant) Index : 11
>> Animal(5,Chetah) Index : 2 Animal(1,Lion) Index : 7 Animal(3,Jaguar) *
>>
>> Thanks!
>>
>> On Tue, Mar 16, 2021 at 3:06 PM Attila Zsolt Piros <
>> piros.attila.zs...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> This is weird. The code of foreachPartition
>>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L801-L806>
>>>  leads
>>> to ParallelCollectionRDD
>>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L84-L107>
>>>  which
>>> ends in slice
>>> <https://github.com/apache/spark/blob/f643bd96593dc411cb0cca1c7a3f28f93765c9b6/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L116-L155>,
>>> where the most important part is the *positions* method:
>>>
>>>  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>>>  (0 until numSlices).iterator.map { i =>
>>> val start = ((i * length) / numSlices).toInt
>>> val end = (((i + 1) * length) / numSlices).toInt
>>> (start, end)
>>>  }
>>>  }
>>>
>>> Because of the extra ' (' you used in "*parallelize( (Array*" I thought
>>> some scala implicit might generate a Seq with one Array in it.
>>> But in that case your output would contain an Array. So this must be not
>>> the case.
>>>
>>> 1) What Spark/Scala version you are using? on what OS?
>>>
>>> 2)  Can you reproduce this issue in the spark-shell?
>>>
>>> scala> case class Animal(id:Int, name:String)
>>> defined class Animal
>>>
>>> scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
>>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"
>>> Tiger"), Animal(5, "Chetah") ) ), 12)
>>> myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
>>> parallelize at :27
>>>
>>> scala> myRDD.foreachPartition( e => { println("--");
>>> e.foreach(println) } )
>>> --
>>> --
>>> --
>>> Animal(1,Lion)
>>> --
>>> --
>>> Animal(2,Elephant)
>>> --
>>> --
>>> --
>>> Animal(3,Jaguar)
>>> --
>>> --
>>> Animal(4,Tiger)
>>> --
>>> --
>>> Animal(5,Chetah)
>>>
>>> scala> Console println myRDD.getNumPartitions
>>> 12
>>>
>>> 3) Can you please check spark-shell what happens when you paste the
>>> above method and call it like:
>>>
>>> scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)]
>>> = {
>>> 

Re: Spark streaming giving error for version 2.4

2021-03-16 Thread Attila Zsolt Piros
Hi!

I am just guessing here (as Gabor said, we need more information /
logs):
but is it possible, Renu, that you upgraded just a single jar?
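
If that is the case, aligning all the Spark artifacts usually helps. A hedged
build.sbt sketch (the version value is only an example for a 2.4.x line; %%
picks the Scala binary suffix, here 2.11, automatically):

val sparkVersion = "2.4.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming"            % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)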

Best Regards,
Attila

On Tue, Mar 16, 2021 at 11:31 AM Gabor Somogyi 
wrote:

> Well, this is not much. Please provide driver and executor logs...
>
> G
>
>
> On Tue, Mar 16, 2021 at 6:03 AM Renu Yadav  wrote:
>
>> Hi Team,
>>
>>
>> I have upgraded my spark streaming from 2.2 to 2.4 but getting below
>> error:
>>
>>
>> spark-streaming-kafka_0-10.2.11_2.4.0
>>
>>
>> scala 2.11
>>
>>
>> Any Idea?
>>
>>
>>
>> main" java.lang.AbstractMethodError
>>
>> at
>> org.apache.spark.util.ListenerBus$class.$init$(ListenerBus.scala:34)
>>
>> at
>> org.apache.spark.streaming.scheduler.StreamingListenerBus.(StreamingListenerBus.scala:30)
>>
>> at
>> org.apache.spark.streaming.scheduler.JobScheduler.(JobScheduler.scala:57)
>>
>> at
>> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:184)
>>
>> at
>> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:85)
>>
>>
>> Thanks & Regards,
>>
>> Renu Yadav
>>
>>


Re: How default partitioning in spark is deployed

2021-03-16 Thread Attila Zsolt Piros
Hi!

This is weird. The code of foreachPartition leads to ParallelCollectionRDD,
which ends in slice, where the most important part is the *positions* method:

 def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
 (0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
 }
 }

Because of the extra ' (' you used in "*parallelize( (Array*" I thought
some Scala implicit might generate a Seq with one Array in it.
But in that case your output would contain an Array. So this must not be
the case.

1) What Spark/Scala version you are using? on what OS?

2)  Can you reproduce this issue in the spark-shell?

scala> case class Animal(id:Int, name:String)
defined class Animal

scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"
Tiger"), Animal(5, "Chetah") ) ), 12)
myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
parallelize at :27

scala> myRDD.foreachPartition( e => { println("--");
e.foreach(println) } )
--
--
--
Animal(1,Lion)
--
--
Animal(2,Elephant)
--
--
--
Animal(3,Jaguar)
--
--
Animal(4,Tiger)
--
--
Animal(5,Chetah)

scala> Console println myRDD.getNumPartitions
12

3) Can you please check spark-shell what happens when you paste the above
method and call it like:

scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
 |   (0 until numSlices).iterator.map { i =>
 | val start = ((i * length) / numSlices).toInt
 |   val end = (((i + 1) * length) / numSlices).toInt
 |   (start, end)
 |   }
 | }
positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]

scala> positions(5, 12).foreach(println)
(0,0)
(0,0)
(0,1)
(1,1)
(1,2)
(2,2)
(2,2)
(2,3)
(3,3)
(3,4)
(4,4)
(4,5)

As you can see, in my case the `positions` result is consistent with the
`foreachPartition` output,
and this should be deterministic.

Best regards,
Attila


On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <
renganatha...@gmail.com> wrote:

> Hi,
>
> I have a question with respect to default partitioning in RDD.
>
>
>
>
> *case class Animal(id:Int, name:String)   val myRDD =
> session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5,
> "Chetah") ) ))Console println myRDD.getNumPartitions  *
>
> I am running the above piece of code in my laptop which has 12 logical
> cores.
> Hence I see that there are 12 partitions created.
>
> My understanding is that hash partitioning is used to determine which
> object needs to go to which partition. So in this case, the formula would
> be: hashCode() % 12
> But when I further examine, I see all the RDDs are put in the last
> partition.
>
> *myRDD.foreachPartition( e => { println("--"); e.foreach(println)
> } )*
>
> Above code prints the below(first eleven partitions are empty and the last
> one has all the objects. The line is separate the partition contents):
> --
> --
> --
> --
> --
> --
> --
> --
> --
> --
> --
> --
> Animal(2,Elephant)
> Animal(4,Tiger)
> Animal(3,Jaguar)
> Animal(5,Chetah)
> Animal(1,Lion)
>
> I don't know why this happens. Can you please help.
>
> Thanks!
>


Re: spark on k8s driver pod exception

2021-03-15 Thread Attila Zsolt Piros
Sure, that is expected; see the "How it works" section of the "Running Spark on
Kubernetes" page
<https://spark.apache.org/docs/3.1.1/running-on-kubernetes.html#how-it-works>,
quote:

When the application completes, the executor pods terminate and are cleaned
> up, but the driver pod persists logs and remains in “completed” state in
> the Kubernetes API until it’s eventually garbage collected or manually
> cleaned up.



On Mon, Mar 15, 2021 at 8:45 AM 040840219  wrote:

>
> when driver pod throws exception ,  driver pod still running   ?
>
> kubectl logs  wordcount-e3141c7834d3dd68-driver
>
> 21/03/15 07:40:19 DEBUG Analyzer$ResolveReferences: Resolving 'value1 to
> 'value1
> Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot
> resolve '`value1`' given input columns: [key, value];
> 'Aggregate [key#6], [key#6, count('value1) AS cnt#14]
> +- Project [(id#4 % 5) AS key#6, (id#4 % 10) AS value#7]
>+- Project [value#1 AS id#4]
>   +- LocalRelation [value#1]
>
> at
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:155)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:152)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:341)
> at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:73)
>
> kubectl get pods wordcount-e3141c7834d3dd68-driver
>
> NAMEREADY   STATUSRESTARTS   AGE
> wordcount-e3141c7834d3dd68-driver   1/1 Running   0  2m58s
>
> On 03/12/2021 05:42,Attila Zsolt Piros
>  wrote:
>
> > but  the spark-submit log still  running
>
> Set the "spark.kubernetes.submission.waitAppCompletion" config to false to
> change that. As the doc says:
>
> "spark.kubernetes.submission.waitAppCompletion" : In cluster mode, whether
> to wait for the application to finish before exiting the launcher process.
> When changed to false, the launcher has a "fire-and-forget" behavior when
> launching the Spark job.
>
> On Thu, Mar 11, 2021 at 10:05 PM Attila Zsolt Piros <
> piros.attila.zs...@gmail.com> wrote:
>
>>
>> For getting the logs please read Accessing Logs
>> <https://spark.apache.org/docs/3.1.1/running-on-kubernetes.html#accessing-logs>
>>  part
>> of the *Running Spark on Kubernetes* page.
>>
>> For stopping and generic management of the spark application please read
>> the Spark Application Management
>> <https://spark.apache.org/docs/3.1.1/running-on-kubernetes.html#spark-application-management>,
>> where you find the example:
>>
>> $ spark-submit --kill spark:spark-pi* --master  
>> k8s://https://192.168.2.8:8443
>>
>>
>>
>> On Thu, Mar 11, 2021 at 1:07 PM yxl040840219 
>> wrote:
>>
>>>
>>>
>>>
>>> when run the code in k8s ,  driver pod throw AnalysisException , but
>>>  the spark-submit log still  running , then how to get the exception and
>>> stop pods ?
>>>
>>> val spark = SparkSession.builder().getOrCreate()
>>> import spark.implicits._
>>> val df = (0 until 10).toDF("id").selectExpr("id % 5 as key",
>>> "id%10 as value")
>>>   .groupBy("key").agg(count("value1").as("cnt"))
>>> df.show()
>>> spark.stop()
>>>
>>> bin/spark-submit \
>>> --master k8s://https://localhost:9443 \
>>> --deploy-mode cluster \
>>> --name wordcount \
>>> --class k8s.WordCount \
>>> --conf spark.kubernetes.container.image=rspark:v3.1.1 \
>>> --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
>>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>> --conf
>>> spark.kubernetes.file.upload.path=hdfs://localhost:8020/data/spark \
>>> /data/spark-example-1.0.0.jar
>>>
>>


Fwd: compile spark 3.1.1 error

2021-03-12 Thread Attila Zsolt Piros
= true ']'
+ TARDIR_NAME=spark-3.1.1-bin-custom-spark
+
TARDIR=/Users/attilazsoltpiros/git/attilapiros/spark/spark-3.1.1-bin-custom-spark
+ rm -rf
/Users/attilazsoltpiros/git/attilapiros/spark/spark-3.1.1-bin-custom-spark
+ cp -r /Users/attilazsoltpiros/git/attilapiros/spark/dist
/Users/attilazsoltpiros/git/attilapiros/spark/spark-3.1.1-bin-custom-spark
+ tar czf spark-3.1.1-bin-custom-spark.tgz -C
/Users/attilazsoltpiros/git/attilapiros/spark spark-3.1.1-bin-custom-spark
+ rm -rf
/Users/attilazsoltpiros/git/attilapiros/spark/spark-3.1.1-bin-custom-spark

Best Regards,
Attila



On Thu, Mar 11, 2021 at 5:11 AM jiahong li  wrote:

> BTW, how to  clear zinc/nailgun cache, thanks
>
> jiahong li  于2021年3月11日周四 下午12:04写道:
>
>> Maybe it is my environment cause
>>
>> jiahong li  于2021年3月11日周四 上午11:14写道:
>>
>>> it not the cause,when i set -Phadoop-2.7 instead of
>>> -Dhadoop.version=2.6.0-cdh5.13.1, the same errors come out.
>>>
>>> Attila Zsolt Piros  于2021年3月10日周三
>>> 下午8:56写道:
>>>
>>>> I see, this must be because of hadoop version you are selecting by
>>>> using "-Dhadoop.version=2.6.0-cdh5.13.1".
>>>> Spark 3.1.1 only support hadoop-2.7 and hadoop-3.2, at least these two
>>>> can be given via profiles:  -Phadoop-2.7  and -Phadoop-3.2 (the default).
>>>>
>>>>
>>>> On Wed, Mar 10, 2021 at 12:26 PM jiahong li 
>>>> wrote:
>>>>
>>>>> i use ./build/mvn to compile ,and after execute command 
>>>>> :./build/zinc-0.3.15/bin/zinc
>>>>> -shutdown
>>>>> and execute command like this: /dev/make-distribution.sh --name
>>>>> custom-spark --pip  --tgz -Phive -Phive-thriftserver -Pyarn
>>>>> -Dhadoop.version=2.6.0-cdh5.13.1 -DskipTests
>>>>> same error appear.
>>>>> and execute command: ps -ef |grep zinc, there is nothing containe zinc
>>>>>
>>>>> Attila Zsolt Piros  于2021年3月10日周三
>>>>> 下午6:55写道:
>>>>>
>>>>>> hi!
>>>>>>
>>>>>> Are you compiling Spark itself?
>>>>>> Do you use "./build/mvn" from the project root?
>>>>>> If you compiled an other version of Spark before and there the scala
>>>>>> version was different then zinc/nailgun could cached the old classes 
>>>>>> which
>>>>>> can cause similar troubles.
>>>>>> In that case this could help:
>>>>>>
>>>>>> ./build/zinc-0.3.15/bin/zinc -shutdown
>>>>>>
>>>>>> Best Regards,
>>>>>> Attila
>>>>>>
>>>>>> On Wed, Mar 10, 2021 at 11:27 AM jiahong li 
>>>>>> wrote:
>>>>>>
>>>>>>> hi, everybody, when i compile spark 3.1.1 from tag v3.1.1 ,encounter
>>>>>>> error like this:
>>>>>>>
>>>>>>> INFO] --- scala-maven-plugin:4.3.0:compile (scala-compile-first) @
>>>>>>> spark-core_2.12 ---
>>>>>>> [INFO] Using incremental compilation using Mixed compile order
>>>>>>> [INFO] Compiler bridge file:
>>>>>>> .sbt/1.0/zinc/org.scala-sbt/org.scala-sbt-compiler-bridge_2.12-1.3.1-bin_2.12.10__52.0-1.3.1_20191012T045515.jar
>>>>>>> [INFO] compiler plugin:
>>>>>>> BasicArtifact(com.github.ghik,silencer-plugin_2.12.10,1.6.0,null)
>>>>>>> [INFO] Compiling 560 Scala sources and 99 Java sources to
>>>>>>> git/spark/core/target/scala-2.12/classes ...
>>>>>>> [ERROR] [Error]
>>>>>>> git/spark/core/src/main/scala/org/apache/spark/ui/HttpSecurityFilter.scala:107:
>>>>>>> type mismatch;
>>>>>>>  found   : K where type K
>>>>>>>  required: String
>>>>>>> [ERROR] [Error]
>>>>>>> git/spark/core/src/main/scala/org/apache/spark/ui/HttpSecurityFilter.scala:107:
>>>>>>> value map is not a member of V
>>>>>>> [ERROR] [Error]
>>>>>>> git/spark/core/src/main/scala/org/apache/spark/ui/HttpSecurityFilter.scala:107:
>>>>>>> missing argument list for method stripXSS in class XssSafeRequest
>>>>>>> Unapplied methods are only converted to functions when a function
>>>>>>> type is expected.
>>>>>>> You can make this conversion explicit by writing `stripXSS _` or
>>>>>>> `stripXSS(_)` instead of `stripXSS`.
>>>>>>> [ERROR] [Error]
>>>>>>> git/spark/core/src/main/scala/org/apache/spark/ui/PagedTable.scala:307:
>>>>>>> value startsWith is not a member of K
>>>>>>> [ERROR] [Error]
>>>>>>> git/spark/core/src/main/scala/org/apache/spark/util/Utils.scala:580: 
>>>>>>> value
>>>>>>> toLowerCase is not a member of object org.apache.hadoop.util.StringUtils
>>>>>>> [ERROR] 5 errors found
>>>>>>>
>>>>>>> anybody encounter error like this?
>>>>>>>
>>>>>>>
>>>>>>


Re: spark on k8s driver pod exception

2021-03-11 Thread Attila Zsolt Piros
> but  the spark-submit log still  running

Set the "spark.kubernetes.submission.waitAppCompletion" config to false to
change that. As the doc says:

"spark.kubernetes.submission.waitAppCompletion" : In cluster mode, whether
to wait for the application to finish before exiting the launcher process.
When changed to false, the launcher has a "fire-and-forget" behavior when
launching the Spark job.
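
For example, adding this single option to the spark-submit command below
(everything else unchanged) makes the launcher return right after the driver
pod has been created:

--conf spark.kubernetes.submission.waitAppCompletion=false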

On Thu, Mar 11, 2021 at 10:05 PM Attila Zsolt Piros <
piros.attila.zs...@gmail.com> wrote:

>
> For getting the logs please read Accessing Logs
> <https://spark.apache.org/docs/3.1.1/running-on-kubernetes.html#accessing-logs>
>  part
> of the *Running Spark on Kubernetes* page.
>
> For stopping and generic management of the spark application please read
> the Spark Application Management
> <https://spark.apache.org/docs/3.1.1/running-on-kubernetes.html#spark-application-management>,
> where you find the example:
>
> $ spark-submit --kill spark:spark-pi* --master  k8s://https://192.168.2.8:8443
>
>
>
> On Thu, Mar 11, 2021 at 1:07 PM yxl040840219  wrote:
>
>>
>>
>>
>> when run the code in k8s ,  driver pod throw AnalysisException , but  the
>> spark-submit log still  running , then how to get the exception and stop
>> pods ?
>>
>> val spark = SparkSession.builder().getOrCreate()
>> import spark.implicits._
>> val df = (0 until 10).toDF("id").selectExpr("id % 5 as key",
>> "id%10 as value")
>>   .groupBy("key").agg(count("value1").as("cnt"))
>> df.show()
>> spark.stop()
>>
>> bin/spark-submit \
>> --master k8s://https://localhost:9443 \
>> --deploy-mode cluster \
>> --name wordcount \
>> --class k8s.WordCount \
>> --conf spark.kubernetes.container.image=rspark:v3.1.1 \
>> --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>> --conf spark.kubernetes.file.upload.path=hdfs://localhost:8020/data/spark
>> \
>> /data/spark-example-1.0.0.jar
>>
>


Re: spark on k8s driver pod exception

2021-03-11 Thread Attila Zsolt Piros
For getting the logs please read the Accessing Logs
<https://spark.apache.org/docs/3.1.1/running-on-kubernetes.html#accessing-logs> part
of the *Running Spark on Kubernetes* page.

For stopping and generic management of the spark application please read
the Spark Application Management
<https://spark.apache.org/docs/3.1.1/running-on-kubernetes.html#spark-application-management>,
where you find the example:

$ spark-submit --kill spark:spark-pi* --master  k8s://https://192.168.2.8:8443
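
Concretely, while the driver pod exists its log can be followed with kubectl
(the namespace and pod name below are placeholders):

$ kubectl -n <namespace> logs -f <driver-pod-name>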



On Thu, Mar 11, 2021 at 1:07 PM yxl040840219  wrote:

>
>
>
> when run the code in k8s ,  driver pod throw AnalysisException , but  the
> spark-submit log still  running , then how to get the exception and stop
> pods ?
>
> val spark = SparkSession.builder().getOrCreate()
> import spark.implicits._
> val df = (0 until 10).toDF("id").selectExpr("id % 5 as key",
> "id%10 as value")
>   .groupBy("key").agg(count("value1").as("cnt"))
> df.show()
> spark.stop()
>
> bin/spark-submit \
> --master k8s://https://localhost:9443 \
> --deploy-mode cluster \
> --name wordcount \
> --class k8s.WordCount \
> --conf spark.kubernetes.container.image=rspark:v3.1.1 \
> --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
> --conf spark.kubernetes.file.upload.path=hdfs://localhost:8020/data/spark \
> /data/spark-example-1.0.0.jar
>


Re: [spark-core] docker-image-tool.sh question...

2021-03-10 Thread Attila Zsolt Piros
Hi Muthu!

I tried it and on my side it works just fine:

$ ./bin/docker-image-tool.sh -r docker.io/sample-spark -b java_image_tag=8-jre-slim -t 3.1.1 build
Sending build context to Docker daemon  228.3MB
Step 1/18 : ARG java_image_tag=11-jre-slim
Step 2/18 : FROM openjdk:${java_image_tag}
*8-jre-slim*: Pulling from library/openjdk
45b42c59be33: Pull complete
c3f1fbf102b7: Pull complete
262868e4544c: Pull complete
1c0fec43ba3f: Pull complete
Digest:
sha256:412c52d88d77ea078c50ed4cf8d8656d6448b1c92829128e1c6aab6687ce0998
*Status: Downloaded newer image for openjdk:8-jre-slim*
 ---> 8f867fdbd02f

What do you see on your side?
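
If it helps, one way to double-check which JRE actually ended up in the image
(the image name below is derived from the -r and -t values above, so adjust it
to yours):

$ docker run --rm --entrypoint java docker.io/sample-spark/spark:3.1.1 -version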

Best regards,
Attila

On Wed, Mar 10, 2021 at 5:44 AM Muthu Jayakumar  wrote:

> Hello there,
>
> While using docker-image-tool (for Spark 3.1.1) it seems to not accept
> `java_image_tag` property. The docker image default to JRE 11. Here is what
> I am running from the command line.
>
> $ spark/bin/docker-image-tool.sh -r docker.io/sample-spark -b
> java_image_tag=8-jre-slim -t 3.1.1 build
>
> Please advice,
> Muthu
>


Re: compile spark 3.1.1 error

2021-03-10 Thread Attila Zsolt Piros
I see, this must be because of the Hadoop version you are selecting with
"-Dhadoop.version=2.6.0-cdh5.13.1".
Spark 3.1.1 only supports hadoop-2.7 and hadoop-3.2; these two can be
selected via the profiles -Phadoop-2.7 and -Phadoop-3.2 (the default).
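
So the closest supported build to the command quoted below would look something
like this (only the -Dhadoop.version flag is swapped for the profile):

./dev/make-distribution.sh --name custom-spark --pip --tgz -Phive -Phive-thriftserver -Pyarn -Phadoop-2.7 -DskipTests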


On Wed, Mar 10, 2021 at 12:26 PM jiahong li  wrote:

> i use ./build/mvn to compile ,and after execute command 
> :./build/zinc-0.3.15/bin/zinc
> -shutdown
> and execute command like this: /dev/make-distribution.sh --name
> custom-spark --pip  --tgz -Phive -Phive-thriftserver -Pyarn
> -Dhadoop.version=2.6.0-cdh5.13.1 -DskipTests
> same error appear.
> and execute command: ps -ef |grep zinc, there is nothing containe zinc
>
> Attila Zsolt Piros  于2021年3月10日周三 下午6:55写道:
>
>> hi!
>>
>> Are you compiling Spark itself?
>> Do you use "./build/mvn" from the project root?
>> If you compiled an other version of Spark before and there the scala
>> version was different then zinc/nailgun could cached the old classes which
>> can cause similar troubles.
>> In that case this could help:
>>
>> ./build/zinc-0.3.15/bin/zinc -shutdown
>>
>> Best Regards,
>> Attila
>>
>> On Wed, Mar 10, 2021 at 11:27 AM jiahong li 
>> wrote:
>>
>>> hi, everybody, when i compile spark 3.1.1 from tag v3.1.1 ,encounter
>>> error like this:
>>>
>>> INFO] --- scala-maven-plugin:4.3.0:compile (scala-compile-first) @
>>> spark-core_2.12 ---
>>> [INFO] Using incremental compilation using Mixed compile order
>>> [INFO] Compiler bridge file:
>>> .sbt/1.0/zinc/org.scala-sbt/org.scala-sbt-compiler-bridge_2.12-1.3.1-bin_2.12.10__52.0-1.3.1_20191012T045515.jar
>>> [INFO] compiler plugin:
>>> BasicArtifact(com.github.ghik,silencer-plugin_2.12.10,1.6.0,null)
>>> [INFO] Compiling 560 Scala sources and 99 Java sources to
>>> git/spark/core/target/scala-2.12/classes ...
>>> [ERROR] [Error]
>>> git/spark/core/src/main/scala/org/apache/spark/ui/HttpSecurityFilter.scala:107:
>>> type mismatch;
>>>  found   : K where type K
>>>  required: String
>>> [ERROR] [Error]
>>> git/spark/core/src/main/scala/org/apache/spark/ui/HttpSecurityFilter.scala:107:
>>> value map is not a member of V
>>> [ERROR] [Error]
>>> git/spark/core/src/main/scala/org/apache/spark/ui/HttpSecurityFilter.scala:107:
>>> missing argument list for method stripXSS in class XssSafeRequest
>>> Unapplied methods are only converted to functions when a function type
>>> is expected.
>>> You can make this conversion explicit by writing `stripXSS _` or
>>> `stripXSS(_)` instead of `stripXSS`.
>>> [ERROR] [Error]
>>> git/spark/core/src/main/scala/org/apache/spark/ui/PagedTable.scala:307:
>>> value startsWith is not a member of K
>>> [ERROR] [Error]
>>> git/spark/core/src/main/scala/org/apache/spark/util/Utils.scala:580: value
>>> toLowerCase is not a member of object org.apache.hadoop.util.StringUtils
>>> [ERROR] 5 errors found
>>>
>>> anybody encounter error like this?
>>>
>>>
>>


Re: compile spark 3.1.1 error

2021-03-10 Thread Attila Zsolt Piros
hi!

Are you compiling Spark itself?
Do you use "./build/mvn" from the project root?
If you compiled another version of Spark before, and the Scala version there
was different, then zinc/nailgun could have cached the old classes, which
can cause errors like these.
In that case this could help:

./build/zinc-0.3.15/bin/zinc -shutdown

Best Regards,
Attila

On Wed, Mar 10, 2021 at 11:27 AM jiahong li  wrote:

> hi, everybody, when i compile spark 3.1.1 from tag v3.1.1 ,encounter error
> like this:
>
> INFO] --- scala-maven-plugin:4.3.0:compile (scala-compile-first) @
> spark-core_2.12 ---
> [INFO] Using incremental compilation using Mixed compile order
> [INFO] Compiler bridge file:
> .sbt/1.0/zinc/org.scala-sbt/org.scala-sbt-compiler-bridge_2.12-1.3.1-bin_2.12.10__52.0-1.3.1_20191012T045515.jar
> [INFO] compiler plugin:
> BasicArtifact(com.github.ghik,silencer-plugin_2.12.10,1.6.0,null)
> [INFO] Compiling 560 Scala sources and 99 Java sources to
> git/spark/core/target/scala-2.12/classes ...
> [ERROR] [Error]
> git/spark/core/src/main/scala/org/apache/spark/ui/HttpSecurityFilter.scala:107:
> type mismatch;
>  found   : K where type K
>  required: String
> [ERROR] [Error]
> git/spark/core/src/main/scala/org/apache/spark/ui/HttpSecurityFilter.scala:107:
> value map is not a member of V
> [ERROR] [Error]
> git/spark/core/src/main/scala/org/apache/spark/ui/HttpSecurityFilter.scala:107:
> missing argument list for method stripXSS in class XssSafeRequest
> Unapplied methods are only converted to functions when a function type is
> expected.
> You can make this conversion explicit by writing `stripXSS _` or
> `stripXSS(_)` instead of `stripXSS`.
> [ERROR] [Error]
> git/spark/core/src/main/scala/org/apache/spark/ui/PagedTable.scala:307:
> value startsWith is not a member of K
> [ERROR] [Error]
> git/spark/core/src/main/scala/org/apache/spark/util/Utils.scala:580: value
> toLowerCase is not a member of object org.apache.hadoop.util.StringUtils
> [ERROR] 5 errors found
>
> anybody encounter error like this?
>
>


RE: Spark Version 3.0.1 Gui Display Query

2021-03-04 Thread Attila Zsolt Piros
Hi Ranju!

I meant the event log would be very helpful for analyzing the problem on
your side.

The three logs together (driver, executor, and event logs) from the same run
are the most useful, of course.

I know you want to check the Executors tab while the job is running, and for
that you do not need the event log. But the event log is still useful for
finding out what happened.
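
In case event logging is not enabled yet, it only needs two extra settings on
the spark-submit command (a sketch; the directory below is just a placeholder
and must be writable by the driver):

--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=hdfs:///tmp/spark-events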

Regards,
Attila







RE: Spark Version 3.0.1 Gui Display Query

2021-03-03 Thread Attila Zsolt Piros
Hi Ranju!

The UI is built up from events. This is why the history server is able to show
the state of a finished app: those events are replayed to rebuild the state.
For details you can check the web UI page and the following section:
https://spark.apache.org/docs/latest/monitoring.html#web-interfaces

So you should share/look into the event log.
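
For example (a sketch; the path below is a placeholder), once the application
has written its event log you can replay it in the history server:

$ ./sbin/start-history-server.sh

with spark.history.fs.logDirectory in conf/spark-defaults.conf pointing at the
same location as spark.eventLog.dir:

spark.history.fs.logDirectory  hdfs:///tmp/spark-events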

Regards,
Attila







Re: How to control count / size of output files for

2021-02-24 Thread Attila Zsolt Piros
hi!

It is because of "spark.sql.shuffle.partitions". See the value 200 at the
rangepartitioning step in the physical plan below:


scala> val df = sc.parallelize(1 to 1000, 10).toDF("v").sort("v")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [v: int]

scala> df.explain()
== Physical Plan ==
*(2) Sort [v#300 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(v#300 ASC NULLS FIRST, 200), true, [id=#334]
   +- *(1) Project [value#297 AS v#300]
      +- *(1) SerializeFromObject [input[0, int, false] AS value#297]
         +- Scan[obj#296]

scala> df.rdd.getNumPartitions
res13: Int = 200
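
So to control the number of output partitions (and therefore output files) you
can either lower this setting or add an explicit repartition/coalesce before
the write; a quick sketch in the same spark-shell session:

scala> spark.conf.set("spark.sql.shuffle.partitions", "10")

scala> sc.parallelize(1 to 1000, 10).toDF("v").sort("v").rdd.getNumPartitions
res14: Int = 10

scala> df.coalesce(10).rdd.getNumPartitions   // independent of the shuffle setting
res15: Int = 10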

Best Regards,
Attila










Re: K8S spark-submit Loses Successful Driver Completion

2021-02-15 Thread Attila Zsolt Piros
Hi,

I am not using Airflow but I assume your application is deployed in cluster
mode and in this case the class you are looking for is
*org.apache.spark.deploy.k8s.submit.Client* [1].

If we are talking about the first "spark-submit" used to start the
application and not "spark-submit --status", then it contains a loop where the
application status is logged. This loop stops when the
*LoggingPodStatusWatcher* reports that the app is completed [2] or when
"spark.kubernetes.submission.waitAppCompletion" [3] is false.

And you are right: the monitoring (pod state watching) is done via REST
(HTTPS), and a broken watch connection should be detected by the
"io.fabric8.kubernetes.client.Watcher.onClose()" method, i.e. by the Kubernetes
client.
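
For a quick manual cross-check of what the watcher should be seeing, the status
can also be queried from outside (a sketch; the pod name glob and the API
server address are placeholders):

$ spark-submit --status "spark:<driver-pod-name>*" --master k8s://https://<api-server>:<port>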

I hope this helps. Some further questions if you need some more help:

1. What is the Spark version you are running? 
2. Does it contain SPARK-24266 [4]? 
3. If yes, can you reproduce the issue without Airflow, and do you have the
logs from when the issue happened?

Best regards,
Attila

[1]
https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L88-L103

[2]
https://github.com/apache/spark/blob/8604db28b87b387bbdb3761df85fae292cd402a1/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L162-L166

[3]
https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala#L112-L114

[4] https://issues.apache.org/jira/browse/SPARK-24266







Re: Spark Kubernetes 3.0.1 | podcreationTimeout not working

2021-02-12 Thread Attila Zsolt Piros
I believe this problem led to the opening of SPARK-34389, where it is
discussed further.

[1] https://issues.apache.org/jira/browse/SPARK-34389






Re: understanding spark shuffle file re-use better

2021-02-12 Thread Attila Zsolt Piros
A much better one-liner (easier to follow on the UI because it results in 1
simple job with 2 stages):

```
spark.read.text("README.md").repartition(2).take(1)
```


Attila Zsolt Piros wrote
> No, it won't be reused.
> You should reuse the dateframe for reusing the shuffle blocks (and cached
> data).
> 
> I know this because the two actions will lead to building a two separate
> DAGs, but I will show you a way how you could check this on your own (with
> a
> small simple spark application). 
> 
> For this you can even use the spark-shell. Start it in directory where a
> simple text file available ("README.md" in my case).
> 
> After this the one-liner is:
> 
> ```
> scala> spark.read.text("README.md").selectExpr("length(value) as l",
> "value").groupBy("l").count
> .take(1)
> ```
> 
> Now if you check Stages tab on the UI you will see 3 stages.
> After re-executing the same line of code in the Stages tab you can see the
> number of stages are doubled.
> 
> So shuffle files are not reused.
> 
> Finally you can delete the file and re-execute our small test. Now it will
> produce:
> 
> ``` 
> org.apache.spark.sql.AnalysisException: Path does not exist:
> file:/Users/attilazsoltpiros/git/attilapiros/spark/README.md;
> ```
> 
> So the file would have been opened again for loading the data (even in the
> 3rd run).
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org








Re: understanding spark shuffle file re-use better

2021-02-11 Thread Attila Zsolt Piros
No, it won't be reused.
You should reuse the dataframe itself to reuse the shuffle blocks (and cached
data).

I know this because the two actions will lead to building two separate
DAGs, but I will show you a way to check this on your own (with a
small, simple Spark application).

For this the spark-shell can be used, too. Start it in a directory where a
simple text file is available ("README.md" in my case).

After this the one-liner is:

```
scala> spark.read.text("README.md").selectExpr("length(value) as l", "value").groupBy("l").count.take(1)
```

Now if you check the Stages tab on the UI you will see 3 stages.
After re-executing the same line of code, you can see in the Stages tab that
the number of stages has doubled.

So shuffle files are not reused.

Finally you can delete the file and re-execute our small test. Now it will
produce:

``` 
org.apache.spark.sql.AnalysisException: Path does not exist:
file:/Users/attilazsoltpiros/git/attilapiros/spark/README.md;
```

So the file would have been opened again for loading the data (even in the
3rd run).
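
By contrast, here is a minimal sketch (same spark-shell, same README.md) of
what reuse looks like: keeping one shuffled RDD around and running two actions
on it lets the second job reuse the shuffle files written by the first one,
which shows up as a skipped stage in the UI. For a dataframe you can get a
similar effect by caching it and reusing that single reference.

```
scala> val counts = sc.textFile("README.md").map(l => (l.length, 1)).reduceByKey(_ + _)

scala> counts.count()   // job 1: two stages, the shuffle files are written here

scala> counts.count()   // job 2: the map stage is skipped, the shuffle files are reused
```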






Re: Spark on Yarn, is it possible to manually blacklist nodes before running spark job?

2019-01-22 Thread Attila Zsolt Piros
The new issue is https://issues.apache.org/jira/browse/SPARK-26688.
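
To give an idea of the intended usage, the submit could eventually look
something like this (a sketch only: spark.yarn.exclude.nodes is, to the best of
my knowledge, the name such an option got in later Spark releases, so please
check the Running on YARN page of your version before relying on it):

$ spark-submit --master yarn --conf spark.yarn.exclude.nodes=badhost1,badhost2 ...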


On Tue, Jan 22, 2019 at 11:30 AM Attila Zsolt Piros 
wrote:

> Hi,
>
> >> Is it this one: https://github.com/apache/spark/pull/23223 ?
>
> No. My old development was https://github.com/apache/spark/pull/21068,
> which is closed.
>
> This would be a new improvement with a new Apache JIRA issue (
> https://issues.apache.org) and with a new Github pull request.
>
> >> Can I try to reach you through Cloudera Support portal?
>
> It is not needed. This would be an improvement into the Apache Spark which
> details can be discussed in the JIRA / Github PR.
>
> Attila
>
>
> On Mon, Jan 21, 2019 at 10:18 PM Serega Sheypak 
> wrote:
>
>> Hi Apiros, thanks for your reply.
>>
>> Is it this one: https://github.com/apache/spark/pull/23223 ?
>> Can I try to reach you through Cloudera Support portal?
>>
>> пн, 21 янв. 2019 г. в 20:06, attilapiros :
>>
>>> Hello, I was working on this area last year (I have developed the
>>> YarnAllocatorBlacklistTracker) and if you haven't found any solution for
>>> your problem I can introduce a new config which would contain a sequence
>>> of
>>> always blacklisted nodes. This way blacklisting would improve a bit
>>> again :)
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>


Re: Spark on Yarn, is it possible to manually blacklist nodes before running spark job?

2019-01-22 Thread Attila Zsolt Piros
Hi,

>> Is it this one: https://github.com/apache/spark/pull/23223 ?

No. My old development was https://github.com/apache/spark/pull/21068,
which is closed.

This would be a new improvement with a new Apache JIRA issue (
https://issues.apache.org) and with a new Github pull request.

>> Can I try to reach you through Cloudera Support portal?

It is not needed. This would be an improvement to Apache Spark itself, and the
details can be discussed in the JIRA / GitHub PR.

Attila


On Mon, Jan 21, 2019 at 10:18 PM Serega Sheypak 
wrote:

> Hi Apiros, thanks for your reply.
>
> Is it this one: https://github.com/apache/spark/pull/23223 ?
> Can I try to reach you through Cloudera Support portal?
>
> пн, 21 янв. 2019 г. в 20:06, attilapiros :
>
>> Hello, I was working on this area last year (I have developed the
>> YarnAllocatorBlacklistTracker) and if you haven't found any solution for
>> your problem I can introduce a new config which would contain a sequence
>> of
>> always blacklisted nodes. This way blacklisting would improve a bit again
>> :)
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>