[ https://issues.apache.org/jira/browse/KAFKA-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188042#comment-17188042 ]

Aakash Gupta edited comment on KAFKA-2200 at 8/31/20, 11:20 PM:
----------------------------------------------------------------

Hi [~becket_qin],
 I would like to take up this ticket.

This is how exceptions are currently handled in the KafkaProducer.send() method:
{code:java}
catch (ApiException e) {
    log.debug("Exception occurred during message send:", e);
    if (callback != null)
        callback.onCompletion(null, e);
    this.errors.record();
    this.interceptors.onSendError(record, tp, e);
    return new FutureFailure(e);
} catch (InterruptedException e) {
    this.errors.record();
    this.interceptors.onSendError(record, tp, e);
    throw new InterruptException(e);
} catch (KafkaException e) {
    this.errors.record();
    this.interceptors.onSendError(record, tp, e);
    throw e;
} catch (Exception e) {
    // we notify interceptor about all exceptions, since onSend is called before anything else in this method
    this.interceptors.onSendError(record, tp, e);
    throw e;
}
{code}
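To make the callback-ordering concern concrete, here is a minimal, self-contained illustration (my own sketch, not part of the ticket; it assumes a broker on localhost:9092 and a topic named test-topic). On the ApiException path above, send() invokes the callback synchronously on the calling thread and returns an already-failed future, so this callback can fire before the callbacks of earlier records that are still in flight:
{code:java}
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncCallbackExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.request.size", "1024");              // tiny limit so the record below is "too large"
        props.put("batch.size", "512");                     // keep batch.size below max.request.size

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A value much larger than max.request.size triggers RecordTooLargeException
            // (an ApiException) inside send(), before anything is sent to the broker.
            char[] big = new char[10 * 1024];
            java.util.Arrays.fill(big, 'x');
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("test-topic", "key", new String(big));

            Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
                // With the current code this callback runs on the calling thread,
                // before send() even returns.
                if (exception != null)
                    System.err.println("send failed: " + exception);
            });
            System.out.println("future already completed? " + future.isDone()); // true on this path
        }
    }
}
{code}
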
 # Regarding the TimeoutException thrown while waiting for a metadata update: what is 
your suggestion? How should it be handled, if not via the ApiException/callback path? 
As you mentioned, we are misusing TimeoutException there, since it was intended only 
for the case where replication could not complete within the allowed time. Should we 
introduce a new exception, e.g. 'ClientTimeoutException', for such client-side 
timeouts and also use it in waitOnMetadata()? (A rough sketch of what I have in mind 
follows after this list.)
 # Message-size validation throws RecordTooLargeException, which extends 
ApiException. Here you are right that the producer client throws 
RecordTooLargeException without ever contacting the server. You suggested two 
scenarios that can cause exceptions:
 ## *If the serialized, uncompressed message is larger than maxRequestSize*: I am 
not sure we can estimate the message size with the compression type taken into 
account, so the current implementation throws RecordTooLargeException based on an 
estimate that ignores compression. What is the expected behaviour in this case?
 ## *If the message size is bigger than totalMemorySize (the configured buffer 
memory)*: the buffer pool throws IllegalArgumentException when asked for the 
allocation. Should we just catch this exception, record it, and rethrow it? (A 
sketch of the current size checks, as I read them, also follows below.)
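For point 1, here is the kind of thing I have in mind. This is a hypothetical sketch only: ClientTimeoutException does not exist in Kafka today, and the name and placement are my assumptions. The idea is to extend KafkaException directly (not ApiException), so the ApiException catch block in send() would not turn a client-side metadata-wait timeout into a callback invocation; it would propagate to the caller instead.
{code:java}
// Hypothetical sketch only: this class does not exist in Kafka today.
// Extending KafkaException rather than ApiException keeps client-side timeouts
// out of the ApiException/callback path in KafkaProducer.send().
public class ClientTimeoutException extends org.apache.kafka.common.KafkaException {

    public ClientTimeoutException(String message) {
        super(message);
    }

    public ClientTimeoutException(String message, Throwable cause) {
        super(message, cause);
    }
}

// waitOnMetadata() could then throw it when the wait exceeds the allowed time,
// roughly along these lines:
//     if (elapsedMs >= maxWaitMs)
//         throw new ClientTimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
{code}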

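For point 2, this is roughly how I read the current client-side size check (a paraphrase, not a verbatim copy of KafkaProducer; field names and messages may differ). Both checks operate on the estimated serialized, uncompressed size and fire before any broker interaction:
{code:java}
// Paraphrased sketch of the producer's client-side size validation.
private void ensureValidRecordSize(int size) {
    // Estimated serialized (uncompressed) size vs. max.request.size
    if (size > this.maxRequestSize)
        throw new RecordTooLargeException("The message is " + size + " bytes when serialized, which is larger " +
                "than the maximum request size configured via max.request.size.");
    // Estimated serialized size vs. the total producer buffer (buffer.memory)
    if (size > this.totalMemorySize)
        throw new RecordTooLargeException("The message is " + size + " bytes when serialized, which is larger " +
                "than the total memory buffer configured via buffer.memory.");
}
{code}
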
[~becket_qin] Could you please answer the above queries and validate my 
understanding? Apologies if I have misunderstood something, as I am new to the 
Kafka community.



> kafkaProducer.send() should not call callback.onCompletion()
> ------------------------------------------------------------
>
>                 Key: KAFKA-2200
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2200
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.1.0
>            Reporter: Jiangjie Qin
>            Priority: Major
>              Labels: newbie
>
> KafkaProducer.send() should not call callback.onCompletion() because this 
> might break the callback firing order.


