[ 
https://issues.apache.org/jira/browse/KAFKA-4854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17705857#comment-17705857
 ] 

zhangzhisheng commented on KAFKA-4854:
--------------------------------------

This behavior is intended.

 
{code:java}
@Override
public void onException(OnExceptionContext onExceptionContext) {
    test.setStatus(TestBatchRecordStatusEnum.FAIL.getCode());
    // retryCount is null, so this addition throws a NullPointerException
    test.setRetryCount(test.getRetryCount() + TestCoreConstants.ONE_INT);
    testRecordDataMapper.updateById(test);
    log.warn("TestSender send fail.data:{},error:", test, onExceptionContext.getException());
}
{code}
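
A minimal sketch of one way to avoid the NPE in the handler above, assuming `getRetryCount()` returns a nullable `Integer` and that `TestCoreConstants.ONE_INT` is 1. `RetryCounts` is a hypothetical helper, not part of Kafka or of the code shown here:

```java
/** Hypothetical helper (an assumption for illustration, not an existing API). */
final class RetryCounts {
    private RetryCounts() {}

    /** Returns current + 1, treating a null (never set) counter as zero. */
    static int increment(Integer current) {
        // Check for null before the arithmetic so no unboxing of null occurs.
        return (current == null ? 0 : current) + 1;
    }
}
```

With this in place, the failing line could become `test.setRetryCount(RetryCounts.increment(test.getRetryCount()));`, which keeps the failure handler from throwing while it records the failure.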
 

 

!image-2023-03-28-15-45-47-783.png!

 

> Producer RecordBatch executes callbacks with `null` provided for metadata if 
> an exception is encountered
> --------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4854
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4854
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.10.1.1
>            Reporter: Robert Quinlivan
>            Assignee: Ewen Cheslack-Postava
>            Priority: Minor
>         Attachments: image-2023-03-28-15-45-47-783.png
>
>
> When using a user-provided callback with the producer, the `RecordBatch` 
> executes the callbacks with a null metadata argument if an exception was 
> encountered. For monitoring and debugging purposes, I would prefer if the 
> metadata were provided, perhaps optionally. For example, it would be useful 
> to know the size of the serialized payload and the offset so these values 
> could appear in application logs.
> To be entirely clear, the piece of code I am considering is in 
> `org.apache.kafka.clients.producer.internals.RecordBatch#done`:
> ```java
>         // execute callbacks
>         for (Thunk thunk : thunks) {
>             try {
>                 if (exception == null) {
>                     RecordMetadata metadata = thunk.future.value();
>                     thunk.callback.onCompletion(metadata, null);
>                 } else {
>                     thunk.callback.onCompletion(null, exception);
>                 }
>             } catch (Exception e) {
>                 log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
>             }
>         }
> ```
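
Since the metadata argument is null on the failure path, one possible workaround is to capture the values needed for logging when the record is sent, rather than reading them from the metadata. The sketch below is only an illustration under stated assumptions: `SendCallback` is a stand-in that mirrors the `(metadata, exception)` shape of Kafka's `Callback#onCompletion`, `ContextCapturingCallback` is a hypothetical name, and the `lines` list stands in for a real logger.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in interface (assumption): same argument shape as a producer callback. */
interface SendCallback<M> {
    void onCompletion(M metadata, Exception exception);
}

/** Hypothetical callback that logs from context captured at send time. */
final class ContextCapturingCallback<M> implements SendCallback<M> {
    private final String topic;
    private final int serializedSize;
    // Stand-in for a logger so the sketch stays self-contained.
    final List<String> lines = new ArrayList<>();

    ContextCapturingCallback(String topic, int serializedSize) {
        this.topic = topic;
        this.serializedSize = serializedSize;
    }

    @Override
    public void onCompletion(M metadata, Exception exception) {
        if (exception != null) {
            // metadata is null on this path, so use only the captured fields.
            lines.add("send failed: topic=" + topic
                    + " size=" + serializedSize
                    + " error=" + exception.getMessage());
        } else {
            lines.add("send ok: topic=" + topic + " metadata=" + metadata);
        }
    }
}
```

The offset cannot be recovered this way, because a failed send has none assigned; only values known before the send (topic, key, payload size) can be captured.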



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
