In the process of testing a Kafka Streams application I've come across a few issues that are baffling me.
For testing I am executing a job on 20 nodes with four cores per node, each instance configured to use 4 threads, against a 5-node broker cluster running 0.10.1.1. Before execution, kafka-streams-application-reset.sh is run to reset the input topic offsets to zero, and the app calls KafkaStreams.cleanUp() on startup to clean up the local state stores. All workers are started simultaneously. All topics have 100 partitions. The main input topic has 1TB of data, 3x replicated, and min.insync.replicas is set to 3. The application consumes from the main input topic, transforms the input, and repartitions it using KStream.through() to write to another topic; it then reads from the repartitioned topic and continues processing.

In the brokers we are seeing errors such as:

[2017-01-19 21:24:41,298] WARN [ReplicaFetcherThread-3-1009], Replica 1010 for partition some_topic-91 reset its fetch offset from 424762 to current leader 1009's start offset 424779 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,350] WARN [ReplicaFetcherThread-2-1009], Replica 1010 for partition some_topic-66 reset its fetch offset from 401243 to current leader 1009's start offset 401376 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,381] ERROR [ReplicaFetcherThread-3-1009], Current offset 424762 for partition [some_topic,91] out of range; reset offset to 424779 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,399] WARN [ReplicaFetcherThread-3-1009], Replica 1010 for partition some_topic-71 reset its fetch offset from 456158 to current leader 1009's start offset 456189 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,400] WARN [ReplicaFetcherThread-0-1007], Replica 1010 for partition some_topic-84 reset its fetch offset from 399325 to current leader 1007's start offset 399327 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,446] ERROR [ReplicaFetcherThread-2-1009], Current offset 401243 for partition [some_topic,66] out of range; reset offset to 401376
(kafka.server.ReplicaFetcherThread)

If I understand these errors correctly, they are saying that the broker's replica fetcher thread for these partitions failed to fetch at its current offset because the leader's log start offset is higher. In other words, the leader no longer has the messages at the requested offset. That makes no sense, as the topic is not configured to delete any messages. I observed these errors 512 times in total across all brokers while executing the application.

From there it seems to cascade to the Streams application:

INFO 2017-01-19 21:24:41,417 [StreamThread-4][Fetcher.java:714] : Fetch offset 1051824 is out of range for partition some_topic-14, resetting offset
ERROR 2017-01-19 21:24:41,421 [StreamThread-4][StreamThread.java:249] : stream-thread [StreamThread-4] Streams application error during processing:
java.lang.NullPointerException
	at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:341)
	at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:197)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1018)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
INFO 2017-01-19 21:24:41,425 [StreamThread-4][StreamThread.java:268] : stream-thread [StreamThread-4] Shutting down

I saw this error 731 times across the workers.
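To put a number on what the WARN lines claim: the gap between the replica's stale fetch offset and the leader's new log start offset is the count of messages that vanished from the head of the leader's log. A throwaway snippet (nothing Kafka-specific, just parsing one of the pasted lines) shows the gaps are tiny:

```shell
# One of the broker WARN lines, pasted verbatim:
line="Replica 1010 for partition some_topic-91 reset its fetch offset from 424762 to current leader 1009's start offset 424779"

# Extract the replica's stale fetch offset and the leader's new start offset.
from=$(echo "$line" | grep -o 'from [0-9]*' | awk '{print $2}')
to=$(echo "$line" | grep -o 'offset [0-9]*$' | awk '{print $2}')

# The difference is how many messages disappeared from the log head.
echo $((to - from))   # 17
```

For the two partitions quoted above the gaps are only 17 and 133 messages, i.e. each event removes a small slice from the start of the log rather than purging it wholesale.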
If we look at just one partition across brokers and workers and group the logs by time, we see this:

worker-3 INFO 2017-01-19 21:*24:41*,806 [StreamThread-3][Fetcher.java:714] : Fetch offset 429851 is out of range for partition some_topic-94, resetting offset
worker-3 INFO 2017-01-19 21:*29:41*,496 [StreamThread-1][Fetcher.java:714] : Fetch offset 1317721 is out of range for partition some_topic-94, resetting offset
worker-1 INFO 2017-01-19 21:*34:41*,977 [StreamThread-2][Fetcher.java:714] : Fetch offset 2014017 is out of range for partition some_topic-94, resetting offset
worker-1 INFO 2017-01-19 21:*39:41*,425 [StreamThread-3][Fetcher.java:714] : Fetch offset 2588834 is out of range for partition some_topic-94, resetting offset
broker-3 [2017-01-19 21:*44:41*,595] WARN [ReplicaFetcherThread-2-1007], Replica 1008 for partition some_topic-94 reset its fetch offset from 3093739 to current leader 1007's start offset 3093742 (kafka.server.ReplicaFetcherThread)
broker-3 [2017-01-19 21:*44:41*,642] ERROR [ReplicaFetcherThread-2-1007], Current offset 3093739 for partition [some_topic,94] out of range; reset offset to 3093742 (kafka.server.ReplicaFetcherThread)
worker-2 INFO 2017-01-19 21:*45:03*,011 [StreamThread-2][Fetcher.java:714] : Fetch offset 3093075 is out of range for partition some_topic-94, resetting offset
broker-3 [2017-01-19 21:*49:41*,344] WARN [ReplicaFetcherThread-2-1007], Replica 1008 for partition some_topic-94 reset its fetch offset from 3417421 to current leader 1007's start offset 3417435 (kafka.server.ReplicaFetcherThread)
broker-2 [2017-01-19 21:*49:41*,346] WARN [ReplicaFetcherThread-2-1007], Replica 1009 for partition some_topic-94 reset its fetch offset from 3417421 to current leader 1007's start offset 3417435 (kafka.server.ReplicaFetcherThread)
worker-2 INFO 2017-01-19 21:*49:41*,393 [StreamThread-2][Fetcher.java:714] : Fetch offset 3416859 is out of range for partition some_topic-94, resetting offset
broker-2 [2017-01-19 21:*49:41*,564] ERROR [ReplicaFetcherThread-2-1007], Current offset 3417421 for partition [some_topic,94] out of range; reset offset to 3417435 (kafka.server.ReplicaFetcherThread)
broker-3 [2017-01-19 21:*49:41*,614] ERROR [ReplicaFetcherThread-2-1007], Current offset 3417421 for partition [some_topic,94] out of range; reset offset to 3417435 (kafka.server.ReplicaFetcherThread)
worker-4 INFO 2017-01-19 21:*55:05*,072 [StreamThread-3][Fetcher.java:714] : Fetch offset 3762043 is out of range for partition some_topic-94, resetting offset

Interestingly, the error repeats roughly every 5 minutes. Furthermore, while the job is running, it is writing data to the repartition topic:

$ ansible tag_aws_autoscaling_groupName_Kafka_Cluster2 -u ec2-user -f 10 -a 'du -sch /data/kafka/logs/some_topic-*/*.log | tail -1' -m shell --become
54.172.57.255 | SUCCESS | rc=0 >>
2.1G	total
54.175.254.148 | SUCCESS | rc=0 >>
2.2G	total
54.174.72.110 | SUCCESS | rc=0 >>
2.5G	total
54.167.181.186 | SUCCESS | rc=0 >>
2.0G	total
52.3.220.225 | SUCCESS | rc=0 >>
2.3G	total

But after I stop the job and wait a while, all the data disappears:

$ ansible tag_aws_autoscaling_groupName_Kafka_Cluster2 -u ec2-user -f 10 -a 'du -sch /data/kafka/logs/some_topic-*/*.log | tail -1' -m shell --become
54.174.72.110 | SUCCESS | rc=0 >>
0	total
54.167.181.186 | SUCCESS | rc=0 >>
0	total
54.175.254.148 | SUCCESS | rc=0 >>
0	total
54.172.57.255 | SUCCESS | rc=0 >>
0	total
52.3.220.225 | SUCCESS | rc=0 >>
0	total

Performing the test with unclean.leader.election.enable set to true or false makes no difference. And while the 5-minute cadence of the errors matches the default value of the *leader.imbalance.check.interval.seconds* setting, increasing it appears to have no effect (the errors still occur at 5-minute intervals). Does anyone have any idea what might occur every five minutes that could result in this error?
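Since the segment files vanish after the job stops, it may be worth double-checking from the CLI that the topic really carries no retention override and seeing which broker defaults apply; one broker-side setting with a 5-minute default that seems worth ruling out is log.retention.check.interval.ms (300000 ms), the interval at which the broker evaluates retention. Hostnames and paths below are placeholders for the actual cluster:

```shell
# Placeholder ZooKeeper host and topic name; lists any per-topic overrides
# (an empty list means only the broker defaults apply):
bin/kafka-configs.sh --zookeeper zk1:2181 --describe \
  --entity-type topics --entity-name some_topic

# Broker-side defaults that govern deletion and how often it is evaluated:
grep -E 'log\.(retention|cleanup|roll)' /etc/kafka/server.properties
```

This is only a diagnostic sketch, not a claim about the root cause; if the override list comes back empty, the broker-wide log.retention.* settings are what decide when those segments get deleted.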