Recovering partition leadership outside ISR
Hi all,

We had an outage last week that I think we could have prevented, and I'd like to get some feedback on an idea.

tl;dr: When a partition leader writes an updated ISR, it should also record its current log-end-offset. On leader election, if there are no live replicas in the ISR, then a replica with this same log-end-offset should be preferred before considering unclean leader election.

Details and use case:

We have a 5-node Kafka 1.0.0 cluster (since upgraded to 1.1.0) with unclean leader election disabled. Well-configured topics have replication factor 3 and min.insync.replicas 2, with producers setting acks=all.

On Monday our cloud provider suffered a hardware failure, causing a partial outage of network connectivity to disk storage. Broker 5's storage was on the orphaned side of the network partition. At the very start of the incident, broker 5 dropped all followers on brokers 1 and 4 out of the ISR for the partitions it was leading. Its connections to brokers 2 and 3 and to Zookeeper stayed up, including to the controller on broker 3. Broker 5 went offline entirely a few moments later, and stayed down with its disk state inaccessible for several hours.

We had configured multiple partitions with broker 5 as their leader and followers on brokers 1 and 4. Before the incident those partitions had ISR {5,1,4}, which shrank to {5} before broker 5 disappeared, leaving us with no eligible replicas to become leader. The only ways to bring these partitions back up were either to recover broker 5's up-to-date disk state, or to enable unclean leader election. (Had we lost one follower, then the other, and then the leader, enabling unclean leader election would have carried a 50% risk of message loss.) In the end, we decided that the lowest-risk option was to enable unclean leader election on the affected topics, force a controller election, watch the partitions come back up, and then disable unclean election again.

I think there's a safer recovery path that Kafka could support: the leader should also record its current log-end-offset when it writes an updated ISR. If the controller determines that it can't promote a replica from the ISR, it should next look for a replica that has that same log-end-offset. Only if that step also fails should it consider unclean leader election. For our failure case, at least, this would have allowed a clean and automatic recovery.

Has this idea been considered before? Does it have fatal flaws?

Thanks,

--
Jack Foy
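P.S. To make the proposal concrete, here is a rough sketch of the election order I have in mind. None of these names exist in the controller code; IsrState, liveReplicas, and electLeader are invented for illustration, and the sketch assumes the leader's log-end-offset (LEO) has been persisted alongside each ISR update:

    // Hypothetical state the leader would persist on every ISR change.
    case class IsrState(isr: Set[Int], leaderLogEndOffset: Long)

    // liveReplicas maps broker id -> that replica's current LEO, live brokers only.
    def electLeader(state: IsrState, liveReplicas: Map[Int, Long]): Option[Int] = {
      // 1. Normal path: prefer any live replica still in the ISR.
      liveReplicas.keys.find(state.isr.contains) orElse
        // 2. Proposed path: a live replica outside the ISR whose LEO matches the
        //    offset recorded by the leader, so no acknowledged write is lost.
        liveReplicas.collectFirst {
          case (brokerId, leo) if leo == state.leaderLogEndOffset => brokerId
        }
      // 3. If both steps fail, only then consider unclean election (not shown).
    }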
Re: Compile failure going from kafka 0.8.1.1 to 0.8.2
On Thu, Oct 30, 2014 at 9:20 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

> I think we should treat this like a bug for 0.8.2 final, we should be able to add two commitOffsets methods with and without the param which should fix the problem, right?

On Oct 30, 2014, at 9:51 AM, Jun Rao <jun...@gmail.com> wrote:

> Yes, we can change this to two methods in 0.8.2 final. Thanks.

After some experimentation, I think there isn't actually a form of this code that can compile under both kafka 0.8.1.1 and 0.8.2-beta.

This form fails to compile against kafka 0.8.1.1:

    connector.commitOffsets()

    [error] src/main/scala/com/whitepages/kafka/consumer/Connector.scala:21: Unit does not take parameters
    [error]     connector.commitOffsets()
    [error]                             ^

This form fails to compile against kafka 0.8.2-beta:

    connector.commitOffsets

    [error] src/main/scala/com/whitepages/kafka/consumer/Connector.scala:21: missing arguments for method commitOffsets in trait ConsumerConnector;
    [error] follow this method with `_' if you want to treat it as a partially applied function
    [error]     connector.commitOffsets
    [error]               ^

--
Jack Foy
j...@whitepages.com
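P.S. One workaround that should build against both versions is to resolve the method reflectively at runtime. This is only a sketch I haven't battle-tested; it assumes 0.8.1.1 exposes a zero-argument commitOffsets and 0.8.2-beta exposes commitOffsets(retryOnFailure: Boolean) with a default of true:

    import kafka.consumer.ConsumerConnector

    // Hypothetical cross-version shim: look up whichever commitOffsets
    // signature the classpath provides and invoke it.
    def commitOffsetsCompat(connector: ConsumerConnector): Unit = {
      val clazz = connector.getClass
      try {
        // kafka 0.8.1.1: parameterless commitOffsets.
        clazz.getMethod("commitOffsets").invoke(connector)
      } catch {
        case _: NoSuchMethodException =>
          // kafka 0.8.2-beta: commitOffsets(retryOnFailure: Boolean); pass true
          // to match the default declared by the library.
          clazz.getMethod("commitOffsets", classOf[Boolean])
            .invoke(connector, java.lang.Boolean.TRUE)
      }
    }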
Compile failure going from kafka 0.8.1.1 to 0.8.2
My Scala project built against kafka 0.8.1.1 commits consumer offsets as follows:

    connector.commitOffsets

This compiles without warnings. When I bumped the library dependency to 0.8.2-beta, the compiler started emitting this error:

    [error] src/main/scala/com/whitepages/kafka/consumer/Connector.scala:21: missing arguments for method commitOffsets in trait ConsumerConnector;
    [error] follow this method with `_' if you want to treat it as a partially applied function
    [error]     connector.commitOffsets
    [error]               ^
    [error] one error found
    [error] (compile:compile) Compilation failed

The following change resolved the error:

    -connector.commitOffsets
    +connector.commitOffsets()

Should we expect compilation-breaking changes moving from 0.8.1.1 to 0.8.2-beta?

--
Jack Foy
j...@whitepages.com
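P.S. Reconstructing from the two compiler errors (I haven't diffed the library sources), the trait appears to have changed from a parameterless method to one with a defaulted parameter, which is source-incompatible even though the call sites look almost identical:

    // kafka 0.8.1.1: declared without a parameter list, so a call with ()
    // tries to apply () to the returned Unit and fails.
    def commitOffsets: Unit

    // kafka 0.8.2-beta: one defaulted parameter, so a bare reference without
    // an argument list is an incomplete application and fails.
    def commitOffsets(retryOnFailure: Boolean = true): Unit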
Re: Compile failure going from kafka 0.8.1.1 to 0.8.2
On Oct 29, 2014, at 1:46 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Which version of Scala did you use for Kafka 0.8.2-beta?

The problem reproduces under both Scala 2.10.4 and Scala 2.11.2.

--
Jack Foy
j...@whitepages.com
Re: Compile failure going from kafka 0.8.1.1 to 0.8.2
On Oct 29, 2014, at 3:03 PM, Jack Foy <j...@whitepages.com> wrote:

> On Oct 29, 2014, at 1:46 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>> Which version of Scala did you use for Kafka 0.8.2-beta?
>
> The problem reproduces under both Scala 2.10.4 and Scala 2.11.2.

Sorry, to be clearer: I am using SBT with the following dependency line, to pull in the specific package for the version of Scala I'm running:

    libraryDependencies ++= Seq(
      "org.apache.kafka" %% "kafka" % "0.8.2-beta"
        exclude("javax.jms", "jms")
        exclude("com.sun.jdmk", "jmxtools")
        exclude("com.sun.jmx", "jmxri")
        exclude("log4j", "log4j")
    )

I tried the following combinations:

    Scala 2.10.4, kafka 0.8.1.1, my old code - compiled
    Scala 2.10.4, kafka 0.8.2,   my old code - failed
    Scala 2.11.2, kafka 0.8.2,   my old code - failed
    Scala 2.10.4, kafka 0.8.2,   my new code - compiled
    Scala 2.11.2, kafka 0.8.2,   my new code - compiled

--
Jack Foy
j...@whitepages.com
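P.S. For anyone reproducing this: the combinations above are easy to exercise from a single checkout with sbt cross-building. A build.sbt sketch (bare-settings syntax, assuming sbt 0.13), after which `sbt +compile` builds against each listed Scala version:

    scalaVersion := "2.10.4"
    crossScalaVersions := Seq("2.10.4", "2.11.2")

    libraryDependencies ++= Seq(
      "org.apache.kafka" %% "kafka" % "0.8.2-beta"
        exclude("javax.jms", "jms")
        exclude("com.sun.jdmk", "jmxtools")
        exclude("com.sun.jmx", "jmxri")
        exclude("log4j", "log4j")
    )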
Re: ConsumerFetcherThread deadlock?
On Oct 23, 2014, at 5:09 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> This can also happen if at least one of the consumer threads (not the fetcher threads) die. You can inspect the thread dump to see if all your consumer threads are alive.

Thanks very much — that's probably what is happening, and we'll investigate further.

--
Jack Foy
j...@whitepages.com
kafka 0.8.1: Producer.send() can block forever when a broker is down
We observe that when a broker is down, Producer.send() can get into a state where it will block forever, even when using the async producer.

When a Producer first sends data, it fetches topic metadata from the broker cluster. To do this, it shuffles the list of hosts in the cluster, then iterates through the list, querying each broker in turn. For each broker in the shuffled list, the Producer creates a SyncProducer and invokes SyncProducer.send(). SyncProducer.send() creates a BlockingChannel and invokes BlockingChannel.connect(). BlockingChannel.connect() retrieves a java.nio.channels.SocketChannel, sets it to blocking mode, and invokes SocketChannel.connect(), passing the current broker's hostname and port. If the first broker in the list is nonresponsive, SocketChannel.connect() will wait forever.

I think the correct change is as follows:

    diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
    index eb7bb14..9bb102a 100644
    --- a/core/src/main/scala/kafka/network/BlockingChannel.scala
    +++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
    @@ -55,7 +55,7 @@ class BlockingChannel( val host: String,
         channel.socket.setSoTimeout(readTimeoutMs)
         channel.socket.setKeepAlive(true)
         channel.socket.setTcpNoDelay(true)
    -    channel.connect(new InetSocketAddress(host, port))
    +    channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
         writeChannel = channel
         readChannel = Channels.newChannel(channel.socket().getInputStream)

Is the next step to create a JIRA with this information? Thanks.

--
Jack Foy
j...@whitepages.com
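P.S. A standalone sketch of the difference in behavior, outside the Kafka code; host, port, and the 5000 ms timeout here are illustrative values:

    import java.net.InetSocketAddress
    import java.nio.channels.SocketChannel

    def connectWithTimeout(host: String, port: Int): SocketChannel = {
      val channel = SocketChannel.open()
      channel.configureBlocking(true)
      // channel.connect(new InetSocketAddress(host, port)) gives no way to bound
      // the wait on a blocking channel. Connecting through the underlying socket
      // bounds it, throwing SocketTimeoutException when the timeout elapses.
      channel.socket.connect(new InetSocketAddress(host, port), 5000)
      channel
    }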