Hi
How long does your application run? Longer than the 60 seconds you set as the
commit interval?
Have a look at
https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+
and check whether your offsets are really committed.
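
To see why the run time matters, here is a minimal sketch in plain Java (no Kafka dependency). It is a toy model, not Kafka Streams code: it assumes offsets are committed only at whole multiples of the commit interval and that the process is stopped abruptly, without a clean shutdown. The class name `CommitIntervalSketch`, the method `lastCommitMs`, and the sample uptimes are made up for illustration.

```java
// Toy model only: assumes commits happen exactly every commitIntervalMs
// and that the process stops abruptly (no commit on shutdown).
class CommitIntervalSketch {

    /**
     * Returns the uptime (in ms) at which the last periodic commit happened,
     * or -1 if the process stopped before the first commit.
     */
    static long lastCommitMs(long uptimeMs, long commitIntervalMs) {
        if (commitIntervalMs <= 0) {
            throw new IllegalArgumentException("commitIntervalMs must be positive");
        }
        long commits = uptimeMs / commitIntervalMs; // number of completed intervals
        return commits == 0 ? -1 : commits * commitIntervalMs;
    }

    public static void main(String[] args) {
        long commitIntervalMs = 60_000; // the 60 seconds from the quoted config
        for (long uptimeMs : new long[] {45_000, 60_000, 150_000}) {
            System.out.println(uptimeMs + " ms uptime -> last commit at "
                    + lastCommitMs(uptimeMs, commitIntervalMs) + " ms");
        }
    }
}
```

In this model a 45-second run yields -1: nothing was committed, so after a restart there is no stored position to resume from and the consumer falls back to its auto.offset.reset setting.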
Best regards
Patrik

> On 29.10.2018 at 18:20, Pavel Koroliov <afgmeis...@gmail.com> wrote:
> 
> Hi
> No, my application id doesn't change
> 
> Mon, 29 Oct 2018 at 19:11, Patrik Kleindl <pklei...@gmail.com>:
> 
>> Hi
>> Does your applicationId change?
>> Best regards
>> Patrik
>> 
>>> On 29.10.2018 at 13:28, Pavel Koroliov <afgmeis...@gmail.com> wrote:
>>> 
>>> Hi everyone! I use kafka-streams, and I have a problem when I use
>>> windowedBy. Everything works well until I restart the application. After
>>> restarting, my aggregation starts from the beginning.
>>> Code below:
>>>> 
>>>>   StreamsBuilder builder = new StreamsBuilder()
>>>>   KStream stream = builder.stream(topic,
>>>>           Consumed.with(Serdes.String(), Serdes.String()))
>>>> 
>>>>   KTable table =
>>>>           stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
>>>>           .aggregate(
>>>>           { new AggregatorModel() },
>>>>           { key, value, aggregate ->
>>>>               return aggregate.add(value)
>>>>           }
>>>>   )
>>>>           .toStream()
>>>>           .map({ k, v ->
>>>>       new KeyValue<>(k.window().end(), v)
>>>>   })
>>>>           .to('output')
>>>> 
>>>>   def config = new Properties()
>>>>   config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
>>>>   config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
>>>>   config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>>>>           TimeUnit.SECONDS.toMillis(60))
>>>> 
>>>>   KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
>>>>   kafkaStreams.start()
>>>> 
>>>> 
>>> I've tried setting ConsumerConfig.AUTO_OFFSET_RESET_CONFIG in the config to
>>> 'latest' and 'earliest', but it didn't help.
>>> Can you help me understand what I'm doing wrong?
>>> Thank you.
>> 
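
The quoted topology re-keys each result by `k.window().end()`. As a rough illustration of what that value is, the sketch below computes the boundaries of a 15-minute tumbling window in plain Java, assuming windows are aligned to the epoch with an inclusive start and an exclusive end, and that timestamps are non-negative. `WindowBoundsSketch` and its methods are hypothetical helpers written for this example; they are not part of the Kafka Streams API.

```java
// Sketch of tumbling-window alignment for non-negative timestamps.
// Mirrors the 15-minute window size from the quoted code; not Kafka Streams source.
class WindowBoundsSketch {

    static final long WINDOW_SIZE_MS = 15 * 60 * 1000L; // 15 minutes in ms

    /** Start of the epoch-aligned window that contains timestampMs (inclusive). */
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    /** End of that window (exclusive); the analogue of k.window().end(). */
    static long windowEnd(long timestampMs) {
        return windowStart(timestampMs) + WINDOW_SIZE_MS;
    }

    public static void main(String[] args) {
        for (long ts : new long[] {899_999L, 900_000L, 1_000_000L}) {
            System.out.println(ts + " -> [" + windowStart(ts) + ", " + windowEnd(ts) + ")");
        }
    }
}
```

Because the `map` step in the quoted code drops the original key, aggregates for different keys that fall into the same window are written to 'output' under the same window-end key.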
