I use Kafka 0.10.0.1. I count the messages of a topic as follows:

...
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
...
KStream<String, String> longs = builder.stream(Serdes.String(),
Serdes.String(), "qps-input");
...
KTable<Windowed<String>, Long> longCounts =
                longs.countByKey(TimeWindows.of("qps", 3600 * 1000),
                        Serdes.String());
...

and then I write output to another topic. Result is that:

Numbers which starts from 1 and increase whenever I add something to
qps-input.

My questions:

1) Does it calculate really last hour or everything from the beginning due
you I've set it as earliest?

2) Sometimes it's been reset and numbers starts from 1. What can be the
reason for that?

Kind Regards,
Furkan KAMACI

Reply via email to