[
https://issues.apache.org/jira/browse/KAFKA-2060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14496116#comment-14496116
]
Rajini Sivaram commented on KAFKA-2060:
---------------------------------------
RecordBatch.done() no longer calls thunk.future.get(). This was modified under
KAFKA-1865 to invoke the callback with a new RecordMetadata object when there
are no exceptions (instead of calling thunk.future.get() which could have
thrown an exception). So this issue can be closed.
> Async onCompletion callback may not be called
> ---------------------------------------------
>
> Key: KAFKA-2060
> URL: https://issues.apache.org/jira/browse/KAFKA-2060
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.8.1.2
> Environment: All
> Reporter: Bill Sobel
> Priority: Critical
> Labels: easyfix
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> The 'done' function in RecordBatch.java attempts to enumerate and call each
> onCompletion() callback. However the call to thunk.future.get() can throw an
> exception. When this occurs the callback is not invoked. This appears to be
> the only place where a callback per async send would not occur and the
> callback orphaned.
> The call to thunk.future.get() appears to need to occur in its own try/catch
> and then the onCompletion called with the results if it doesn't throw an
> exception or thunk.callback.onCompletion(null, recordException) if it does.
> e.g.
> /**
> * Complete the request
> *
> * @param baseOffset The base offset of the messages assigned by the
> server
> * @param exception The exception that occurred (or null if the request
> was successful)
> */
> public void done(long baseOffset, RuntimeException exception) {
> this.produceFuture.done(topicPartition, baseOffset, exception);
> log.trace("Produced messages to topic-partition {} with base offset
> offset {} and error: {}.",
> topicPartition,
> baseOffset,
> exception);
> // execute callbacks
> for (int i = 0; i < this.thunks.size(); i++) {
> try {
> Thunk thunk = this.thunks.get(i);
> if (exception == null) {
> RecordMetadata rc = null;
> try {
> rc = thunk.future.get();
> }
> catch(Exception recordException) {
> thunk.callback.onCompletion(null,
> recordException);
> }
> if(rc != null) {
> thunk.callback.onCompletion(rc, null);
> }
> }
> else {
> thunk.callback.onCompletion(null, exception);
> }
> } catch (Exception e) {
> log.error("Error executing user-provided callback on message
> for topic-partition {}:", topicPartition, e);
> }
> }
> }
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)