Re: Spark Streaming: Async action scheduling inside foreachRDD
A ForkJoinPool with task support would help in this case: you can create a thread pool with a configured number of threads (make sure you have enough cores) and submit the jobs, i.e. the actions, to that pool.

On Fri, Aug 4, 2017 at 8:54 AM Raghavendra Pandey <raghavendra.pan...@gmail.com> wrote:

> Did you try SparkContext.addSparkListener?
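That suggestion can be sketched in plain Scala. The two `Future` bodies below are trivial stand-ins for the Spark actions (e.g. `rdd1.saveToCassandra(...)`), the pool size of 4 is an arbitrary choice, and `ExecutionContext.fromExecutorService` over a `java.util.concurrent.ForkJoinPool` is one way to get the dedicated pool the reply describes:

```scala
import java.util.concurrent.ForkJoinPool
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Dedicated pool so blocking Spark actions don't starve the global context.
val pool = new ForkJoinPool(4)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

// Each Future would wrap one Spark action, e.g. rdd1.saveToCassandra(...).
val job1 = Future { 1 + 1 } // stand-in for the Cassandra write
val job2 = Future { 2 + 3 } // stand-in for the Kafka write

// Await.result rethrows the first failure, so a failed action fails the batch.
val results = Await.result(Future.sequence(Seq(job1, job2)), Duration.Inf)
println(results) // List(2, 5)
pool.shutdown()
```

Using a dedicated pool rather than `ExecutionContext.Implicits.global` keeps the per-batch actions isolated, so long-running saves cannot starve unrelated work sharing the global pool.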
Re: Spark Streaming: Async action scheduling inside foreachRDD
Did you try SparkContext.addSparkListener?

On Aug 3, 2017 1:54 AM, "Andrii Biletskyi" wrote:
Spark Streaming: Async action scheduling inside foreachRDD
Hi all,

What is the correct way to schedule multiple jobs inside the foreachRDD method and, importantly, to await their results to ensure those jobs have completed successfully? E.g.:

  kafkaDStream.foreachRDD { rdd =>
    val rdd1 = rdd.map(...)
    val rdd2 = rdd1.map(...)

    val job1Future = Future {
      rdd1.saveToCassandra(...)
    }

    val job2Future = Future {
      rdd1.foreachPartition(iter => /* save to Kafka */)
    }

    Await.result(
      Future.sequence(Seq(job1Future, job2Future)),
      Duration.Inf)

    // commit Kafka offsets
  }

In this code I'm scheduling two actions in futures and awaiting them. I need to be sure, when I commit the Kafka offsets at the end of the batch processing, that job1 and job2 have actually executed successfully. Does the given approach provide these guarantees, i.e. in case one of the jobs fails, will the entire batch be marked as failed too?

Thanks,
Andrii
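On the guarantees question: a future built with `Future.sequence` fails if any of its member futures fails (note that `Future.sequence` takes a collection, e.g. `Seq(job1Future, job2Future)`), and `Await.result` rethrows that failure, so the offset commit after the await is never reached; since the exception then propagates out of `foreachRDD`, Spark should mark the batch as failed. A minimal stand-alone sketch of that failure propagation, with a thrown exception standing in for a failed Spark action (no Spark involved):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.Try

val ok     = Future { 42 }                                       // succeeds
val failed = Future { throw new RuntimeException("job failed") } // stand-in for a failed action

// Await.result on Future.sequence rethrows the first failure...
val outcome = Try(Await.result(Future.sequence(Seq(ok, failed)), Duration.Inf))

// ...so code after it (e.g. the Kafka offset commit) only runs on success.
println(outcome.isFailure) // true
```

In the streaming job one would not wrap the await in `Try`; letting the exception escape is exactly what prevents the offsets from being committed for a half-finished batch.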