Hello, I run a Kafka 0.8.2.2 cluster with 3 nodes and recently started observing strange behavior on certain topics. The cluster runs in-house, as do most of the consumers. I have started some consumers in AWS and they _mostly_ work fine. Occasionally, however, I end up in a state where, when I run kafka-consumer-offset-checker, I see the offset of one partition going back and forth (e.g. it was 1000, then drops to 900, then jumps to 1100, etc.).
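For concreteness, this is roughly how I check the offsets (the ZooKeeper host, group, and topic names here are placeholders for the real ones):

```shell
# Print current offset, log-end offset, and lag per partition
# for the consumer group. Host/group/topic are placeholders.
kafka-consumer-offset-checker.sh \
    --zookeeper zk1:2181 \
    --group my_group \
    --topic my_topic
```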
The Kafka broker holding this partition has the following log messages:

{"@timestamp":"2016-10-19T21:00:00.134Z","@service":"kafka","thread":"kafka-request-handler-2","logger":"kafka.server.ReplicaManager","@host":"kafka-0","@category":"common","@msg":"[Replica Manager on Broker 0]: Error when processing fetch request for partition [my_topic,1] offset 337055698 from consumer with correlation id 0. Possible cause: Request for offset 337055698 but we only have log segments in the range 347392118 to 361407455.","@version":"1","@severity":"ERROR"}

{"@timestamp":"2016-10-19T21:00:00.168Z","@service":"kafka","exception":"java.io.IOException: Broken pipe
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:434)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:566)
    at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
    at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:70)
    at kafka.network.MultiSend.writeTo(Transmission.scala:101)
    at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125)
    at kafka.network.MultiSend.writeTo(Transmission.scala:101)
    at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
    at kafka.network.Processor.write(SocketServer.scala:472)
    at kafka.network.Processor.run(SocketServer.scala:342)
    at java.lang.Thread.run(Thread.java:745)
","thread":"kafka-network-thread-6667-0","logger":"kafka.network.Processor","@host":"kafka0.util.pages","@category":"common","@msg":"Closing socket for /10.10.10.10 because of error","@version":"1","@severity":"ERROR"}

The IP above is obscured, but it is the IP of the EC2 node that runs the consumer for that partition. I have tried resetting the offset for the consumer group on that partition manually (I wrote a script for that), but I still see it being reset to a prior point (and back). After a while this behavior seems to go away and the affected partitions get a chance to catch up, but then the whole thing repeats.
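For reference, when a partition is in this state I confirm the broker-side valid offset range with GetOffsetShell (the broker host and topic name below are placeholders); the fetch error above suggests the consumer is asking for an offset below the earliest one the broker still retains:

```shell
# Query the earliest (--time -2) and latest (--time -1) available
# offsets per partition. Broker host and topic are placeholders.
kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list kafka-0:9092 \
    --topic my_topic \
    --time -2

kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list kafka-0:9092 \
    --topic my_topic \
    --time -1
```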
My consumer configuration is:

    socket.timeout.ms = 60000
    zookeeper.session.timeout.ms = 60000
    offsets.channel.socket.timeout.ms = 30000
    auto.offset.reset = smallest
    offsets.storage = kafka
    consumer.timeout.ms = 1500

I use the reactive-kafka wrapper; other places where it is used do not have these problems. Please advise on what this could be. Thanks, Timur