[ https://issues.apache.org/jira/browse/KAFKA-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Victor Garcia updated KAFKA-4940:
---------------------------------
    Description: 
A cluster can end up only partially working if an IO issue blocks a broker 
process and leaves it in a state of uninterruptible sleep.

All the threads connected to this bad broker will hang and the cluster ends up 
partially working.
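
As a side note, one way to confirm this condition (a minimal sketch of my own, assuming a Linux host; not part of the original report) is to check whether the broker's JVM process is stuck in uninterruptible sleep (state D):

{code}
# List processes stuck in uninterruptible sleep (STAT starting with "D").
# WCHAN hints at the kernel routine they are blocked in (often an NFS/IO wait).
ps -eo pid,stat,wchan:32,comm | awk '$2 ~ /^D/'
{code}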

I reproduced it and this is what happened:

Let's say we have brokers 1, 2 and 3, and broker 2 is IO-blocked and 
unresponsive; you can't even kill it except with kill -9.
Let's say we have a topic with replication factor 3. The partitions with leader 
1 or 3 will see that broker 2 has issues and will take it out of the ISR. That's fine.

But for the partitions whose leader is 2, broker 2 thinks the problematic 
brokers are 1 and 3 and takes those replicas out of the ISR. And this is a problem.

Consumers and producers will only work with the partitions that don't have 
broker 2 in their ISR.

This is an example of the output for 2 topics after provoking this:

{code}
./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --unavailable-partitions

        Topic: agent_ping       Partition: 0    Leader: 2       Replicas: 2,1,3 Isr: 2
        Topic: agent_ping       Partition: 1    Leader: 3       Replicas: 3,2,1 Isr: 1,3
        Topic: agent_ping       Partition: 2    Leader: 1       Replicas: 1,3,2 Isr: 3,1
        Topic: agent_ping       Partition: 3    Leader: 2       Replicas: 2,3,1 Isr: 2
        Topic: agent_ping       Partition: 4    Leader: 3       Replicas: 3,1,2 Isr: 1,3
        Topic: agent_ping       Partition: 5    Leader: 1       Replicas: 1,2,3 Isr: 3,1
        Topic: agent_ping       Partition: 6    Leader: 2       Replicas: 2,1,3 Isr: 2
        Topic: agent_ping       Partition: 9    Leader: 2       Replicas: 2,3,1 Isr: 2
        Topic: agent_ping       Partition: 12   Leader: 2       Replicas: 2,1,3 Isr: 2
        Topic: agent_ping       Partition: 13   Leader: 3       Replicas: 3,2,1 Isr: 1,3
        Topic: agent_ping       Partition: 14   Leader: 1       Replicas: 1,3,2 Isr: 3,1
        Topic: agent_ping       Partition: 15   Leader: 2       Replicas: 2,3,1 Isr: 2
        Topic: agent_ping       Partition: 16   Leader: 3       Replicas: 3,1,2 Isr: 1,3
        Topic: agent_ping       Partition: 17   Leader: 1       Replicas: 1,2,3 Isr: 3,1
        Topic: agent_ping       Partition: 18   Leader: 2       Replicas: 2,1,3 Isr: 2
        Topic: imback   Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 1,3
        Topic: imback   Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 3,1
        Topic: imback   Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 2
        Topic: imback   Partition: 3    Leader: 3       Replicas: 3,2,1 Isr: 1,3
        Topic: imback   Partition: 4    Leader: 1       Replicas: 1,3,2 Isr: 3,1
{code}
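
For completeness, a related check (my own suggestion, not part of the original repro) is to list under-replicated partitions, which also surfaces the shrunken ISRs:

{code}
# Partitions whose ISR is smaller than the replica set; with broker 2 blocked,
# this includes both the partitions it leads and the ones that dropped it from the ISR.
./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --under-replicated-partitions
{code}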

Kafka should be able to handle this better, identify the problematic broker, 
and remove its replicas from the ISR accordingly.
IO problems can be caused by hardware issues, kernel misconfiguration or other 
factors, and are not that infrequent.

Kafka is meant to be highly available, but in this case it is not.

To reproduce this, generating IO that blocks a process is not easy, but the same 
symptoms can easily be reproduced using NFS.
Create a simple NFS server 
(https://help.ubuntu.com/community/SettingUpNFSHowTo), mount an NFS partition as 
the broker's log.dirs, and once the cluster is working, stop NFS on the server 
(service nfs-kernel-server stop).

This will make the broker hang waiting for IO.
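
Roughly scripted, the repro looks like this (a sketch with placeholder paths and hostnames, assuming a stock Ubuntu NFS setup as in the guide above):

{code}
# On the NFS server (export path is a placeholder):
sudo apt-get install nfs-kernel-server
echo "/srv/kafka-logs *(rw,sync,no_subtree_check)" | sudo tee -a /etc/exports
sudo exportfs -ra
sudo service nfs-kernel-server start

# On the broker host: mount the export and point log.dirs at it (placeholder mount point)
sudo mkdir -p /data/kafka-logs
sudo mount -t nfs nfs-server:/srv/kafka-logs /data/kafka-logs
# server.properties: log.dirs=/data/kafka-logs

# Once the cluster is healthy, block broker IO by stopping NFS on the server:
sudo service nfs-kernel-server stop
# Broker threads writing to /data/kafka-logs now hang in uninterruptible sleep.
{code}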



This is the output of a producer; some messages go through but others fail with 
this error:

{code}
./kafka-console-producer.sh --topic imback --broker-list broker1:6667,broker2:6667,broker3:6667

text
text
text
[2017-03-22 18:18:42,864] WARN Got error produce response with correlation id 44 on topic-partition imback-2, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2017-03-22 18:18:44,467] WARN Got error produce response with correlation id 46 on topic-partition imback-2, retrying (1 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2017-03-22 18:18:46,075] WARN Got error produce response with correlation id 48 on topic-partition imback-2, retrying (0 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2017-03-22 18:18:47,677] ERROR Error when sending message to topic imback with key: null, value: 1 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
text
text
text
text
text
[2017-03-22 18:20:31,002] WARN Got error produce response with correlation id 55 on topic-partition imback-2, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2017-03-22 18:20:32,605] WARN Got error produce response with correlation id 57 on topic-partition imback-2, retrying (1 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2017-03-22 18:20:34,212] WARN Got error produce response with correlation id 59 on topic-partition imback-2, retrying (0 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2017-03-22 18:20:35,815] ERROR Error when sending message to topic imback with key: null, value: 1 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

{code}

And a consumer attached to this topic outputs this:

{code}
./kafka-console-consumer.sh --topic imback --zookeeper localhost:2181

text
text
text
[2017-03-22 18:16:44,594] WARN [console-consumer-77661_kafkachaos3-1490231743517-c0973e49-leader-finder-thread], Failed to add leader for partitions imback-2,imback-4,imback-1,imback-3,imback-0; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
java.net.SocketTimeoutException
        at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
        at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
        at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149)
        at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:188)
        at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:85)
        at kafka.server.AbstractFetcherThread$$anonfun$6.apply(AbstractFetcherThread.scala:210)
        at kafka.server.AbstractFetcherThread$$anonfun$6.apply(AbstractFetcherThread.scala:208)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:208)
        at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:88)
        at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:78)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:78)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:94)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
text
{code}


> Cluster partially working if broker blocked with IO
> ---------------------------------------------------
>
>                 Key: KAFKA-4940
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4940
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.1.1, 0.10.2.0
>            Reporter: Victor Garcia


