Dmitry, which KIP are you referring to? I see this behavior too sometimes.

On Fri, Aug 4, 2017 at 10:25 AM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:

> Thank you Matthias and Bill,
>
> Just want to confirm that my offsets *were* being committed, but I was
> being affected by `offsets.retention.minutes`, which I did not know about.
> I set
>
> offsets.retention.minutes=2147483647
> offsets.retention.check.interval.ms=9223372036854775807
>
> Will keep an eye on that KIP.
>
> Best,
> Dmitry
>
> On Fri, Jul 21, 2017 at 4:31 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > >>>> My guess is that offsets are committed only when all tasks in the
> > >>>> topology have received input. Is this what's happening?
> >
> > No. Task offsets are committed independently from each other.
> >
> > You can double check the logs in DEBUG mode. They indicate when
> > offsets get committed. Also check via `bin/kafka-consumer-groups.sh`
> > what offsets are committed (application.id == group.id).
> >
> > Hope this helps.
> >
> > -Matthias
> >
> > On 7/21/17 2:27 AM, Dmitry Minkovsky wrote:
> > > Hi Bill,
> > >
> > >> When you say "even if the application has not had data for a long
> > >> time" do you have a rough idea of how long?
> > >
> > > Minutes, hours
> > >
> > >> What is the value of your "auto.offset.reset" configuration?
> > >
> > > I don't specify it explicitly, but the ConsumerConfig logs indicate
> > > "auto.offset.reset = earliest" for all consumers the application
> > > creates.
> > >
> > > Thank you,
> > > Dmitry
> > >
> > > On Thu, Jul 20, 2017 at 8:07 PM, Bill Bejeck <b...@confluent.io> wrote:
> > >
> > >> Hi Dmitry,
> > >>
> > >> When you say "even if the application has not had data for a long
> > >> time" do you have a rough idea of how long? What is the value of your
> > >> "auto.offset.reset" configuration?
> > >>
> > >> Thanks,
> > >> Bill
> > >>
> > >> On Thu, Jul 20, 2017 at 6:03 PM, Dmitry Minkovsky
> > >> <dminkov...@gmail.com> wrote:
> > >>
> > >>> My Streams application is configured to commit offsets every 250ms:
> > >>>
> > >>> Properties streamsConfig = new Properties();
> > >>> streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 250);
> > >>>
> > >>> However, every time I restart my application, records that have
> > >>> already been processed are re-processed, even if the application has
> > >>> not had data for a long time.
> > >>>
> > >>> My guess is that offsets are committed only when all tasks in the
> > >>> topology have received input. Is this what's happening?
> > >>>
> > >>> Thank you,
> > >>> Dmitry