[ 
https://issues.apache.org/jira/browse/KAFKA-496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13457997#comment-13457997
 ] 

Jun Rao commented on KAFKA-496:
-------------------------------

The following is a straw-man proposal:

Define the following class and trait.

class ProducerCallbackResponse (val response: ProducerResponse) {
}

trait ProducerCallback[V] {
  def onSerializationError(event: V, e: Throwable): Unit
  def onSendResponse(response: ProducerResponse): Unit

  def getCallbackResponse(): Option[ProducerCallbackResponse]
}

1. In Producer, add a new API registerCallBack(callback: ProducerCallback[V]). 
Change the send API to:
   def send(producerData: ProducerData[K,V]*) : Option[ProducerCallbackResponse]

2. For sync Producer, define the following default callback. send() will either 
throw an exception or return a ProducerCallbackResponse.

class DefaultSyncProducerCallback[V] extends ProducerCallback[V] {
  var response: Option[ProducerCallbackResponse] = None

  def onSerializationError(event: V, e: Throwable): Unit = {
    // surface the failure to the caller of send()
    throw e
  }

  def onSendResponse(response: ProducerResponse): Unit = {
    // wrap the response in a ProducerCallbackResponse; the parameter
    // shadows the field, so the field is assigned via this.response
    this.response = Some(new ProducerCallbackResponse(response))
  }

  def getCallbackResponse(): Option[ProducerCallbackResponse] = {
    response
  }
}

3. For async Producer, define the following default callback that simply 
ignores both events.

class DefaultAsyncProducerCallback[V] extends ProducerCallback[V] {
  def onSerializationError(event: V, e: Throwable): Unit = {
    // let it go
  }

  def onSendResponse(response: ProducerResponse): Unit = {
    // let it go
  }

  def getCallbackResponse(): Option[ProducerCallbackResponse] = {
    None
  }
}

4. A user can also define and register its own customized ProducerCallback.
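
As a rough illustration of point 4, a user-defined callback might look like the sketch below. Everything outside the trait is an assumption made for the example: the ProducerResponse case class is a hypothetical stand-in (the real class carries per-partition error codes and offsets), getCallbackResponse is assumed to take no arguments, and CollectingCallback is an invented name. It is not part of the proposal itself.

```scala
import scala.collection.mutable.ListBuffer

object CallbackSketch {
  // Hypothetical stand-in for Kafka's ProducerResponse, reduced to two fields.
  case class ProducerResponse(errorCode: Int, offset: Long)

  class ProducerCallbackResponse(val response: ProducerResponse)

  // The trait from the proposal, assuming a no-argument getCallbackResponse.
  trait ProducerCallback[V] {
    def onSerializationError(event: V, e: Throwable): Unit
    def onSendResponse(response: ProducerResponse): Unit
    def getCallbackResponse(): Option[ProducerCallbackResponse]
  }

  // Hypothetical custom callback: instead of throwing on a serialization
  // error, it records the offending event, and it keeps the latest response.
  class CollectingCallback[V] extends ProducerCallback[V] {
    val failedEvents = ListBuffer.empty[V]
    private var latest: Option[ProducerCallbackResponse] = None

    def onSerializationError(event: V, e: Throwable): Unit = {
      failedEvents += event
    }

    def onSendResponse(response: ProducerResponse): Unit = {
      latest = Some(new ProducerCallbackResponse(response))
    }

    def getCallbackResponse(): Option[ProducerCallbackResponse] = latest
  }
}
```

Such a callback would then be handed to the producer through the proposed registerCallBack, and send() would return whatever getCallbackResponse() yields.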

                
> high level producer send should return a response
> -------------------------------------------------
>
>                 Key: KAFKA-496
>                 URL: https://issues.apache.org/jira/browse/KAFKA-496
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Jun Rao
>            Priority: Blocker
>              Labels: features
>             Fix For: 0.8
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> Currently, Producer.send() doesn't return any value. In 0.8, since each 
> produce request will be acked, we should pass the response back. What we can 
> do is that if the producer is in sync mode, we can return a map of 
> (topic,partitionId) -> (errorcode, offset). If the producer is in async mode, 
> we can just return a null.

