Re: Using Spark Accumulators with Structured Streaming

2020-05-15 Thread ZHANG Wei
There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic
or thread safe. I'm wondering whether an implementation backed by `java.util.Map[T,
Long]` can meet that requirement. Is there any chance to replace
CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3] and
test whether the StreamingListener and the rest of the code still work?
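
For example, a minimal sketch of that test (assuming an existing SparkSession
`spark`; the accumulator name is made up, and the add() call stands in for
whatever updateAcrossEvents does):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.util.LongAccumulator

// longAccumulator(...) creates and registers the accumulator in one step.
val eventCount: LongAccumulator = spark.sparkContext.longAccumulator("EventCount")

// Executor side: call eventCount.add(1) inside the stateful update function.

// Driver side: read the value back on every progress event.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"EventCount = ${eventCount.value}")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})

If that prints a non-zero value in cluster mode, the problem is likely in the
custom accumulator; if it still prints zero, the issue is elsewhere in the
streaming job.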

---
Cheers,
-z
[1] 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator


From: Something Something 
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from the Spark development team tell me if this functionality is
supported and tested? I've spent a lot of time on this but can't get it to work.
Just to add more context: we have our own Accumulator class that extends
AccumulatorV2. In this class we keep track of one or more accumulators.
Here's the definition:


class CollectionLongAccumulator[T]
extends AccumulatorV2[T, java.util.Map[T, Long]]
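
(Only the signature is shown here. As a rough, hypothetical sketch of a
thread-safe shape for such an accumulator - not the actual implementation - the
value could be backed by a ConcurrentHashMap, with the read-modify-write steps
synchronized:)

import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.util.AccumulatorV2

class CollectionLongAccumulator[T] extends AccumulatorV2[T, java.util.Map[T, Long]] {

  // ConcurrentHashMap so the driver can read `value` while updates happen;
  // the synchronized blocks make the increment / merge steps atomic.
  private val map = new ConcurrentHashMap[T, Long]()

  override def isZero: Boolean = map.isEmpty

  override def copy(): CollectionLongAccumulator[T] = {
    val acc = new CollectionLongAccumulator[T]
    acc.map.putAll(map)
    acc
  }

  override def reset(): Unit = map.clear()

  override def add(v: T): Unit = map.synchronized {
    map.put(v, map.getOrDefault(v, 0L) + 1L)
  }

  override def merge(other: AccumulatorV2[T, java.util.Map[T, Long]]): Unit = map.synchronized {
    val it = other.value.entrySet().iterator()
    while (it.hasNext) {
      val e = it.next()
      map.put(e.getKey, map.getOrDefault(e.getKey, 0L) + e.getValue)
    }
  }

  override def value: java.util.Map[T, Long] = map
}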

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly 
appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:

In my structured streaming job I am updating Spark Accumulators in the 
updateAcrossEvents method but they are always 0 when I try to print them in my 
StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents
  )


The accumulators get incremented in 'updateAcrossEvents'. I have a
StreamingListener which writes the values of the accumulators in the
'onQueryProgress' method, but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these 
accumulators are getting incremented as expected.

This only happens when I run in 'Cluster' mode. In 'Local' mode it works fine,
which implies that the Accumulators are not getting distributed correctly - or
something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an 
"Action". That's not a solution here. This is a 'Stateful Structured Streaming' 
job. Yes, I am also 'registering' them in SparkContext.




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



spark on k8s - can driver and executor have separate checkpoint location?

2020-05-15 Thread wzhan
Hi guys,

I'm running Spark applications on Kubernetes. According to the Spark
documentation
https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
Spark needs a distributed file system to store its checkpoint data, so that in
case of failure it can recover from the checkpoint directory.

My question is: can the driver and executors have separate checkpoint locations?
I'm asking because the driver and executors might be deployed on different
nodes, and a shared checkpoint location would require the ReadWriteMany access
mode. Since I only have a storage class that supports ReadWriteOnce, I'm trying
to find a workaround.


The Spark Streaming guide mentions "Failed driver can be restarted from
checkpoint information" and, for executor failure, "Tasks and receivers
restarted by Spark automatically, no config needed".

Given this, I tried configuring the checkpoint location for the driver pod only.
It immediately failed with the exception below:

2020-05-15T14:20:17.142 [stream execution thread for baseline_windower [id =
b14190ed-0fb2-4d0e-82d3-b3a3bf101712, runId =
f246774e-5a20-4bfb-b997-4d06c344bb0f]hread] ERROR
org.apache.spark.sql.execution.streaming.MicroBatchExecution - Query
baseline_windower [id = b14190ed-0fb2-4d0e-82d3-b3a3bf101712, runId =
f246774e-5a20-4bfb-b997-4d06c344bb0f] terminated with error
org.apache.spark.SparkException: Writing job aborted.
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
...
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost task
0.3 in stage 11.0 (TID 420, 10.233.124.162, executor 1):
java.io.IOException: mkdir of
file:/opt/window-data/baseline_windower/state/0/0 failed
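
(For what it's worth, the failing mkdir above happens on an executor: the state
store directories under the checkpoint location are created by the tasks, while
the offsets/commits metadata is written by the driver, which is why the location
normally has to be visible to both. A rough sketch of where that path comes in,
with a made-up query and DataFrame name:)

import org.apache.spark.sql.streaming.Trigger

val query = windowedDF.writeStream
  .queryName("baseline_windower")
  // If omitted, Spark derives a directory from spark.sql.streaming.checkpointLocation.
  .option("checkpointLocation", "file:///opt/checkpoint-data/baseline_windower")
  // <checkpoint>/offsets and <checkpoint>/commits are written by the driver;
  // <checkpoint>/state/<operator>/<partition> is written by executor tasks.
  .outputMode("update")
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()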


So I tried giving separate checkpoint locations to the driver and the executor.

In the Spark application Helm chart I have a checkpointLocation configuration:

spec:
  sparkConf:
    "spark.sql.streaming.checkpointLocation": "file:///opt/checkpoint-data"

I created two checkpoint PVCs and mounted one volume on the driver pod and one
on the executor pods:

  volumes:
    - name: checkpoint-driver-volume
      persistentVolumeClaim:
        claimName: checkpoint-driver-pvc
    - name: checkpoint-executor-volume
      persistentVolumeClaim:
        claimName: checkpoint-executor-pvc

  driver:
    volumeMounts:
      - name: checkpoint-driver-volume
        mountPath: "/opt/checkpoint-data"
    ...
  executor:
    volumeMounts:
      - name: checkpoint-executor-volume
        mountPath: "/opt/checkpoint-data"

After deployment it seems to be working. I tried restarting the driver pod and
it did recover from the checkpoint directory. But I'm not sure if this is
actually supported by design.

Thanks,
wzhan



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



unsubscribe

2020-05-15 Thread Basavaraj





Re: Calling HTTP Rest APIs from Spark Job

2020-05-15 Thread Chetan Khatri
Hi Sean,
Thanks for the great answer.

What I am trying to do is use something like Scala Future (cats-effect IO) to
make the calls concurrently. I was trying to understand whether there are any
limits or thresholds on making those calls.
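
As a rough sketch of that idea (plain scala.concurrent.Future with a small
per-partition thread pool instead of cats-effect IO; it assumes an existing
SparkSession `spark` and DataFrame `employeeDF`, and the endpoint URL and pool
size are made up):

import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

import spark.implicits._

val enriched = employeeDF
  .select("employee_id").as[String]
  .mapPartitions { ids =>
    // One bounded pool per partition: concurrency becomes
    // (#executor slots) x (pool size) instead of just the number of slots.
    val pool = Executors.newFixedThreadPool(8)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    val calls = ids.map { id =>
      Future {
        // Hypothetical endpoint; any HTTP client works here.
        scala.io.Source.fromURL(s"https://api.example.com/employees/$id").mkString
      }
    }.toList  // materialize the partition so all futures are in flight

    val responses = Await.result(Future.sequence(calls), 10.minutes)
    pool.shutdown()
    responses.iterator
  }

Sean's point below still holds: the outer parallelism is capped by the number of
executor slots; the pool only adds concurrency inside each task.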

On Thu, May 14, 2020 at 7:28 PM Sean Owen  wrote:

> No, it means # HTTP calls = # executor slots. But even then, you're
> welcome to, say, use thread pools to execute even more concurrently as
> most are I/O bound. Your code can do what you want.
>
> On Thu, May 14, 2020 at 6:14 PM Chetan Khatri
>  wrote:
> >
> > Thanks, that means number of executors = number of HTTP calls I can
> > make. I can't make more HTTP calls within a single executor, I mean -
> > I can't go beyond the threshold of the number of executors.
> >
> > On Thu, May 14, 2020 at 6:26 PM Sean Owen  wrote:
> >>
> >> Default is not 200, but the number of executor slots. Yes you can only
> simultaneously execute as many tasks as slots regardless of partitions.
> >>
> >> On Thu, May 14, 2020, 5:19 PM Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
> >>>
> >>> Thanks Sean, Jerry.
> >>>
> >>> The default number of Spark DataFrame partitions is 200, right? Does it
> >>> have a relationship with the number of cores? With 8 cores and 4 workers,
> >>> isn't it that I can make only 8 * 4 = 32 HTTP calls? Because in Spark,
> >>> "number of partitions = number of cores" is not true.
> >>>
> >>> Thanks
> >>>
> >>> On Thu, May 14, 2020 at 6:11 PM Sean Owen  wrote:
> 
>  Yes, any code that you write and apply with Spark runs in
>  the executors. You would be running as many HTTP clients as you have
>  partitions.
> 
>  On Thu, May 14, 2020 at 4:31 PM Jerry Vinokurov <
> grapesmo...@gmail.com> wrote:
>  >
>  > I believe that if you do this within the context of an operation
> that is already parallelized such as a map, the work will be distributed to
> executors and they will do it in parallel. I could be wrong about this as I
> never investigated this specific use case, though.
>  >
>  > On Thu, May 14, 2020 at 5:24 PM Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>  >>
>  >> Thanks for the quick response.
>  >>
>  >> I am curious to know whether the data would be pulled in parallel for
>  >> 100+ HTTP requests, or whether it will only run on the driver node. The
>  >> POST body would be part of the DataFrame. Think of it as: I have a
>  >> DataFrame of employee_id, employee_name; an HTTP GET call has to be made
>  >> for each employee_id, and the DataFrame is dynamic for each Spark job run.
>  >>
>  >> Does it make sense?
>  >>
>  >> Thanks
>  >>
>  >>
>  >> On Thu, May 14, 2020 at 5:12 PM Jerry Vinokurov <
> grapesmo...@gmail.com> wrote:
>  >>>
>  >>> Hi Chetan,
>  >>>
>  >>> You can pretty much use any client to do this. When I was using
> Spark at a previous job, we used OkHttp, but I'm sure there are plenty of
> others. In our case, we had a startup phase in which we gathered metadata
> via a REST API and then broadcast it to the workers. I think if you need
> all the workers to have access to whatever you're getting from the API,
> that's the way to do it.
>  >>>
>  >>> Jerry
>  >>>
>  >>> On Thu, May 14, 2020 at 5:03 PM Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>  
>   Hi Spark Users,
>  
>   How can I invoke a REST API call from Spark code so that it does not only
>   run on the Spark driver but is distributed / parallel?
>  
>   Spark with Scala is my tech stack.
>  
>   Thanks
>  
>  
>  >>>
>  >>>
>  >>> --
>  >>> http://www.google.com/profiles/grapesmoker
>  >
>  >
>  >
>  > --
>  > http://www.google.com/profiles/grapesmoker
>


Re: Using Spark Accumulators with Structured Streaming

2020-05-15 Thread Something Something
Can someone from the Spark development team tell me if this functionality is
supported and tested? I've spent a lot of time on this but can't get it to
work. Just to add more context: we have our own Accumulator class that extends
AccumulatorV2. In this class we keep track of one or more accumulators.
Here's the definition:

class CollectionLongAccumulator[T]
extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be
greatly appreciated. Thanks.




On Thu, May 14, 2020 at 2:36 PM Something Something <
mailinglist...@gmail.com> wrote:

> In my structured streaming job I am updating Spark Accumulators in the
> updateAcrossEvents method but they are always 0 when I try to print them in
> my StreamingListener. Here's the code:
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> updateAcrossEvents
>   )
>
> The accumulators get incremented in 'updateAcrossEvents'. I have a
> StreamingListener which writes the values of the accumulators in the
> 'onQueryProgress' method, but in this method the Accumulators are ALWAYS
> ZERO!
>
> When I added log statements in the updateAcrossEvents, I could see that
> these accumulators are getting incremented as expected.
>
> This only happens when I run in 'Cluster' mode. In 'Local' mode it works
> fine, which implies that the Accumulators are not getting distributed
> correctly - or something like that!
>
> Note: I've seen quite a few answers on the Web that tell me to perform an
> "Action". That's not a solution here. This is a 'Stateful Structured
> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>
>
>
>