Hello,

I am looking at 0.8.1.1, specifically the dispatchSerializedData function
in kafka.producer.async.DefaultEventHandler (shown below). It looks like
we catch the exception outside the loop and merely log an error message,
then return failedProduceRequests.

If one broker is having problems, messages destined for the brokers that
come after the problematic one in the loop will NOT be included in
failedProduceRequests and will be dropped silently. Is this correct?
Should we change the code to catch the exception separately for each
broker's send?

Thanks

private def dispatchSerializedData(messages: Seq[KeyedMessage[K, Message]]): Seq[KeyedMessage[K, Message]] = {
  val partitionedDataOpt = partitionAndCollate(messages)
  partitionedDataOpt match {
    case Some(partitionedData) =>
      val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]]
      try {
        for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
          if (logger.isTraceEnabled)
            messagesPerBrokerMap.foreach(partitionAndEvent =>
              trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
                .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
          val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)

          val failedTopicPartitions = send(brokerid, messageSetPerBroker)
          failedTopicPartitions.foreach(topicPartition => {
            messagesPerBrokerMap.get(topicPartition) match {
              case Some(data) => failedProduceRequests.appendAll(data)
              case None => // nothing
            }
          })
        }
      } catch {
        case t: Throwable => error("Failed to send messages", t)
      }
      failedProduceRequests
    case None => // all produce requests failed
      messages
  }
}
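To illustrate the suggestion, here is a minimal, self-contained sketch of moving the try/catch inside the per-broker loop. The send and dispatch functions below are hypothetical stand-ins (using plain strings as messages), not the actual Kafka code: the point is only that a failure against one broker would add that broker's messages to the failed list instead of aborting the sends to the remaining brokers.

```scala
import scala.collection.mutable.ArrayBuffer

object PerBrokerCatchSketch {
  // Hypothetical stand-in for send(brokerid, messageSetPerBroker):
  // throws for a "down" broker, otherwise reports no per-partition failures.
  def send(brokerId: Int, messages: Seq[String]): Seq[String] =
    if (brokerId == 1) throw new RuntimeException(s"broker $brokerId unreachable")
    else Seq.empty

  // Stand-in for dispatchSerializedData with the catch moved inside the loop.
  def dispatch(partitionedData: Seq[(Int, Seq[String])]): Seq[String] = {
    val failedProduceRequests = new ArrayBuffer[String]
    for ((brokerId, messages) <- partitionedData) {
      try {
        failedProduceRequests.appendAll(send(brokerId, messages))
      } catch {
        case t: Throwable =>
          // A failure for this broker no longer aborts the loop; its
          // messages are recorded so they can be retried later.
          failedProduceRequests.appendAll(messages)
      }
    }
    failedProduceRequests.toSeq
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(0 -> Seq("a"), 1 -> Seq("b", "c"), 2 -> Seq("d"))
    // Only broker 1's messages end up in the failed list; brokers 0 and 2
    // were still attempted even though broker 1 threw.
    println(dispatch(data).mkString(","))
  }
}
```

With the catch outside the loop (as in 0.8.1.1), the same failure on broker 1 would also have skipped the send to broker 2 entirely.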
