I'm sorry, guys. Aggregation works fine, but I've found a new problem with
*groupByKey()*. After restarting the application, some aggregations start from
the beginning, although the key already has aggregated data, while other
aggregations continue to accumulate data. This is very strange; I did not
expect such behavior)))).

Tue, 30 Oct 2018 at 0:43, Matthias J. Sax <matth...@confluent.io> wrote:

> Make sure to call `KafkaStreams#close()` to get the latest offsets
> committed.
>
> Besides this, you can check the consumer and Streams logs in DEBUG mode
> to see which offset is picked up (or not).
>
>
> -Matthias
>
> On 10/29/18 11:43 AM, Patrik Kleindl wrote:
> > Hi
> > How long does your application run? More than the 60 seconds you set for the commit interval?
> > Have a look at https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+
> > and check if your offsets are really committed.
> > Best regards
> > Patrik
> >
> >> On 29.10.2018 at 18:20, Pavel Koroliov <afgmeis...@gmail.com> wrote:
> >>
> >> Hi
> >> No, my application id doesn't change
> >>
> >> Mon, 29 Oct 2018 at 19:11, Patrik Kleindl <pklei...@gmail.com> wrote:
> >>
> >>> Hi
> >>> Does your applicationId change?
> >>> Best regards
> >>> Patrik
> >>>
> >>>> On 29.10.2018 at 13:28, Pavel Koroliov <afgmeis...@gmail.com> wrote:
> >>>>
> >>>> Hi everyone! I use kafka-streams, and I have a problem when I use
> >>>> windowedBy. Everything works well until I restart the application. After
> >>>> restarting, my aggregation starts from the beginning.
> >>>> Code below:
> >>>>>
> >>>>>   StreamsBuilder builder = new StreamsBuilder()
> >>>>>   KStream stream = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
> >>>>>
> >>>>>   KTable table = stream.groupByKey()
> >>>>>           .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
> >>>>>           .aggregate(
> >>>>>                   { new AggregatorModel() },
> >>>>>                   { key, value, aggregate ->
> >>>>>                       return aggregate.add(value)
> >>>>>                   }
> >>>>>           )
> >>>>>           .toStream()
> >>>>>           .map({ k, v ->
> >>>>>               new KeyValue<>(k.window().end(), v)
> >>>>>           })
> >>>>>           .to('output')
> >>>>>
> >>>>>   def config = new Properties()
> >>>>>   config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
> >>>>>   config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
> >>>>>   config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.SECONDS.toMillis(60))
> >>>>>
> >>>>>   KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
> >>>>>   kafkaStreams.start()
> >>>>>
> >>>>>
> >>>> I've tried adding ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to the config, set
> >>>> to 'latest' and to 'earliest', but it didn't help.
> >>>> Can you help me understand what I'm doing wrong?
> >>>> Thank you.
> >>>
> >
>
>
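
Matthias's suggestion above, calling `KafkaStreams#close()` so that the latest offsets get committed, is commonly wired up through a JVM shutdown hook. Below is a minimal sketch of that pattern, not the poster's actual code. The class name `CloseOnShutdown`, the helper `closeHook`, and the thread name are hypothetical, and a plain `Runnable` stands in for `kafkaStreams::close` so that the sketch compiles without Kafka on the classpath.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseOnShutdown {

    /**
     * Wraps a close action in a named thread suitable for
     * Runtime#addShutdownHook. In the real application the action would be
     * kafkaStreams::close (assumption: a KafkaStreams instance is in scope).
     */
    static Thread closeHook(Runnable close) {
        return new Thread(close, "streams-shutdown-hook");
    }

    public static void main(String[] args) {
        // Stand-in for the KafkaStreams instance: records that close was called.
        AtomicBoolean closed = new AtomicBoolean(false);
        Runnable close = () -> closed.set(true);

        // Register the hook before starting, so a later JVM exit triggers close.
        Runtime.getRuntime().addShutdownHook(closeHook(close));

        // kafkaStreams.start() would go here in the real application.
    }
}
```

In the application from this thread, the hook would be registered before `kafkaStreams.start()`, passing `kafkaStreams::close` as the `Runnable`. Shutdown hooks run on a normal exit or on SIGTERM, but not when the process is killed with `kill -9`, so in that case the close call is skipped.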
