Are you sure you are not accidentally recovering from checkpoint? How are you using StreamingContext.getOrCreate() in your code?
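To illustrate what TD is asking about: in Spark Streaming, `JavaStreamingContext.getOrCreate(checkpointPath, factory)` calls the factory only when no checkpoint data exists at `checkpointPath`. Otherwise the context, including its batch interval, is rebuilt from the checkpoint, so a new `batchDurationMillis` passed to the factory is ignored. Restarting the master and workers does not clear that directory. Below is a toy model of that contract in plain Java, not Spark code. `Ctx`, `CHECKPOINTS` and `getOrCreate` are hypothetical stand-ins, and the checkpoint is reduced to a single saved interval.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class GetOrCreateModel {
    /** Hypothetical stand-in for a streaming context; only the batch interval matters here. */
    static final class Ctx {
        final long batchIntervalMs;
        final boolean restored;

        Ctx(long batchIntervalMs, boolean restored) {
            this.batchIntervalMs = batchIntervalMs;
            this.restored = restored;
        }
    }

    /** Hypothetical stand-in for checkpoint directories: path -> saved batch interval. */
    static final Map<String, Long> CHECKPOINTS = new HashMap<>();

    /** Mimics the getOrCreate contract: the factory runs only when no checkpoint exists. */
    static Ctx getOrCreate(String checkpointPath, Supplier<Ctx> factory) {
        Long saved = CHECKPOINTS.get(checkpointPath);
        if (saved != null) {
            // Recovery path: the interval comes from the checkpoint, not from current config.
            return new Ctx(saved, true);
        }
        Ctx fresh = factory.get();
        // Simulates the first checkpoint being written by the new context.
        CHECKPOINTS.put(checkpointPath, fresh.batchIntervalMs);
        return fresh;
    }

    public static void main(String[] args) {
        String path = "/tmp/checkpoint"; // hypothetical path
        Ctx first = getOrCreate(path, () -> new Ctx(10_000L, false));
        // Config now says 20 s, but a checkpoint already exists at the same path.
        Ctx second = getOrCreate(path, () -> new Ctx(20_000L, false));
        System.out.println(first.batchIntervalMs + " " + second.batchIntervalMs
                + " restored=" + second.restored);
        // Removing the checkpoint (or using a new path) lets the factory's value apply.
        CHECKPOINTS.remove(path);
        Ctx third = getOrCreate(path, () -> new Ctx(20_000L, false));
        System.out.println(third.batchIntervalMs);
    }
}
```

Running it prints `10000 10000 restored=true` and then `20000`: the second call keeps the checkpointed 10-second interval, and only after the checkpoint is removed does the 20-second setting apply. In a real deployment the equivalent step is deleting the checkpoint directory or pointing to a new one, which also discards whatever state was saved there (for the direct Kafka stream, the consumed offsets).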
TD

On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

> Tathagata,
>
> In our logs I see the batch duration millis being set first to 10 and then
> to 20 seconds. I don't see the 20 being reflected later during ingestion.
>
> In the Spark UI under Streaming I see the output below; notice the *10
> second* batch interval. Can you think of a reason why it's stuck at 10?
> It used to be 1 second, by the way; then somehow, over the course of a few
> restarts, we managed to get it to 10 seconds. Now it won't get reset to
> 20 seconds. Any ideas?
>
> Streaming
>
> - *Started at: *Thu Sep 03 10:59:03 EDT 2015
> - *Time since start: *1 day 8 hours 44 minutes
> - *Network receivers: *0
> - *Batch interval: *10 seconds
> - *Processed batches: *11790
> - *Waiting batches: *0
> - *Received records: *0
> - *Processed records: *0
>
> Statistics over last 100 processed batches
>
> Receiver Statistics
>   No receivers
>
> Batch Processing Statistics
>
>   Metric            Last batch  Minimum  25th percentile  Median  75th percentile  Maximum
>   Processing Time   23 ms       7 ms     10 ms            11 ms   14 ms            172 ms
>   Scheduling Delay  1 ms        0 ms     0 ms             0 ms    1 ms             2 ms
>   Total Delay       24 ms       8 ms     10 ms            12 ms   14 ms            173 ms
>
> On Fri, Sep 4, 2015 at 3:50 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> Could you see what the streaming tab in the Spark UI says? It should show
>> the underlying batch duration of the StreamingContext, the details of when
>> the batch starts, etc.
>>
>> BTW, it seems that the 5.6 or 6.8 second delay is present only when data
>> is present (that is, *Documents processed: > 0*).
>>
>> On Fri, Sep 4, 2015 at 3:38 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>
>>> Tathagata,
>>>
>>> Checkpointing is turned on but we were not recovering. I'm looking at
>>> the logs now, feeding fresh content hours after the restart. Here's a
>>> snippet:
>>>
>>> 2015-09-04 06:11:20,013 ... Documents processed: 0.
>>> 2015-09-04 06:11:30,014 ... Documents processed: 0.
>>> 2015-09-04 06:11:40,011 ... Documents processed: 0.
>>> 2015-09-04 06:11:50,012 ... Documents processed: 0.
>>> 2015-09-04 06:12:00,010 ... Documents processed: 0.
>>> 2015-09-04 06:12:10,047 ... Documents processed: 0.
>>> 2015-09-04 06:12:20,012 ... Documents processed: 0.
>>> 2015-09-04 06:12:30,011 ... Documents processed: 0.
>>> 2015-09-04 06:12:40,012 ... Documents processed: 0.
>>> *2015-09-04 06:12:55,629 ... Documents processed: 4.*
>>> 2015-09-04 06:13:00,018 ... Documents processed: 0.
>>> 2015-09-04 06:13:10,012 ... Documents processed: 0.
>>> 2015-09-04 06:13:20,019 ... Documents processed: 0.
>>> 2015-09-04 06:13:30,014 ... Documents processed: 0.
>>> 2015-09-04 06:13:40,041 ... Documents processed: 0.
>>> 2015-09-04 06:13:50,009 ... Documents processed: 0.
>>> ...
>>> 2015-09-04 06:17:30,019 ... Documents processed: 0.
>>> *2015-09-04 06:17:46,832 ... Documents processed: 40.*
>>>
>>> Interestingly, the fresh content (4 documents) is fed about 5.6 seconds
>>> after the previous batch, not 10 seconds. The second fresh batch comes in
>>> 6.8 seconds after the previous empty one.
>>>
>>> Granted, the log message is printed after iterating over the messages,
>>> which may account for some time differences. But generally, looking at the
>>> log messages being printed before we iterate, it's still 10 seconds each
>>> time, not 20, which is what batchdurationmillis is currently set to.
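As a worked check of the 5.6 and 6.8 second figures (and of TD's remark that the delay appears only when data is present), the sketch below measures each quoted log timestamp against a 10-second batch grid. It is a standalone illustration, not part of the poster's code; `BatchOffset` and `offsetFromBatchGridMs` are hypothetical names, and it assumes, as the quoted log suggests, that batches start on whole multiples of the interval. Because the "Finished ... Documents processed" line is logged after a partition has been iterated, its offset from the grid roughly tracks how long that batch took.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoField;

public class BatchOffset {
    // Matches the timestamp prefix of the quoted log lines, e.g. "2015-09-04 06:12:55,629".
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Milliseconds between a log timestamp and the latest batch boundary at or before it. */
    static long offsetFromBatchGridMs(String timestamp, long batchIntervalMs) {
        LocalDateTime t = LocalDateTime.parse(timestamp, FMT);
        // Assumption: batch boundaries fall on multiples of the interval within the day.
        long msOfDay = t.getLong(ChronoField.MILLI_OF_DAY);
        return msOfDay % batchIntervalMs;
    }

    public static void main(String[] args) {
        long interval = 10_000L; // the 10-second interval shown in the Spark UI
        String[] stamps = {
            "2015-09-04 06:12:40,012", // empty batch
            "2015-09-04 06:12:55,629", // 4 documents
            "2015-09-04 06:17:30,019", // empty batch
            "2015-09-04 06:17:46,832"  // 40 documents
        };
        for (String ts : stamps) {
            System.out.println(ts + " -> +" + offsetFromBatchGridMs(ts, interval) + " ms");
        }
    }
}
```

For the two non-empty batches this gives +5629 ms (batch at 06:12:50) and +6832 ms (batch at 06:17:40), matching the 5.6 and 6.8 seconds above, while the empty batches land within a few tens of milliseconds of the grid. That pattern fits a 10-second interval with longer processing when documents arrive, rather than a 20-second interval.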
>>>
>>> Code:
>>>
>>> JavaPairInputDStream<String, String> messages =
>>>     KafkaUtils.createDirectStream(....);
>>> messages.checkpoint(Durations.milliseconds(checkpointMillis));
>>>
>>> JavaDStream<String> messageBodies = messages.map(
>>>     new Function<Tuple2<String, String>, String>() {
>>>       @Override
>>>       public String call(Tuple2<String, String> tuple2) {
>>>         return tuple2._2();
>>>       }
>>>     });
>>>
>>> messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>   @Override
>>>   public Void call(JavaRDD<String> rdd) throws Exception {
>>>     ProcessPartitionFunction func = new ProcessPartitionFunction(...);
>>>     rdd.foreachPartition(func);
>>>     return null;
>>>   }
>>> });
>>>
>>> The log message comes from ProcessPartitionFunction:
>>>
>>> public void call(Iterator<String> messageIterator) throws Exception {
>>>   log.info("Starting data partition processing (appName={}, topic={})...",
>>>       appName, topic);
>>>   // ... iterate ...
>>>   log.info("Finished data partition processing (appName={}, topic={}). Documents processed: {}.",
>>>       appName, topic, docCount);
>>> }
>>>
>>> Any ideas? Thanks.
>>>
>>> - Dmitry
>>>
>>> On Thu, Sep 3, 2015 at 10:45 PM, Tathagata Das <t...@databricks.com> wrote:
>>>
>>>> Are you accidentally recovering from checkpoint files that have 10
>>>> seconds as the batch interval?
>>>>
>>>> On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> I'm seeing an oddity where I initially set the batchdurationmillis to
>>>>> 1 second and it works fine:
>>>>>
>>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>     Durations.milliseconds(batchDurationMillis));
>>>>>
>>>>> Then I tried changing the value to 10 seconds. The change didn't seem
>>>>> to take. I've bounced the Spark workers and the consumers, and now I'm
>>>>> seeing RDDs coming in about once every 10 seconds (not always exactly 10
>>>>> seconds, according to the logs).
>>>>>
>>>>> However, now I'm trying to change the value to 20 seconds and it's
>>>>> just not taking. I've bounced the Spark master, workers, and consumers,
>>>>> and the value seems "stuck" at 10 seconds.
>>>>>
>>>>> Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> - Dmitry