I'm sorry, guys. Aggregation works fine, but I've found a new problem with *groupByKey()*. After restarting the application, some aggregations start from the beginning even though the key already has aggregated data, while other aggregations continue to accumulate data. This is very strange, I did not expect such behavior)))).
Tue, 30 Oct 2018 at 0:43, Matthias J. Sax <matth...@confluent.io>:
> Make sure to call `KafkaStreams#close()` to get the latest offsets
> committed.
>
> Besides this, you can check the consumer and Streams logs in DEBUG mode
> to see what offset is picked up (or not).
>
>
> -Matthias
>
> On 10/29/18 11:43 AM, Patrik Kleindl wrote:
> > Hi
> > How long does your application run? More than the 60 seconds you set for
> > the commit interval?
> > Have a look at
> > https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+
> > and check if your offsets are really committed.
> > Best regards
> > Patrik
> >
> >> On 29.10.2018 at 18:20, Pavel Koroliov <afgmeis...@gmail.com> wrote:
> >>
> >> Hi
> >> No, my application id doesn't change
> >>
> >> Mon, 29 Oct 2018 at 19:11, Patrik Kleindl <pklei...@gmail.com>:
> >>
> >>> Hi
> >>> Does your applicationId change?
> >>> Best regards
> >>> Patrik
> >>>
> >>>> On 29.10.2018 at 13:28, Pavel Koroliov <afgmeis...@gmail.com> wrote:
> >>>>
> >>>> Hi everyone! I use kafka-streams, and I have a problem when I use
> >>>> windowedBy. Everything works well until I restart the application.
> >>>> After restarting, my aggregation starts from the beginning.
> >>>> Code below:
> >>>>>
> >>>>> StreamsBuilder builder = new StreamsBuilder()
> >>>>> KStream stream = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
> >>>>>
> >>>>> KTable table = stream.groupByKey()
> >>>>>         .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
> >>>>>         .aggregate(
> >>>>>             { new AggregatorModel() },
> >>>>>             { key, value, aggregate ->
> >>>>>                 return aggregate.add(value)
> >>>>>             }
> >>>>>         )
> >>>>>         .toStream()
> >>>>>         .map({ k, v ->
> >>>>>             new KeyValue<>(k.window().end(), v)
> >>>>>         })
> >>>>>         .to('output')
> >>>>>
> >>>>> def config = new Properties()
> >>>>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
> >>>>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
> >>>>> config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.SECONDS.toMillis(60))
> >>>>>
> >>>>> KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
> >>>>> kafkaStreams.start()
> >>>>>
> >>>> I've tried adding ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to the config,
> >>>> set to 'latest' and to 'earliest', but it didn't help.
> >>>> Can you help me understand what I'm doing wrong?
> >>>> Thank you.
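
For reference, Matthias's suggestion to call `KafkaStreams#close()` is commonly wired up through a JVM shutdown hook, so that close() runs when the process is stopped. Below is a minimal sketch of that pattern in plain Java. To keep it self-contained, `StreamsApp` is a hypothetical stand-in for the real `KafkaStreams` instance, and `closeOnShutdown` is a helper name invented for illustration; neither is part of the Kafka API, and the posted code may need a different arrangement.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownHookSketch {

    /** Hypothetical stand-in for the KafkaStreams instance (assumption, not the real class). */
    static class StreamsApp implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        void start() {
            // In the real application, KafkaStreams#start() would be called here.
        }

        @Override
        public void close() {
            // In the real application this would be KafkaStreams#close(), which,
            // per the advice in the thread, gets the latest offsets committed.
            closed.set(true);
        }

        boolean isClosed() {
            return closed.get();
        }
    }

    /** Builds a thread that closes the given app; intended to be registered as a shutdown hook. */
    static Thread closeOnShutdown(AutoCloseable app) {
        return new Thread(() -> {
            try {
                app.close();
            } catch (Exception e) {
                // Nothing else can be done during JVM shutdown except report the failure.
                e.printStackTrace();
            }
        }, "streams-shutdown-hook");
    }

    public static void main(String[] args) {
        StreamsApp app = new StreamsApp();
        // Register the hook before starting, so an early termination still triggers close().
        Runtime.getRuntime().addShutdownHook(closeOnShutdown(app));
        app.start();
    }
}
```

Note that a shutdown hook runs on a normal exit or a termination signal such as Ctrl+C, but not when the process is killed forcibly (for example with `kill -9`), so in that case close() would still be skipped.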