[ https://issues.apache.org/jira/browse/KAFKA-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Victor Garcia updated KAFKA-4940:
---------------------------------

    Description: 

A cluster can partially work if there is an IO issue that blocks the broker process and leaves it in a state of uninterruptible sleep. All the threads connected to this bad broker will hang, and the cluster ends up only partially working.

I reproduced it, and this is what happened:

Say we have brokers 1, 2 and 3, and broker 2 is IO-blocked and unresponsive; it cannot even be killed except with -9. Say we have a topic with replication factor 3. The partitions whose leader is 1 or 3 will see that broker 2 has issues and will remove it from the ISR. That's fine. But the partitions whose leader is 2 think the problematic brokers are 1 and 3, and will remove those replicas from the ISR. And this is the problem: consumers and producers will only work with the partitions that do not have broker 2 in their ISR.

This is example output for two topics after provoking this:

{code}
./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --unavailable-partitions
Topic: agent_ping   Partition: 0    Leader: 2   Replicas: 2,1,3   Isr: 2
Topic: agent_ping   Partition: 1    Leader: 3   Replicas: 3,2,1   Isr: 1,3
Topic: agent_ping   Partition: 2    Leader: 1   Replicas: 1,3,2   Isr: 3,1
Topic: agent_ping   Partition: 3    Leader: 2   Replicas: 2,3,1   Isr: 2
Topic: agent_ping   Partition: 4    Leader: 3   Replicas: 3,1,2   Isr: 1,3
Topic: agent_ping   Partition: 5    Leader: 1   Replicas: 1,2,3   Isr: 3,1
Topic: agent_ping   Partition: 6    Leader: 2   Replicas: 2,1,3   Isr: 2
Topic: agent_ping   Partition: 9    Leader: 2   Replicas: 2,3,1   Isr: 2
Topic: agent_ping   Partition: 12   Leader: 2   Replicas: 2,1,3   Isr: 2
Topic: agent_ping   Partition: 13   Leader: 3   Replicas: 3,2,1   Isr: 1,3
Topic: agent_ping   Partition: 14   Leader: 1   Replicas: 1,3,2   Isr: 3,1
Topic: agent_ping   Partition: 15   Leader: 2   Replicas: 2,3,1   Isr: 2
Topic: agent_ping   Partition: 16   Leader: 3   Replicas: 3,1,2   Isr: 1,3
Topic: agent_ping   Partition: 17   Leader: 1   Replicas: 1,2,3   Isr: 3,1
Topic: agent_ping   Partition: 18   Leader: 2   Replicas: 2,1,3   Isr: 2
Topic: imback       Partition: 0    Leader: 3   Replicas: 3,1,2   Isr: 1,3
Topic: imback       Partition: 1    Leader: 1   Replicas: 1,2,3   Isr: 3,1
Topic: imback       Partition: 2    Leader: 2   Replicas: 2,3,1   Isr: 2
Topic: imback       Partition: 3    Leader: 3   Replicas: 3,2,1   Isr: 1,3
Topic: imback       Partition: 4    Leader: 1   Replicas: 1,3,2   Isr: 3,1
{code}
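For reference, the asymmetric view is easy to spot by summarizing the ISR per leader. This is only an illustrative sketch; it assumes nothing beyond the Leader:/Isr: fields of the --describe output above plus standard awk/sort/uniq:

{code}
./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --unavailable-partitions \
  | awk '/Leader:/ && /Isr:/ {
      for (i = 1; i <= NF; i++) {
        if ($i == "Leader:") leader = $(i + 1)   # field following "Leader:"
        if ($i == "Isr:")    isr    = $(i + 1)   # field following "Isr:"
      }
      print "leader=" leader, "isr=" isr
    }' \
  | sort | uniq -c
# With the listing above this prints:
#   6 leader=1 isr=3,1
#   8 leader=2 isr=2
#   6 leader=3 isr=1,3
{code}

Every partition led by broker 2 reports an ISR of just 2, while every partition led by 1 or 3 reports an ISR without 2: the two halves of the cluster disagree about which brokers are healthy.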
Kafka should handle this better: figure out which brokers are actually problematic and remove their replicas from the ISR accordingly. IO problems can be caused by hardware faults, kernel misconfiguration or other issues, and are not that infrequent. Kafka is meant to be highly available, but in this case it is not.

To reproduce this: blocking a process on IO directly is not easy, but the same symptoms can be reproduced easily using NFS. Set up a simple NFS server (https://help.ubuntu.com/community/SettingUpNFSHowTo), mount an NFS partition as the broker's log.dirs, and once the cluster is working, stop NFS on the server (service nfs-kernel-server stop). This will make the broker hang waiting for IO.
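A rough sketch of those steps follows. The export path, mount point and host names (nfs-server, broker2) are placeholders rather than anything from the original setup, and the commands assume Ubuntu's nfs-kernel-server as in the linked how-to:

{code}
# On the NFS server (export path and client host are placeholders):
sudo mkdir -p /srv/kafka-logs
echo "/srv/kafka-logs broker2(rw,sync,no_subtree_check)" | sudo tee -a /etc/exports
sudo service nfs-kernel-server restart

# On the broker host: mount the export (hard mounting is the NFS default,
# which is what makes IO block instead of fail) and point log.dirs at it.
sudo mkdir -p /mnt/kafka-logs
sudo mount -t nfs nfs-server:/srv/kafka-logs /mnt/kafka-logs
# server.properties: log.dirs=/mnt/kafka-logs   (then start the broker)

# Once the cluster is healthy and has traffic, cut IO on the NFS server:
sudo service nfs-kernel-server stop

# Back on the broker host, the broker should now be stuck in
# uninterruptible sleep (process state "D"):
ps -eo state,pid,cmd | grep '^D.*kafka'
{code}

From that point on, any broker thread that touches the log directory blocks, which produces the partial ISR state shown above.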
This is the output of a producer; some messages go through but others fail with this error:

{code}
./kafka-console-producer.sh --topic imback --broker broker1:6667,broker2:6667,broker3:6667
text
text
text
[2017-03-22 18:18:42,864] WARN Got error produce response with correlation id 44 on topic-partition imback-2, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2017-03-22 18:18:44,467] WARN Got error produce response with correlation id 46 on topic-partition imback-2, retrying (1 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2017-03-22 18:18:46,075] WARN Got error produce response with correlation id 48 on topic-partition imback-2, retrying (0 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2017-03-22 18:18:47,677] ERROR Error when sending message to topic imback with key: null, value: 1 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
text
text
text
text
text
[2017-03-22 18:20:31,002] WARN Got error produce response with correlation id 55 on topic-partition imback-2, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2017-03-22 18:20:32,605] WARN Got error produce response with correlation id 57 on topic-partition imback-2, retrying (1 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2017-03-22 18:20:34,212] WARN Got error produce response with correlation id 59 on topic-partition imback-2, retrying (0 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2017-03-22 18:20:35,815] ERROR Error when sending message to topic imback with key: null, value: 1 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
{code}

And a consumer attached to this topic outputs this:

{code}
./kafka-console-consumer.sh --topic imback --zookeeper localhost:2181
text
text
text
[2017-03-22 18:16:44,594] WARN [console-consumer-77661_kafkachaos3-1490231743517-c0973e49-leader-finder-thread], Failed to add leader for partitions imback-2,imback-4,imback-1,imback-3,imback-0; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
java.net.SocketTimeoutException
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
	at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
	at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
	at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99)
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
	at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149)
	at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:188)
	at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:85)
	at kafka.server.AbstractFetcherThread$$anonfun$6.apply(AbstractFetcherThread.scala:210)
	at kafka.server.AbstractFetcherThread$$anonfun$6.apply(AbstractFetcherThread.scala:208)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:208)
	at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:88)
	at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:78)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
	at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
	at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:78)
	at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:94)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
text
{code}

> Cluster partially working if broker blocked with IO
> ---------------------------------------------------
>
>                 Key: KAFKA-4940
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4940
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.1.1, 0.10.2.0
>            Reporter: Victor Garcia

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)