Hi How long does your application run? More than the 60 seconds you set for commit interval? Have a look at https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+ and check if your offsets are really comitted Best regards Patrik
> Am 29.10.2018 um 18:20 schrieb Pavel Koroliov <afgmeis...@gmail.com>: > > Hi > No, my application id doesn't change > > пн, 29 окт. 2018 г. в 19:11, Patrik Kleindl <pklei...@gmail.com>: > >> Hi >> Does your applicationId change? >> Best regards >> Patrik >> >>> Am 29.10.2018 um 13:28 schrieb Pavel Koroliov <afgmeis...@gmail.com>: >>> >>> Hi everyone! I use kafka-streams, and i have a problem when i use >>> windowedBy. Everything works well until I restart the application. After >>> restarting my aggregation starts from beginning. >>> Code bellow: >>>> >>>> StreamsBuilder builder = new StreamsBuilder() >>>> KStream stream = builder.stream(topic, >> Consumed.with(Serdes.String(), Serdes.String())) >>>> >>>> KTable table = >> stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15))) >>>> .aggregate( >>>> { new AggregatorModel() }, >>>> { key, value, aggregate -> >>>> return aggregate.add(value) >>>> } >>>> ) >>>> .toStream() >>>> .map({ k, v -> >>>> new KeyValue<>(k.window().end(), v) >>>> }) >>>> .to('output') >>>> >>>> def config = new Properties() >>>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId) >>>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092') >>>> config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, >> TimeUnit.SECONDS.toMillis(60)) >>>> >>>> KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config) >>>> kafkaStreams.start() >>>> >>>> >>> I've tried add to config ConsumerConfig.AUTO_OFFSET_RESET_CONFIG set to >>> 'latest' and 'earliest' but it didn't help. >>> Can you help me understand what I'm doing wrong? >>> Thank you. >>