In the process of testing a Kafka Streams application I've come across a
few issues that are baffling me.

For testing I am executing a job on 20 nodes with 4 cores per node, each
instance configured with 4 stream threads, against a 5-node broker cluster
running 0.10.1.1.

Before execution, kafka-streams-application-reset.sh is run to reset the
offsets of the input topics to zero.  The app calls KafkaStreams.cleanUp()
on startup to clean up the local state stores.  All workers are started
simultaneously.  All topics have 100 partitions.  The main input topic has
1 TB of data, replicated 3x.  min.insync.replicas is set to 3.
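
For context, the startup sequence is essentially the following (a minimal
sketch; the application id, bootstrap servers, and builder contents are
placeholders, not our real values):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");  // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);             // 4 threads per instance

    KStreamBuilder builder = new KStreamBuilder();
    // ... topology definition (sketched below) ...

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.cleanUp();  // wipe local state stores before reprocessing from offset zero
    streams.start();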

The application consumes from the main input topic, transforms the input,
and repartitions it using KStream.through() to write to another topic.  It
reads from the repartitioned topic and continues processing.
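
In code, the topology looks roughly like this, continuing the sketch above
(topic names are placeholders, and extractKey() / process() stand in for
our actual transformation and downstream logic):

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.KStream;

    KStream<String, String> input = builder.stream("main-input-topic");  // placeholder name
    input.map((key, value) -> new KeyValue<>(extractKey(value), value))  // transform + re-key
         .through("repartition-topic")  // writes out, then reads the topic back
         .foreach((key, value) -> process(key, value));                  // continued processing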

On the brokers we are seeing errors such as:

[2017-01-19 21:24:41,298] WARN [ReplicaFetcherThread-3-1009], Replica 1010
for partition some_topic-91 reset its fetch offset from 424762 to current
leader 1009's start offset 424779 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,350] WARN [ReplicaFetcherThread-2-1009], Replica 1010
for partition some_topic-66 reset its fetch offset from 401243 to current
leader 1009's start offset 401376 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,381] ERROR [ReplicaFetcherThread-3-1009], Current
offset 424762 for partition [some_topic,91] out of range; reset offset to
424779 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,399] WARN [ReplicaFetcherThread-3-1009], Replica 1010
for partition some_topic-71 reset its fetch offset from 456158 to current
leader 1009's start offset 456189 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,400] WARN [ReplicaFetcherThread-0-1007], Replica 1010
for partition some_topic-84 reset its fetch offset from 399325 to current
leader 1007's start offset 399327 (kafka.server.ReplicaFetcherThread)
[2017-01-19 21:24:41,446] ERROR [ReplicaFetcherThread-2-1009], Current
offset 401243 for partition [some_topic,66] out of range; reset offset to
401376 (kafka.server.ReplicaFetcherThread)

If I understand these errors correctly, they are saying that the broker's
replica fetcher thread for these partitions failed to fetch at its current
offset because the leader's log start offset is higher.  In other words,
the leader no longer has the messages at the requested offset: for
some_topic-91, for example, the follower asked for offset 424762 but the
leader's log now starts at 424779, meaning those intervening messages have
already been removed.  That makes no sense, as the topic is not configured
to delete any messages.  I observed these errors 512 times in total across
all brokers while executing the application.
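
One way to verify that the leader's log start offset really is advancing
(i.e., that old segments are being deleted) is to poll it with a plain
consumer.  A minimal sketch, assuming a reachable broker (the address and
partition below are placeholders):

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");  // placeholder
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        TopicPartition tp = new TopicPartition("some_topic", 91);
        // beginningOffsets() (available since 0.10.1) returns the current log
        // start offset; if it keeps climbing, the broker is deleting segments.
        Map<TopicPartition, Long> start =
                consumer.beginningOffsets(Collections.singletonList(tp));
        System.out.println(tp + " log start offset: " + start.get(tp));
    }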

From there it seems to cascade to the Streams application:

INFO  2017-01-19 21:24:41,417 [StreamThread-4][Fetcher.java:714] : Fetch
offset 1051824 is out of range for partition some_topic-14, resetting offset
ERROR 2017-01-19 21:24:41,421 [StreamThread-4][StreamThread.java:249] :
stream-thread [StreamThread-4] Streams application error during processing:
java.lang.NullPointerException
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:341)
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:197)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1018)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
INFO  2017-01-19 21:24:41,425 [StreamThread-4][StreamThread.java:268] :
stream-thread [StreamThread-4] Shutting down

I saw this error 731 times across the workers.

If we look at just one partition across brokers and workers and group the
logs by time, we see this:

worker-3 INFO  2017-01-19 21:*24:41*,806 [StreamThread-3][Fetcher.java:714]
: Fetch offset 429851 is out of range for partition some_topic-94,
resetting offset

worker-3 INFO  2017-01-19 21:*29:41*,496 [StreamThread-1][Fetcher.java:714]
: Fetch offset 1317721 is out of range for partition some_topic-94,
resetting offset

worker-1 INFO  2017-01-19 21:*34:41*,977 [StreamThread-2][Fetcher.java:714]
: Fetch offset 2014017 is out of range for partition some_topic-94,
resetting offset

worker-1 INFO  2017-01-19 21:*39:41*,425 [StreamThread-3][Fetcher.java:714]
: Fetch offset 2588834 is out of range for partition some_topic-94,
resetting offset

broker-3 [2017-01-19 21:*44:41*,595] WARN [ReplicaFetcherThread-2-1007],
Replica 1008 for partition some_topic-94 reset its fetch offset from
3093739 to current leader 1007's start offset 3093742
(kafka.server.ReplicaFetcherThread)
broker-3 [2017-01-19 21:*44:41*,642] ERROR [ReplicaFetcherThread-2-1007],
Current offset 3093739 for partition [some_topic,94] out of range; reset
offset to 3093742 (kafka.server.ReplicaFetcherThread)
worker-2 INFO  2017-01-19 21:*45:03*,011 [StreamThread-2][Fetcher.java:714]
: Fetch offset 3093075 is out of range for partition some_topic-94,
resetting offset

broker-3 [2017-01-19 21:*49:41*,344] WARN [ReplicaFetcherThread-2-1007],
Replica 1008 for partition some_topic-94 reset its fetch offset from
3417421 to current leader 1007's start offset 3417435
(kafka.server.ReplicaFetcherThread)
broker-2 [2017-01-19 21:*49:41*,346] WARN [ReplicaFetcherThread-2-1007],
Replica 1009 for partition some_topic-94 reset its fetch offset from
3417421 to current leader 1007's start offset 3417435
(kafka.server.ReplicaFetcherThread)
worker-2 INFO  2017-01-19 21:*49:41*,393 [StreamThread-2][Fetcher.java:714]
: Fetch offset 3416859 is out of range for partition some_topic-94,
resetting offset
broker-2 [2017-01-19 21:*49:41*,564] ERROR [ReplicaFetcherThread-2-1007],
Current offset 3417421 for partition [some_topic,94] out of range; reset
offset to 3417435 (kafka.server.ReplicaFetcherThread)
broker-3 [2017-01-19 21:*49:41*,614] ERROR [ReplicaFetcherThread-2-1007],
Current offset 3417421 for partition [some_topic,94] out of range; reset
offset to 3417435 (kafka.server.ReplicaFetcherThread)

worker-4 INFO  2017-01-19 21:*55:05*,072 [StreamThread-3][Fetcher.java:714]
: Fetch offset 3762043 is out of range for partition some_topic-94,
resetting offset


Interestingly, the error repeats roughly every 5 minutes.

Furthermore, while the job is running, it is writing data to the
repartition topic:

$ ansible tag_aws_autoscaling_groupName_Kafka_Cluster2 -u ec2-user -f 10 -a
'du -sch /data/kafka/logs/some_topic-*/*.log | tail -1' -m shell --become
54.172.57.255 | SUCCESS | rc=0 >>
2.1G total

54.175.254.148 | SUCCESS | rc=0 >>
2.2G total

54.174.72.110 | SUCCESS | rc=0 >>
2.5G total

54.167.181.186 | SUCCESS | rc=0 >>
2.0G total

52.3.220.225 | SUCCESS | rc=0 >>
2.3G total

But after I stop the job and wait a while, all the data disappears:

$ ansible tag_aws_autoscaling_groupName_Kafka_Cluster2 -u ec2-user -f 10 -a
'du -sch /data/kafka/logs/some_topic-*/*.log | tail -1' -m shell --become
54.174.72.110 | SUCCESS | rc=0 >>
0 total

54.167.181.186 | SUCCESS | rc=0 >>
0 total

54.175.254.148 | SUCCESS | rc=0 >>
0 total

54.172.57.255 | SUCCESS | rc=0 >>
0 total

52.3.220.225 | SUCCESS | rc=0 >>
0 total


Performing the test with unclean.leader.election.enable set to true or
false makes no difference.

While the 5-minute cadence of errors matches the default value of the
*leader.imbalance.check.interval.seconds* setting (300 seconds), increasing
it appears to have no effect (errors still occur at 5-minute intervals).

Does anyone have any idea what might happen every five minutes that could
result in this error?
