Make sure to call `KafkaStreams#close()` to get the latest offsets
committed.

Beside this, you can check the consumer and Streams logs in DEBUG mode,
to see what offset is picked up (or not).


-Matthias

On 10/29/18 11:43 AM, Patrik Kleindl wrote:
> Hi
> How long does your application run? More than the 60 seconds you set for 
> commit interval?
> Have a look at 
> https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+
>  
> and check if your offsets are really comitted
> Best regards
> Patrik
> 
>> Am 29.10.2018 um 18:20 schrieb Pavel Koroliov <afgmeis...@gmail.com>:
>>
>> Hi
>> No, my application id doesn't change
>>
>> пн, 29 окт. 2018 г. в 19:11, Patrik Kleindl <pklei...@gmail.com>:
>>
>>> Hi
>>> Does your applicationId change?
>>> Best regards
>>> Patrik
>>>
>>>> Am 29.10.2018 um 13:28 schrieb Pavel Koroliov <afgmeis...@gmail.com>:
>>>>
>>>> Hi everyone! I use kafka-streams, and i have a problem when i use
>>>> windowedBy. Everything works well until I restart the application. After
>>>> restarting my aggregation starts from beginning.
>>>> Code bellow:
>>>>>
>>>>>   StreamsBuilder builder = new StreamsBuilder()
>>>>>   KStream stream = builder.stream(topic,
>>> Consumed.with(Serdes.String(), Serdes.String()))
>>>>>
>>>>>   KTable table =
>>> stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
>>>>>           .aggregate(
>>>>>           { new AggregatorModel() },
>>>>>           { key, value, aggregate ->
>>>>>               return aggregate.add(value)
>>>>>           }
>>>>>   )
>>>>>           .toStream()
>>>>>           .map({ k, v ->
>>>>>       new KeyValue<>(k.window().end(), v)
>>>>>   })
>>>>>           .to('output')
>>>>>
>>>>>   def config = new Properties()
>>>>>   config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
>>>>>   config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
>>>>>   config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>>> TimeUnit.SECONDS.toMillis(60))
>>>>>
>>>>>   KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
>>>>>   kafkaStreams.start()
>>>>>
>>>>>
>>>> I've tried add to config ConsumerConfig.AUTO_OFFSET_RESET_CONFIG set to
>>>> 'latest' and 'earliest' but it didn't help.
>>>> Can you help me understand what I'm doing wrong?
>>>> Thank you.
>>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to