Dmitry, which KIP are you referring to? I see this behavior too sometimes.

On Fri, Aug 4, 2017 at 10:25 AM, Dmitry Minkovsky <dminkov...@gmail.com>
wrote:

> Thank you Matthias and Bill,
>
> Just want to confirm that was my offsets *were *being committed but I was
> being affected by `offsets.retention.minutes` which I did not know about. I
> set
>
> offsets.retention.minutes=2147483647
> offsets.retention.check.interval.ms=9223372036854775807
>
> Will keep an eye on that KIP.
>
> Best,
> Dmitry
>
> On Fri, Jul 21, 2017 at 4:31 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > >>>> My guess is that offsets are committed only when all tasks in the
> > >>> topology
> > >>>> have received input. Is this what's happening?
> >
> > No. Task offsets are committed independently from each other.
> >
> > You can you double check the logs in DEBUG mode. It indicates when
> > offsets get committed. Also check via `bin/kafka-consumer-groups.sh`
> > what offsets are committed (application.id == group.id)
> >
> > Hope this helps.
> >
> >
> > -Matthias
> >
> > On 7/21/17 2:27 AM, Dmitry Minkovsky wrote:
> > > Hi Bill,
> > >
> > >> When you say "even if the application has not had data for a long
> time"
> > do
> > > you have a rough idea of how long?
> > >
> > > Minutes, hours
> > >
> > >> What is the value of  your
> > > "auto.offset.reset"  configuration?
> > >
> > > I don't specify it explicitly, but the ConsumerConfig logs indicate
> > > "auto.offset.reset = earliest" for all consumers the application
> creates.
> > >
> > > Thank you,
> > > Dmitry
> > >
> > >
> > > On Thu, Jul 20, 2017 at 8:07 PM, Bill Bejeck <b...@confluent.io>
> wrote:
> > >
> > >> Hi Dmitry,
> > >>
> > >> When you say "even if the application has not had data for a long
> time"
> > do
> > >> you have a rough idea of how long?  What is the value of  your
> > >> "auto.offset.reset"  configuration?
> > >>
> > >> Thanks,
> > >> Bill
> > >>
> > >> On Thu, Jul 20, 2017 at 6:03 PM, Dmitry Minkovsky <
> dminkov...@gmail.com
> > >
> > >> wrote:
> > >>
> > >>> My Streams application is configured to commit offsets every 250ms:
> > >>>
> > >>>         Properties streamsConfig = new Properties();
> > >>>         streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> > 250);
> > >>>
> > >>>
> > >>> However, every time I restart my application, records that have
> already
> > >>> been processed are re-processed, even if the application has not had
> > data
> > >>> for a long time.
> > >>>
> > >>> My guess is that offsets are committed only when all tasks in the
> > >> topology
> > >>> have received input. Is this what's happening?
> > >>>
> > >>>
> > >>>
> > >>> Thank you,
> > >>> Dmitry
> > >>>
> > >>
> > >
> >
> >
>

Reply via email to