[ 
https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14195762#comment-14195762
 ] 

Honghai Chen commented on KAFKA-391:
------------------------------------

This situation happen under below scenario:
one broker is leader for several partitions, for example 3,   when send one 
messageset which has message for all of the 3 partitions of this broker ,      
the response.status.size is 3 and  the producerRequest.data.size is 1.    then 
it hit this exception.   Any idea for fix?  Do we need compare 
response.status.size  with messagesPerTopic.Count instead of 
producerRequest.data.size ?


  private def send(brokerId: Int, messagesPerTopic: 
collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
    if(brokerId < 0) {
      warn("Failed to send data since partitions %s don't have a 
leader".format(messagesPerTopic.map(_._1).mkString(",")))
      messagesPerTopic.keys.toSeq
    } else if(messagesPerTopic.size > 0) {
      val currentCorrelationId = correlationId.getAndIncrement
      val producerRequest = new ProducerRequest(currentCorrelationId, 
config.clientId, config.requestRequiredAcks,
        config.requestTimeoutMs, messagesPerTopic)
      var failedTopicPartitions = Seq.empty[TopicAndPartition]
      try {
        val syncProducer = producerPool.getProducer(brokerId)
        debug("Producer sending messages with correlation id %d for topics %s 
to broker %d on %s:%d"
          .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), 
brokerId, syncProducer.config.host, syncProducer.config.port))
        val response = syncProducer.send(producerRequest)
        debug("Producer sent messages with correlation id %d for topics %s to 
broker %d on %s:%d"
          .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), 
brokerId, syncProducer.config.host, syncProducer.config.port))
        if(response != null) {
          if (response.status.size != producerRequest.data.size)
            throw new KafkaException("Incomplete response (%s) for producer 
request (%s)".format(response, producerRequest))



> Producer request and response classes should use maps
> -----------------------------------------------------
>
>                 Key: KAFKA-391
>                 URL: https://issues.apache.org/jira/browse/KAFKA-391
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Joel Koshy
>            Assignee: Joel Koshy
>            Priority: Blocker
>              Labels: optimization
>             Fix For: 0.8.0
>
>         Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch, 
> KAFKA-391-v3.patch, KAFKA-391-v4.patch
>
>
> Producer response contains two arrays of error codes and offsets - the 
> ordering in these arrays correspond to the flattened ordering of the request 
> arrays.
> It would be better to switch to maps in the request and response as this 
> would make the code clearer and more efficient (right now, linear scans are 
> used in handling producer acks).
> We can probably do the same in the fetch request/response.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to