[ https://issues.apache.org/jira/browse/SPARK-44083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Anil Dasari updated SPARK-44083:
--------------------------------
    Summary: Spark streaming: Add max pending microbatches conf to avoid scheduling new microbatch  (was: Spark streaming: Add max pending microbatches conf to avoid new pending microbatch)

> Spark streaming: Add max pending microbatches conf to avoid scheduling new
> microbatch
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-44083
>                 URL: https://issues.apache.org/jira/browse/SPARK-44083
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>    Affects Versions: 3.4.0
>            Reporter: Anil Dasari
>            Priority: Major
>
> In the case of uneven incoming rates and high scheduling delays, streaming
> keeps adding microbatches to the event loop and submitting jobs to the job
> thread executor. As a consequence, pending microbatches hold fewer offset
> ranges in Spark streaming Kafka when the Kafka lag is below the configured
> maximum per partition.
> We rely on a third-party service to add extra metadata to incoming records,
> and its response time stays roughly constant regardless of microbatch size.
> In our case, for various reasons, an RDD's metadata is fetched during the
> transform phase, which runs when the microbatch is scheduled. Our RDD
> transform at a high level:
> {code:java}
> val dstreams = ...
> dstreams.transform(rdd => {
>   val uniqueItems = rdd.map(..).distinct.collect
>   val metadata = getMetadata(uniqueItems)
>   val rddWithMetadata = rdd.map(...) // adds metadata to each record
>   rddWithMetadata
> }){code}
> Scheduling small microbatches can be avoided by skipping new jobs when there
> are already enough pending jobs in the queue.
>
> Proposed changes in _JobGenerator.scala_ at a high level:
> {code:java}
> val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)
>
> private def processEvent(event: JobGeneratorEvent): Unit = {
>   logDebug("Got event " + event)
>   event match {
>     case GenerateJobs(time) =>
>       if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
>         generateJobs(time)
>       } else {
>         logWarning("Skipping JobGenerator at " + time) // pending times stay in the queue
>       }
>
>     // other current cases
>     case ...
>       .....
>   }
> }{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
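The gating rule proposed above can be sketched as a small standalone check. Note this is an illustrative sketch, not Spark code: the `PendingBatchGate` object and its `shouldGenerate` helper are hypothetical names standing in for the condition inside `processEvent`, with `-1` disabling the limit as in the proposal.

```scala
// Standalone sketch of the proposed gating rule (illustrative, not Spark API).
// A new batch is generated only when the limit is disabled (-1) or the number
// of pending batches is still below the configured maximum.
object PendingBatchGate {
  def shouldGenerate(pendingBatches: Int, maxPendingJobs: Int): Boolean =
    maxPendingJobs == -1 || pendingBatches < maxPendingJobs

  def main(args: Array[String]): Unit = {
    println(shouldGenerate(5, -1)) // limit disabled: always generate
    println(shouldGenerate(2, 3))  // below the limit: generate
    println(shouldGenerate(3, 3))  // at the limit: skip this interval
  }
}
```

A skipped interval simply leaves the pending times in the queue, so once the backlog drains below the threshold, generation resumes on the next tick.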