[ 
https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anil Dasari updated SPARK-44083:
--------------------------------
    Summary: Spark streaming: Add max pending microbatches conf to avoid 
scheduling new microbatch  (was: Spark streaming: Add max pending microbatches 
conf to avoid new pending microbatch)

> Spark streaming: Add max pending microbatches conf to avoid scheduling new 
> microbatch
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-44083
>                 URL: https://issues.apache.org/jira/browse/SPARK-44083
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>    Affects Versions: 3.4.0
>            Reporter: Anil Dasari
>            Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming 
> keeps adding microbatches to the event loop and submitting their jobs to the 
> job thread executor. As a result, each pending microbatch holds only a small 
> offset range in the Spark Streaming Kafka source whenever the Kafka lag is 
> below the configured maximum rate per partition. 
> We rely on a third-party service to add extra metadata to incoming records, 
> and its response time stays roughly constant regardless of microbatch size. 
> In our case the metadata is fetched, for various reasons, during the 
> transform phase, which runs when the microbatch is scheduled. At a high 
> level, our RDD transform looks like:
> {code:java}
> val dstreams = ...
> dstreams.transform { rdd =>
>   // collect the distinct keys on the driver
>   val uniqueItems = rdd.map(...).distinct().collect()
>   // one call to the external service per microbatch, regardless of batch size
>   val metadata = getMetadata(uniqueItems)
>   // enrich every record with the fetched metadata
>   val rddWithMetadata = rdd.map(...)
>   rddWithMetadata
> }
> {code}
>  
> Scheduling these small microbatches can be avoided by skipping job 
> generation while enough jobs are already pending in the queue. 
>  
> Proposed change in _JobGenerator.scala_, at a high level:
> {code:java}
> // -1 (the default) preserves the current behaviour of always generating jobs
> val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)
>
> private def processEvent(event: JobGeneratorEvent): Unit = {
>   logDebug("Got event " + event)
>   event match {
>     case GenerateJobs(time) =>
>       if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
>         generateJobs(time)
>       } else {
>         // skip this batch; its time stays in the pending queue
>         logWarning("Skipping job generation at " + time)
>       }
>
>     // other existing cases unchanged
>     case ...
>   }
> }
> {code}
>  
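> If something along these lines is accepted, opting in would just mean setting 
> the new conf. A sketch follows; the conf name is the one proposed above and 
> does not exist in Spark today, and the threshold value is only an example:
> {code:java}
> import org.apache.spark.SparkConf
>
> // At most 3 batch times may be pending; GenerateJobs events beyond that
> // would be skipped until the scheduler catches up.
> val conf = new SparkConf()
>   .set("spark.streaming.maxPendingBatches", "3")
> {code}
>  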



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
