KAFKA-955 After a leader change, messages sent with ack=0 are lost; reviewed by Jay Kreps, Neha Narkhede and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f89ddced Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f89ddced Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f89ddced Branch: refs/heads/trunk Commit: f89ddced1ba058f0c51697957cde8bb2e2b05c4d Parents: ea54700 Author: Guozhang Wang <[email protected]> Authored: Wed Aug 28 10:16:50 2013 -0700 Committer: Neha Narkhede <[email protected]> Committed: Wed Aug 28 10:16:59 2013 -0700 ---------------------------------------------------------------------- .../scala/kafka/network/RequestChannel.scala | 24 +++++++++++++- .../main/scala/kafka/network/SocketServer.scala | 34 ++++++++++++-------- .../src/main/scala/kafka/server/KafkaApis.scala | 17 +++++++--- .../unit/kafka/producer/ProducerTest.scala | 4 ++- .../unit/kafka/producer/SyncProducerTest.scala | 27 ++++++++++++++++ 5 files changed, 86 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f89ddced/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 1437496..77d7ec0 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -84,12 +84,20 @@ object RequestChannel extends Logging { } } - case class Response(processor: Int, request: Request, responseSend: Send) { + case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) { request.responseCompleteTimeMs = SystemTime.milliseconds + def this(processor: Int, request: Request, responseSend: Send) = + this(processor, request, responseSend, if (responseSend == null) NoOpAction else SendAction) + def this(request: Request, send: Send) = this(request.processor, request, send) } + + trait ResponseAction + case object SendAction extends ResponseAction + case object NoOpAction extends ResponseAction + case object CloseConnectionAction extends ResponseAction } class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup { @@ -127,6 +135,20 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe onResponse(response.processor) } + /** No operation to take for the request, need to read more over the network */ + def noOperation(processor: Int, request: RequestChannel.Request) { + responseQueues(processor).put(new RequestChannel.Response(processor, request, null, RequestChannel.NoOpAction)) + for(onResponse <- responseListeners) + onResponse(processor) + } + + /** Close the connection for the request */ + def closeConnection(processor: Int, request: RequestChannel.Request) { + responseQueues(processor).put(new RequestChannel.Response(processor, request, null, RequestChannel.CloseConnectionAction)) + for(onResponse <- responseListeners) + onResponse(processor) + } + /** Get the next request or block until there is one */ def receiveRequest(): RequestChannel.Request = requestQueue.take() http://git-wip-us.apache.org/repos/asf/kafka/blob/f89ddced/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index d5bd143..216245d 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -274,25 +274,33 @@ private[kafka] class Processor(val id: Int, while(curr != null) { val key = curr.request.requestKey.asInstanceOf[SelectionKey] try { - if(curr.responseSend == null) { - // a null response send object indicates that there is no response to send to the client. - // In this case, we just want to turn the interest ops to READ to be able to read more pipelined requests - // that are sitting in the server's socket buffer - trace("Socket server received empty response to send, registering for read: " + curr) - key.interestOps(SelectionKey.OP_READ) - key.attach(null) - curr.request.updateRequestMetrics - } else { - trace("Socket server received response to send, registering for write: " + curr) - key.interestOps(SelectionKey.OP_WRITE) - key.attach(curr) + curr.responseAction match { + case RequestChannel.NoOpAction => { + // There is no response to send to the client, we need to read more pipelined requests + // that are sitting in the server's socket buffer + curr.request.updateRequestMetrics + trace("Socket server received empty response to send, registering for read: " + curr) + key.interestOps(SelectionKey.OP_READ) + key.attach(null) + } + case RequestChannel.SendAction => { + trace("Socket server received response to send, registering for write: " + curr) + key.interestOps(SelectionKey.OP_WRITE) + key.attach(curr) + } + case RequestChannel.CloseConnectionAction => { + curr.request.updateRequestMetrics + trace("Closing socket connection actively according to the response code.") + close(key) + } + case responseCode => throw new KafkaException("No mapping found for response code " + responseCode) } } catch { case e: CancelledKeyException => { debug("Ignoring response for closed socket.") close(key) } - }finally { + } finally { curr = requestChannel.receiveResponse(id) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f89ddced/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b17964e..cd02aab 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -51,8 +51,7 @@ class KafkaApis(val requestChannel: RequestChannel, * and is queried by the topic metadata request. */ var leaderCache: mutable.Map[TopicAndPartition, PartitionStateInfo] = new mutable.HashMap[TopicAndPartition, PartitionStateInfo]() -// private var allBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]() - private var aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]() + private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]() private val partitionMetadataLock = new Object this.logIdent = "[KafkaApi-%d] ".format(brokerId) @@ -170,9 +169,17 @@ class KafkaApis(val requestChannel: RequestChannel, !produceRequest.data.keySet.exists( m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1) if(produceRequest.requiredAcks == 0) { - // send a fake producer response if producer request.required.acks = 0. This mimics the behavior of a 0.7 producer - // and is tuned for very high throughput - requestChannel.sendResponse(new RequestChannel.Response(request.processor, request, null)) + // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since + // no response is expected by the producer the handler will send a close connection response to the socket server + // to close the socket so that the producer client will know that some exception has happened and will refresh its metadata + if (numPartitionsInError != 0) { + info(("Send the close connection response due to error handling produce request " + + "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0") + .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(","))) + requestChannel.closeConnection(request.processor, request) + } else { + requestChannel.noOperation(request.processor, request) + } } else if (produceRequest.requiredAcks == 1 || produceRequest.numPartitions <= 0 || allPartitionHaveReplicationFactorOne || http://git-wip-us.apache.org/repos/asf/kafka/blob/f89ddced/core/src/test/scala/unit/kafka/producer/ProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index b511d90..29331db 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -17,6 +17,7 @@ package kafka.producer +import org.scalatest.TestFailedException import org.scalatest.junit.JUnit3Suite import kafka.consumer.SimpleConsumer import kafka.message.Message @@ -236,7 +237,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ producer.send(new KeyedMessage[String, String](topic, "test", "test1")) fail("Should fail since no leader exists for the partition.") } catch { - case e => // success + case e : TestFailedException => throw e // catch and re-throw the failure message + case e2 => // otherwise success } // restart server 1 http://git-wip-us.apache.org/repos/asf/kafka/blob/f89ddced/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index b5ee31d..b3e89c3 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -114,6 +114,33 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { } @Test + def testMessageSizeTooLargeWithAckZero() { + val server = servers.head + + val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + props.put("request.required.acks", "0") + + val producer = new SyncProducer(new SyncProducerConfig(props)) + CreateTopicCommand.createTopic(zkClient, "test", 1, 1) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500) + + // This message will be dropped silently since message size too large. + producer.send(TestUtils.produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0)) + + // Send another message whose size is large enough to exceed the buffer size so + // the socket buffer will be flushed immediately; + // this send should fail since the socket has been closed + try { + producer.send(TestUtils.produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0)) + } catch { + case e : java.io.IOException => // success + case e2 => throw e2 + } + } + + @Test def testProduceCorrectlyReceivesResponse() { val server = servers.head val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
