Those options will not affect structured streaming. You are looking for .option("maxOffsetsPerTrigger", "1000") on the Kafka source.
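For example, a minimal sketch of the read side, assuming a placeholder topic name, bootstrap servers, and starting offsets (none of these are taken from this thread):

    // Kafka source for Structured Streaming. maxOffsetsPerTrigger caps how many
    // offsets each micro-batch may consume, so the first batch does not try to
    // process the entire 10M-record backlog in one go.
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
      .option("subscribe", "my_topic")                      // placeholder
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "1000")
      .load()

With that cap in place, each micro-batch covers at most 1000 offsets, so an aggregation in "update" mode emits intermediate results instead of a single answer after the full backlog.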
We are working on improving this by building a generic mechanism into the Streaming DataSource V2 so that the engine can do admission control on the amount of data returned, in a source-independent way.

On Tue, Mar 20, 2018 at 2:58 PM, kant kodali <kanth...@gmail.com> wrote:

> I am using Spark 2.3.0 and Kafka 0.10.2.0, so I assume structured streaming
> uses the Direct APIs, although I am not sure? If it is the Direct APIs, the
> only parameters that are relevant are the ones below, according to this
> <https://www.linkedin.com/pulse/enable-back-pressure-make-your-spark-streaming-production-lan-jiang>
> article:
>
> - spark.conf("spark.streaming.backpressure.enabled", "true")
> - spark.conf("spark.streaming.kafka.maxRatePerPartition", "10000")
>
> I set both of these and I run select count(*) on my 10M records, but I
> still don't see any output until it finishes the initial batch of 10M, and
> this takes a while. So I am wondering if I missed something here?
>
> On Tue, Mar 20, 2018 at 6:09 AM, Geoff Von Allmen <ge...@ibleducation.com>
> wrote:
>
>> The following
>> <http://spark.apache.org/docs/latest/configuration.html#spark-streaming>
>> settings may be what you're looking for:
>>
>> - spark.streaming.backpressure.enabled
>> - spark.streaming.backpressure.initialRate
>> - spark.streaming.receiver.maxRate
>> - spark.streaming.kafka.maxRatePerPartition
>>
>> On Mon, Mar 19, 2018 at 5:27 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Yes, it indeed makes sense! Is there a way to get incremental counts when
>>> I start from 0 and go through 10M records? Perhaps a count for every
>>> micro batch or something?
>>>
>>> On Mon, Mar 19, 2018 at 1:57 PM, Geoff Von Allmen <ge...@ibleducation.com>
>>> wrote:
>>>
>>>> Trigger does not mean report the current result every 'trigger
>>>> seconds'. It means it will attempt to fetch new data and process it no
>>>> faster than trigger-second intervals.
>>>>
>>>> If you're reading from the beginning and you've got 10M entries in
>>>> Kafka, it's likely pulling everything down, processing it completely,
>>>> and then giving you an initial output. From there on out, it will check
>>>> Kafka every 1 second for new data and process it, showing you only the
>>>> updated rows. So the initial read will give you the entire output, since
>>>> there is nothing to be 'updating' from. If you add data to Kafka now that
>>>> the streaming job has completed its first batch (and leave it running), it
>>>> will then show you the new/updated rows since the last batch every 1
>>>> second (assuming it can fetch + process in that time span).
>>>>
>>>> If the combined fetch + processing time is > the trigger time, you will
>>>> notice warnings that it is 'falling behind' (I forget the exact verbiage,
>>>> but something to the effect of "the calculation took XX time and is
>>>> falling behind"). In that case, it will immediately check Kafka for new
>>>> messages and begin processing the next batch (if new messages exist).
>>>>
>>>> Hope that makes sense.
>>>>
>>>> On Mon, Mar 19, 2018 at 13:36 kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have 10 million records in my Kafka topic and I am just trying to run
>>>>> spark.sql("select count(*) from kafka_view"). I am reading from Kafka
>>>>> and writing to Kafka.
>>>>>
>>>>> My writeStream is set to "update" mode with a trigger interval of one
>>>>> second (Trigger.ProcessingTime(1000)). I expect the counts to be
>>>>> printed every second, but it looks like it only prints after going
>>>>> through all 10M. Why?
>>>>>
>>>>> Also, it seems to take forever, whereas a Linux wc of 10M rows would
>>>>> take 30 seconds.
>>>>>
>>>>> Thanks!
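For reference, a rough sketch of the write side being discussed above; the "update" output mode and Trigger.ProcessingTime(1000) are from the thread, while the console sink, checkpoint path, and the `df` from the read sketch earlier in this message are placeholders (the original job writes back to Kafka):

    import org.apache.spark.sql.streaming.Trigger

    // Running count over the stream. In "update" mode each micro-batch emits
    // only the aggregate rows that changed, so the count comes out incrementally
    // once the source stops pulling the whole backlog in a single batch.
    val query = df.groupBy().count()
      .writeStream
      .outputMode("update")
      .format("console")                        // placeholder sink; the thread writes to Kafka
      .trigger(Trigger.ProcessingTime(1000))    // attempt a new micro-batch every second
      .option("checkpointLocation", "/tmp/checkpoints/counts") // placeholder path
      .start()

    query.awaitTermination()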