[ https://issues.apache.org/jira/browse/KAFKA-496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13457997#comment-13457997 ]
Jun Rao commented on KAFKA-496: ------------------------------- The following is a straw-man proposal: Define the following class and trait. class ProducerCallbackResponse (val response: ProducerResponse) { } trait ProducerCallback[V] { def onSerializationError(event: V, e: Throwable) def onSendResponse(response: ProducerResponse) def getCallbackResponse(response: ProducerResponse): Option[ProducerCallbackResponse] } 1. In Producer, add a new api registerCallBack(callback: ProducerCallback[V]). Change the send api to: def send(producerData: ProducerData[K,V]*) : Option[ProducerCallbackResponse] 2. For sync Producer, define the following default callback. Send() will either get an exception or a ProducerCallbackResponse. class DefaultSyncProducerCallback[V] extends ProducerCallback[V] { var response: Option[ProducerCallbackResponse] = none def oonSerializationError(event: V, e: Throwable) { throw e } def onSendResponse(response: ProducerResponse) { // instantiate response with a DefaultSyncProducerResponse response = Some(new ProducerCallbackResponse(response)) } def getCallbackResponse(): ProducerCallbackResponse = { return response } } 3. For async Producer, define the following default callback that simply ignores the callback. class DefaultAsyncProducerCallback[V] extends ProducerCallback[V] { def onSerializationError(event: V, e: Throwable) { // let it go } def onSendResponse(response: ProducerResponse) { // let it go } def getCallbackResponse(): ProducerCallbackResponse = { return none } } 4. A user can also define and register it's own customized ProducerCallback. > high level producer send should return a response > ------------------------------------------------- > > Key: KAFKA-496 > URL: https://issues.apache.org/jira/browse/KAFKA-496 > Project: Kafka > Issue Type: Bug > Components: core > Reporter: Jun Rao > Priority: Blocker > Labels: features > Fix For: 0.8 > > Original Estimate: 72h > Remaining Estimate: 72h > > Currently, Producer.send() doesn't return any value. In 0.8, since each > produce request will be acked, we should pass the response back. What we can > do is that if the producer is in sync mode, we can return a map of > (topic,partitionId) -> (errorcode, offset). If the producer is in async mode, > we can just return a null. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira