Re: Using Spark Accumulators with Structured Streaming
There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic or thread-safe. I'm wondering whether an implementation based on `java.util.Map[T, Long]` can meet that requirement. Is there any chance to replace CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3] and test whether the StreamingListener and the rest of the code still work?

---
Cheers,
-z

[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

From: Something Something
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from the Spark development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work.

To add more context: we have our own Accumulator class that extends AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:

    class CollectionLongAccumulator[T] extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins, we register an instance of this class:

    spark.sparkContext.register(myAccumulator, "MyAccumulator")

Does this work under Structured Streaming? I will keep looking for alternate approaches, but any help would be greatly appreciated. Thanks.

On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:

> In my structured streaming job I am updating Spark Accumulators in the
> updateAcrossEvents method, but they are always 0 when I try to print them in
> my StreamingListener. Here's the code:
>
>     .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>       updateAcrossEvents
>     )
>
> The accumulators get incremented in 'updateAcrossEvents'.
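To see why the thread-safety of the OUT type matters, here is a minimal JDK-only sketch (no Spark involved; `MergeDemo` and its methods are hypothetical names for illustration). It shows the kind of per-key counting a `Map[T, Long]`-valued accumulator performs, done safely with `ConcurrentHashMap.merge`, which applies the read-modify-write atomically per key:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MergeDemo {
    // One per-key increment, as an accumulator's add() would do.
    // ConcurrentHashMap.merge is atomic per key, so concurrent
    // updates from multiple task threads are not lost.
    static void count(Map<String, Long> counts, String key) {
        counts.merge(key, 1L, Long::sum);
    }

    // Hammer one key from several threads and return the final count.
    static long run(int threads, int updatesPerThread) {
        Map<String, Long> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < updatesPerThread; i++) {
                    count(counts, "events");
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counts.getOrDefault("events", 0L);
    }

    public static void main(String[] args) {
        // With a plain java.util.HashMap, lost updates would typically
        // leave this below 80000; ConcurrentHashMap keeps it exact.
        System.out.println(run(8, 10_000)); // 80000
    }
}
```

With a plain `java.util.HashMap` as the OUT type, the same workload can silently drop increments, which is one reason the API asks for an atomic or thread-safe OUT type.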
I have a StreamingListener which writes the values of the accumulators in the 'onQueryProgress' method, but in this method the accumulators are ALWAYS ZERO!

When I added log statements in updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in 'Cluster' mode. In Local mode it works fine, which implies that the accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the web that tell me to perform an "Action". That's not a solution here. This is a stateful Structured Streaming job. Yes, I am also registering them in the SparkContext.

---
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
spark on k8s - can driver and executor have separate checkpoint location?
Hi guys,

I'm running Spark applications on Kubernetes. According to the Spark documentation
https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
Spark needs a distributed file system to store its checkpoint data so that, in case of failure, it can recover from the checkpoint directory.

My question is: can the driver and executors have separate checkpoint locations? I'm asking this because the driver and executors might be deployed on different nodes, so a shared checkpoint location would require the ReadWriteMany access mode. Since I only have a storage class that supports ReadWriteOnce, I'm trying to find a workaround.

The Spark Streaming guide mentions that a "Failed driver can be restarted from checkpoint information" and that, when an executor fails, "Tasks and receivers [are] restarted by Spark automatically, no config needed". Given this, I tried configuring a checkpoint location only for the driver pod. It immediately failed with the exception below:

2020-05-15T14:20:17.142 [stream execution thread for baseline_windower [id = b14190ed-0fb2-4d0e-82d3-b3a3bf101712, runId = f246774e-5a20-4bfb-b997-4d06c344bb0f]] ERROR org.apache.spark.sql.execution.streaming.MicroBatchExecution - Query baseline_windower [id = b14190ed-0fb2-4d0e-82d3-b3a3bf101712, runId = f246774e-5a20-4bfb-b997-4d06c344bb0f] terminated with error
org.apache.spark.SparkException: Writing job aborted.
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    ...
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost task 0.3 in stage 11.0 (TID 420, 10.233.124.162, executor 1): java.io.IOException: mkdir of file:/opt/window-data/baseline_windower/state/0/0 failed

So I tried giving separate checkpoint locations to the driver and executors. In the Spark application Helm chart I have a checkpoint location configuration:

spec:
  sparkConf:
    "spark.sql.streaming.checkpointLocation": "file:///opt/checkpoint-data"

I created two checkpoint PVCs and mounted one volume each for the driver and executor pods:

volumes:
  - name: checkpoint-driver-volume
    persistentVolumeClaim:
      claimName: checkpoint-driver-pvc
  - name: checkpoint-executor-volume
    persistentVolumeClaim:
      claimName: checkpoint-executor-pvc
driver:
  volumeMounts:
    - name: checkpoint-driver-volume
      mountPath: "/opt/checkpoint-data"
  ...
executor:
  volumeMounts:
    - name: checkpoint-executor-volume
      mountPath: "/opt/checkpoint-data"

After deployment it seems to be working. I tried restarting the driver pod and it did recover from the checkpoint directory. But I'm not sure whether this is actually supported by design.

Thanks,
wzhan
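For comparison, a shared checkpoint volume that both driver and executor pods mount would look roughly like the sketch below (hedged: the claim name, storage class behavior, and size are hypothetical; it only works if the cluster has a storage class that actually supports ReadWriteMany, e.g. an NFS-backed one):

```yaml
# Hypothetical shared PVC for the Spark checkpoint directory.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: checkpoint-shared-pvc
spec:
  accessModes:
    - ReadWriteMany   # needed when driver and executors land on different nodes
  resources:
    requests:
      storage: 10Gi
```

With only ReadWriteOnce available, the two-PVC setup described above gives each pod its own independent `/opt/checkpoint-data`, which is why the driver restart appears to work: the driver recovers from its own PVC, while the executors' state lives on a different one.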
Re: Calling HTTP Rest APIs from Spark Job
Hi Sean,

Thanks for the great answer. What I am trying to do is use something like Scala Futures (cats-effect IO) to make concurrent calls. I was trying to understand whether there are any thresholds limiting those calls.

On Thu, May 14, 2020 at 7:28 PM Sean Owen wrote:
> No, it means # HTTP calls = # executor slots. But even then, you're
> welcome to, say, use thread pools to execute even more concurrently, as
> most are I/O bound. Your code can do what you want.
>
> On Thu, May 14, 2020 at 6:14 PM Chetan Khatri wrote:
> >
> > Thanks, that means number of executor slots = number of HTTP calls I can
> > make. I can't make more HTTP calls within a single executor; I mean, I
> > can't go beyond the threshold set by the number of executors.
> >
> > On Thu, May 14, 2020 at 6:26 PM Sean Owen wrote:
> >>
> >> The default is not 200, but the number of executor slots. Yes, you can
> >> only simultaneously execute as many tasks as there are slots, regardless
> >> of partitions.
> >>
> >> On Thu, May 14, 2020, 5:19 PM Chetan Khatri <chetan.opensou...@gmail.com> wrote:
> >>>
> >>> Thanks Sean, Jerry.
> >>>
> >>> Default Spark DataFrame partitions are 200, right? Does it have a
> >>> relationship with the number of cores? With 8 cores and 4 workers, is it
> >>> not the case that I can do only 8 * 4 = 32 HTTP calls? Because in Spark,
> >>> "number of partitions = number of cores" is untrue.
> >>>
> >>> Thanks
> >>>
> >>> On Thu, May 14, 2020 at 6:11 PM Sean Owen wrote:
> >>>>
> >>>> Yes, any code that you apply with Spark runs in the executors. You
> >>>> would be running as many HTTP clients as you have partitions.
> >>>>
> >>>> On Thu, May 14, 2020 at 4:31 PM Jerry Vinokurov <grapesmo...@gmail.com> wrote:
> >>>>>
> >>>>> I believe that if you do this within the context of an operation that
> >>>>> is already parallelized, such as a map, the work will be distributed to
> >>>>> executors and they will do it in parallel. I could be wrong about this,
> >>>>> as I never investigated this specific use case, though.
> >>>>>
> >>>>> On Thu, May 14, 2020 at 5:24 PM Chetan Khatri <chetan.opensou...@gmail.com> wrote:
> >>>>>>
> >>>>>> Thanks for the quick response.
> >>>>>>
> >>>>>> I am curious to know whether the data would be pulled in parallel for
> >>>>>> 100+ HTTP requests, or would it only run on the driver node? The POST
> >>>>>> body would be part of a DataFrame. Think of it as: I have a DataFrame
> >>>>>> of employee_id, employee_name; now an HTTP GET call has to be made for
> >>>>>> each employee_id, and the DataFrame is different for each Spark job run.
> >>>>>>
> >>>>>> Does it make sense?
> >>>>>>
> >>>>>> Thanks
> >>>>>>
> >>>>>> On Thu, May 14, 2020 at 5:12 PM Jerry Vinokurov <grapesmo...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Hi Chetan,
> >>>>>>>
> >>>>>>> You can pretty much use any client to do this. When I was using Spark
> >>>>>>> at a previous job, we used OkHttp, but I'm sure there are plenty of
> >>>>>>> others. In our case, we had a startup phase in which we gathered
> >>>>>>> metadata via a REST API and then broadcast it to the workers. I think
> >>>>>>> if you need all the workers to have access to whatever you're getting
> >>>>>>> from the API, that's the way to do it.
> >>>>>>>
> >>>>>>> Jerry
> >>>>>>>
> >>>>>>> On Thu, May 14, 2020 at 5:03 PM Chetan Khatri <chetan.opensou...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Hi Spark Users,
> >>>>>>>>
> >>>>>>>> How can I invoke a REST API call from Spark code such that it is not
> >>>>>>>> only running on the Spark driver but is distributed / parallel?
> >>>>>>>>
> >>>>>>>> Spark with Scala is my tech stack.
> >>>>>>>>
> >>>>>>>> Thanks
> >>>>>>>
> >>>>>>> --
> >>>>>>> http://www.google.com/profiles/grapesmoker
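Sean's suggestion of using thread pools inside a task can be sketched without Spark: the idea is that each partition's iterator is fanned out to a small bounded pool, so one executor slot issues several requests concurrently. A JDK-only illustration (hedged: `PartitionFanOut` and `fetch` are hypothetical names, and the real HTTP call is replaced by a stand-in function so the sketch runs standalone):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class PartitionFanOut {
    // Fan a partition's ids out to a bounded thread pool, the pattern one
    // might use inside mapPartitions. `fetch` stands in for the real HTTP
    // call (e.g. an OkHttp or java.net.http request per employee_id).
    static List<String> fetchAll(List<Integer> ids,
                                 Function<Integer, String> fetch,
                                 int concurrency) {
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (Integer id : ids) {
                futures.add(pool.submit(() -> fetch.apply(id)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                try {
                    results.add(f.get()); // blocks; preserves input order
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> out = fetchAll(List.of(1, 2, 3),
                id -> "employee-" + id, 4);
        System.out.println(out); // [employee-1, employee-2, employee-3]
    }
}
```

Total concurrency is then roughly (executor slots) x (pool size per task), so the pool should be kept small enough that the downstream API isn't overwhelmed when many tasks run at once.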