Hello,

I run a Kafka 0.8.2.2 cluster with 3 nodes and recently started to observe
strange behavior on certain topics. The cluster runs in-house, as do most of
the consumers. I have started some consumers in AWS and they _mostly_ work
fine. Occasionally, I end up in a state where, when I run
kafka-consumer-offset-checker, I see the offset of one partition going back
and forth (i.e. it is at 1000, then drops to 900, then jumps to 1100, etc.).
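
For reference, I check the offsets roughly like this (ZooKeeper host and
group name are placeholders):

bin/kafka-consumer-offset-checker.sh --zookeeper <zk-host>:2181 --group <my_group> --topic my_topic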

The Kafka broker that holds this partition has the following log messages:
{"@timestamp":"2016-10-19T21:00:00.134Z","@service":"kafka","thread":"kafka-request-handler-2","logger":"kafka.server.ReplicaManager","@host":"kafka-0","@category":"common","@msg":"[Replica
Manager on Broker 0]: Error when processing fetch request for partition
[my_topic,1] offset 337055698 from consumer with correlation id 0. Possible
cause: Request for offset 337055698 but we only have log segments in the
range 347392118 to 361407455.","@version":"1","@severity":"ERROR"}

{"@timestamp":"2016-10-19T21:00:00.168Z","@service":"kafka","exception":"java.io.IOException:
Broken pipe
at
sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:434)
at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:566)
at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:70)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
at kafka.network.Processor.write(SocketServer.scala:472)
at kafka.network.Processor.run(SocketServer.scala:342)
at
java.lang.Thread.run(Thread.java:745)\n","thread":"kafka-network-thread-6667-0","logger":"kafka.network.Processor","@host":"kafka0.util.pages","@category":"common","@msg":"Closing
socket for /10.10.10.10 because of
error","@version":"1","@severity":"ERROR"}

The IP above is obscured, but it is the IP of the EC2 node that runs the
consumer for that partition.

I have tried to reset the offset for the consumer group on that partition
manually (I wrote a script for that), but I still see it being reset to a
prior point (and back). After a while this behavior seems to go away and the
affected partitions get a chance to catch up, but then the whole thing
repeats.
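
In case it is relevant, the reset script essentially follows the 0.8.2
offset-management API (since offsets.storage is kafka). Below is a
stripped-down sketch of the idea; the broker host, group, partition and
offset are placeholders, the ConsumerMetadataRequest lookup of the offsets
coordinator that the real script does is left out, and the constructor
signatures are from memory of the 0.8.2 javaapi example, so double-check
them against the exact build:

import java.util.{LinkedHashMap => JLinkedHashMap}
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
import kafka.javaapi.{OffsetCommitRequest, OffsetCommitResponse}
import kafka.network.BlockingChannel

object ResetOffset extends App {
  val group = "my_group"                        // placeholder consumer group
  val tp    = TopicAndPartition("my_topic", 1)  // the affected partition

  // Connect to the broker that acts as offsets coordinator for the group.
  val channel = new BlockingChannel("kafka-0", 9092,
    BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, 5000)
  channel.connect()

  // Desired offset for the partition; metadata and timestamp are free-form here.
  val offsets = new JLinkedHashMap[TopicAndPartition, OffsetAndMetadata]()
  offsets.put(tp, OffsetAndMetadata(347392118L, "manual reset", System.currentTimeMillis()))

  // versionId 1 commits to Kafka-backed offset storage; 0 would write to ZooKeeper.
  val request = new OffsetCommitRequest(group, offsets, 1, group, 1.toShort)
  channel.send(request.underlying)
  val response = OffsetCommitResponse.readFrom(channel.receive().buffer)
  println("commit had errors: " + response.hasError)
  channel.disconnect()
}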

My consumer configuration is:

"socket.timeout.ms": "60000",
"zookeeper.session.timeout.ms": "60000",
"offsets.channel.socket.timeout.ms": "30000"
"auto.offset.reset": "smallest"
"offsets.storage": "kafka"
"consumer.timeout.ms": "1500"

I use the reactive-kafka wrapper; other places where it is used do not have
these problems.
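
Stripped of the wrapper, the configuration amounts to roughly the plain
0.8.2 high-level consumer setup below (zookeeper.connect and group.id are
placeholders; this sketch just shows how the properties are applied, it is
not the actual reactive-kafka wiring):

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}

object ConsumerSketch extends App {
  val props = new Properties()
  props.put("zookeeper.connect", "zk-0:2181")   // placeholder
  props.put("group.id", "my_group")             // placeholder
  props.put("socket.timeout.ms", "60000")
  props.put("zookeeper.session.timeout.ms", "60000")
  props.put("offsets.channel.socket.timeout.ms", "30000")
  props.put("auto.offset.reset", "smallest")    // go to earliest retained offset when out of range
  props.put("offsets.storage", "kafka")
  props.put("consumer.timeout.ms", "1500")

  val connector = Consumer.create(new ConsumerConfig(props))
  // One stream for the topic; iterating it throws ConsumerTimeoutException
  // after consumer.timeout.ms with no new messages.
  val stream = connector.createMessageStreams(Map("my_topic" -> 1))("my_topic").head
  try {
    for (msg <- stream) println(s"partition=${msg.partition} offset=${msg.offset}")
  } catch {
    case _: ConsumerTimeoutException => // no messages within 1500 ms
  } finally {
    connector.shutdown()
  }
}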

Please advise what this could be.

Thanks,
Timur
