[
https://issues.apache.org/jira/browse/KAFKA-496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13457997#comment-13457997
]
Jun Rao commented on KAFKA-496:
-------------------------------
The following is a straw-man proposal:
Define the following class and trait.
class ProducerCallbackResponse (val response: ProducerResponse) {
}
trait ProducerCallback[V] {
  def onSerializationError(event: V, e: Throwable)
  def onSendResponse(response: ProducerResponse)
  def getCallbackResponse(): Option[ProducerCallbackResponse]
}
1. In Producer, add a new API, registerCallBack(callback: ProducerCallback[V]).
Change the send API to:
def send(producerData: ProducerData[K,V]*) : Option[ProducerCallbackResponse]
2. For the sync Producer, define the following default callback. send() will
either throw an exception or return a ProducerCallbackResponse.
class DefaultSyncProducerCallback[V] extends ProducerCallback[V] {
  // most recent response; None until a send has completed
  var callbackResponse: Option[ProducerCallbackResponse] = None
  def onSerializationError(event: V, e: Throwable) {
    throw e
  }
  def onSendResponse(response: ProducerResponse) {
    // store the response wrapped in a ProducerCallbackResponse
    callbackResponse = Some(new ProducerCallbackResponse(response))
  }
  def getCallbackResponse(): Option[ProducerCallbackResponse] = {
    callbackResponse
  }
}
3. For the async Producer, define the following default callback, which simply
ignores both serialization errors and send responses.
class DefaultAsyncProducerCallback[V] extends ProducerCallback[V] {
  def onSerializationError(event: V, e: Throwable) {
    // let it go
  }
  def onSendResponse(response: ProducerResponse) {
    // let it go
  }
  def getCallbackResponse(): Option[ProducerCallbackResponse] = {
    None
  }
}
4. A user can also define and register its own customized ProducerCallback.
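
As a rough illustration of point 4, a user-defined callback might look like
the sketch below. It is only a sketch under stated assumptions: the
ProducerResponse case class here is a simplified, hypothetical stand-in for
the real class, CollectingProducerCallback and failedEvents are made-up names,
and the trait uses the no-argument getCallbackResponse() of the default
callbacks with the Option return type declared in the trait.

```scala
// Hypothetical, simplified stand-in for Kafka's ProducerResponse.
case class ProducerResponse(errorCode: Short, offset: Long)

class ProducerCallbackResponse(val response: ProducerResponse)

// The proposed trait, assuming a no-argument getCallbackResponse().
trait ProducerCallback[V] {
  def onSerializationError(event: V, e: Throwable): Unit
  def onSendResponse(response: ProducerResponse): Unit
  def getCallbackResponse(): Option[ProducerCallbackResponse]
}

// Illustrative custom callback: remembers events that failed to serialize
// instead of throwing, and keeps the most recent send response.
class CollectingProducerCallback[V] extends ProducerCallback[V] {
  private var failed: List[V] = Nil
  private var last: Option[ProducerCallbackResponse] = None

  def onSerializationError(event: V, e: Throwable): Unit = {
    failed = event :: failed
  }

  def onSendResponse(response: ProducerResponse): Unit = {
    last = Some(new ProducerCallbackResponse(response))
  }

  def getCallbackResponse(): Option[ProducerCallbackResponse] = last

  // Failed events in the order they were reported.
  def failedEvents: List[V] = failed.reverse
}

object CallbackDemo {
  def main(args: Array[String]): Unit = {
    val callback = new CollectingProducerCallback[String]
    callback.onSerializationError("bad-event", new RuntimeException("cannot serialize"))
    callback.onSendResponse(ProducerResponse(0.toShort, 42L))
    println(callback.failedEvents)
    println(callback.getCallbackResponse().map(_.response.offset))
  }
}
```

With something like this, the producer would invoke the registered callback
at the serialization and response steps, and send() would simply return
whatever getCallbackResponse() yields.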
> high level producer send should return a response
> -------------------------------------------------
>
> Key: KAFKA-496
> URL: https://issues.apache.org/jira/browse/KAFKA-496
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Jun Rao
> Priority: Blocker
> Labels: features
> Fix For: 0.8
>
> Original Estimate: 72h
> Remaining Estimate: 72h
>
> Currently, Producer.send() doesn't return any value. In 0.8, since each
> produce request will be acked, we should pass the response back. What we can
> do is that if the producer is in sync mode, we can return a map of
> (topic,partitionId) -> (errorcode, offset). If the producer is in async mode,
> we can just return a null.