Hi Pavel, did you understand why you were seeing such strange behaviour?
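In case it helps while you investigate: one thing I would double-check is that the application is always stopped through `KafkaStreams#close()` (as Matthias suggested), e.g. via a JVM shutdown hook, otherwise whatever progress was made since the last commit interval is not committed and gets re-read after a restart. Below is only a minimal sketch, not your actual topology: the topic names and applicationId are placeholders, and the pass-through stream is just there so the snippet runs on its own.

    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.TimeUnit

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.StreamsBuilder
    import org.apache.kafka.streams.StreamsConfig
    import org.apache.kafka.streams.kstream.Consumed
    import org.apache.kafka.streams.kstream.Produced

    def config = new Properties()
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, 'my-aggregation-app')   // placeholder; must stay identical across restarts
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.SECONDS.toMillis(60))
    // only consulted when the consumer group has no committed offsets yet
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 'earliest')

    // placeholder pass-through topology, just so the sketch is self-contained
    def builder = new StreamsBuilder()
    builder.stream('input', Consumed.with(Serdes.String(), Serdes.String()))
           .to('output', Produced.with(Serdes.String(), Serdes.String()))

    def streams = new KafkaStreams(builder.build(), config)
    def latch = new CountDownLatch(1)

    // close() flushes the state stores and commits the latest offsets before the JVM exits
    Runtime.runtime.addShutdownHook(new Thread({
        streams.close()
        latch.countDown()
    }))

    streams.start()
    latch.await()

Also note that auto.offset.reset only kicks in when the group has no committed offsets at all, so switching it between 'earliest' and 'latest' will not change anything once offsets exist for the application id.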
On Tue, Oct 30, 2018 at 12:22 PM Pavel Koroliov <afgmeis...@gmail.com> wrote:

> I'm sorry, guys. Aggregation works fine, but I've found a new problem with
> *groupByKey()*. After restarting the application some aggregations start
> from the beginning, although their keys already have aggregated data, and
> some aggregations continue to accumulate data. This is very strange, I did
> not expect such behaviour)))).
>
> On Tue, 30 Oct 2018 at 00:43, Matthias J. Sax <matth...@confluent.io> wrote:
>
> > Make sure to call `KafkaStreams#close()` to get the latest offsets
> > committed.
> >
> > Besides this, you can check the consumer and Streams logs in DEBUG mode
> > to see what offset is picked up (or not).
> >
> >
> > -Matthias
> >
> > On 10/29/18 11:43 AM, Patrik Kleindl wrote:
> > > Hi
> > > How long does your application run? More than the 60 seconds you set
> > > for the commit interval?
> > > Have a look at
> > > https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+
> > > and check if your offsets are really committed.
> > > Best regards
> > > Patrik
> > >
> > >> On 29 Oct 2018, at 18:20, Pavel Koroliov <afgmeis...@gmail.com> wrote:
> > >>
> > >> Hi
> > >> No, my application id doesn't change
> > >>
> > >> On Mon, 29 Oct 2018 at 19:11, Patrik Kleindl <pklei...@gmail.com> wrote:
> > >>
> > >>> Hi
> > >>> Does your applicationId change?
> > >>> Best regards
> > >>> Patrik
> > >>>
> > >>>> On 29 Oct 2018, at 13:28, Pavel Koroliov <afgmeis...@gmail.com> wrote:
> > >>>>
> > >>>> Hi everyone! I use kafka-streams, and I have a problem when I use
> > >>>> windowedBy. Everything works well until I restart the application.
> > >>>> After restarting, my aggregation starts from the beginning.
> > >>>> Code below:
> > >>>>>
> > >>>>> StreamsBuilder builder = new StreamsBuilder()
> > >>>>> KStream stream = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
> > >>>>>
> > >>>>> stream.groupByKey()
> > >>>>>         .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
> > >>>>>         .aggregate(
> > >>>>>                 { new AggregatorModel() },
> > >>>>>                 { key, value, aggregate ->
> > >>>>>                     return aggregate.add(value)
> > >>>>>                 }
> > >>>>>         )
> > >>>>>         .toStream()
> > >>>>>         .map({ k, v ->
> > >>>>>             new KeyValue<>(k.window().end(), v)
> > >>>>>         })
> > >>>>>         .to('output')
> > >>>>>
> > >>>>> def config = new Properties()
> > >>>>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
> > >>>>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
> > >>>>> config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.SECONDS.toMillis(60))
> > >>>>>
> > >>>>> KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
> > >>>>> kafkaStreams.start()
> > >>>>>
> > >>>>>
> > >>>> I've tried adding ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to the config,
> > >>>> set to 'latest' and 'earliest', but it didn't help.
> > >>>> Can you help me understand what I'm doing wrong?
> > >>>> Thank you.
> > >>>
> >
> >

--
Vincenzo D'Amore