Make sure to call `KafkaStreams#close()` to get the latest offsets committed.
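
A minimal sketch of one way to arrange that, assuming the application is stopped with a normal JVM shutdown (SIGTERM or Ctrl-C, not `kill -9`). The `GracefulShutdown` class and `registerCloseHook` helper are hypothetical names used only for illustration. The client is passed as a plain `AutoCloseable` so that the sketch needs nothing beyond the JDK:

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical helper, not part of Kafka Streams.
class GracefulShutdown {

    /**
     * Registers a JVM shutdown hook that closes the given client and then
     * releases the latch, so a main thread blocked in done.await() can exit.
     * Returns the hook thread so callers can deregister it if needed.
     */
    static Thread registerCloseHook(AutoCloseable client, CountDownLatch done) {
        Thread hook = new Thread(() -> {
            try {
                // For a Streams application this would be KafkaStreams#close().
                client.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                done.countDown();
            }
        }, "streams-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

With the code from the original mail, this could be wired up after `kafkaStreams.start()` as `GracefulShutdown.registerCloseHook(kafkaStreams::close, done)`, followed by `done.await()` in the main thread.
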
Besides this, you can check the consumer and Streams logs in DEBUG mode to see which offset is picked up (or not).

-Matthias

On 10/29/18 11:43 AM, Patrik Kleindl wrote:
> Hi
> How long does your application run? More than the 60 seconds you set for
> commit interval?
> Have a look at
> https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+
> and check if your offsets are really committed
> Best regards
> Patrik
>
>> On 29.10.2018 at 18:20, Pavel Koroliov <[email protected]> wrote:
>>
>> Hi
>> No, my application id doesn't change
>>
>> Mon, Oct 29, 2018 at 19:11, Patrik Kleindl <[email protected]>:
>>
>>> Hi
>>> Does your applicationId change?
>>> Best regards
>>> Patrik
>>>
>>>> On 29.10.2018 at 13:28, Pavel Koroliov <[email protected]> wrote:
>>>>
>>>> Hi everyone! I use kafka-streams, and I have a problem when I use
>>>> windowedBy. Everything works well until I restart the application. After
>>>> restarting, my aggregation starts from the beginning.
>>>> Code below:
>>>>>
>>>>> StreamsBuilder builder = new StreamsBuilder()
>>>>> KStream stream = builder.stream(topic,
>>>>>     Consumed.with(Serdes.String(), Serdes.String()))
>>>>>
>>>>> KTable table =
>>>>>     stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
>>>>>         .aggregate(
>>>>>             { new AggregatorModel() },
>>>>>             { key, value, aggregate ->
>>>>>                 return aggregate.add(value)
>>>>>             }
>>>>>         )
>>>>>         .toStream()
>>>>>         .map({ k, v ->
>>>>>             new KeyValue<>(k.window().end(), v)
>>>>>         })
>>>>>         .to('output')
>>>>>
>>>>> def config = new Properties()
>>>>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
>>>>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
>>>>> config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>>>>>     TimeUnit.SECONDS.toMillis(60))
>>>>>
>>>>> KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
>>>>> kafkaStreams.start()
>>>>>
>>>> I've tried adding ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to the config, set to
>>>> 'latest' and 'earliest', but it didn't help.
>>>> Can you help me understand what I'm doing wrong?
>>>> Thank you.
