[
https://issues.apache.org/jira/browse/KAFKA-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188042#comment-17188042
]
Aakash Gupta edited comment on KAFKA-2200 at 8/31/20, 11:20 PM:
----------------------------------------------------------------
Hi [~becket_qin]
I am willing to take this ticket.
As of now till date, this is how exceptions are being handled in
kafkaProducer.send() method:
{code:java}
catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (KafkaException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is
called before anything else in this method
this.interceptors.onSendError(record, tp, e);
throw e;
}
{code}
# TimeoutException in waiting for metadata update, what is your suggestion?
How should it be handled if not via ApiException callback? As you mentioned, we
are misusing this TimeoutException as idea was to use it only where replication
couldn't complete within the allowed time, so should we create a new exception
'ClientTimeoutException' to handle such scenarios, and also use the same in
waitOnMetadata() method ?
# Validation of message size is throwing RecordTooLargeException which extends
ApiException. In this case, you are correct to say that producer client is
throwing RecordTooLargeException without even interacting with server.
You've suggested 2 scenarios which can cause exceptions :
## *If the size of serialised uncompressed message is more than
maxRequestSize*: I'm not sure if we can estimate the size of message keeping
compression type in consideration. So, current implementation throws
RecordTooLargeException based on the ESTIMATE w/o keeping into account the
compression type. What is the expected behaviour in this case?
## *If the message size is bigger than the totalMemorySize or
memoryBufferSize* : Buffer pool would throw IllegalArgumentException when asked
for allocation. Should we just catch this exception, record it and throw it
back?
[~becket_qin] Can you please answer above queries and validate my
understanding? Apologies if I've misunderstood something as I am new to Kafka
community.
was (Author: aakashgupta96):
Hi [~becket_qin]
I am willing to take this ticket.
As of now till date, this is how exceptions are being handled in
kafkaProducer.send() method:
{code:java}
catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (KafkaException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is
called before anything else in this method
this.interceptors.onSendError(record, tp, e);
throw e;
}
{code}
# TimeoutException in waiting for metadata update, what is your suggestion?
How should it be handled if not via ApiException callback? As you mentioned, we
are misusing this TimeoutException as idea was to use it only where replication
couldn't complete within the allowed time, so should we create a new exception
'ClientTimeoutException' to handle such scenarios, and also use the same in
waitOnMetadata() method ?
# Validation of message size is throwing RecordTooLargeException which extends
ApiException. In this case, you are correct to say that producer client is
throwing RecordTooLargeException without even interacting with server.
You've suggested 2 scenarios which can cause exceptions :
## *If the size of serialised uncompressed message is more than
maxRequestSize*: I'm not sure if we can estimate the size of message keeping
compression type in consideration. So, current implementation throws
RecordTooLargeException based on the ESTIMATE w/o keeping into account the
compression type. What is the expected behaviour in this case?
## *If the message size is bigger than the totalMemorySize or
memoryBufferSize* : **Buffer pool would throw IllegalArgumentException when
asked for allocation. Should we just catch this exception, record it and throw
it back?
[~becket_qin] Can you please answer above queries and validate my
understanding? Apologies if I've misunderstood something as I am new to Kafka
community.
> kafkaProducer.send() should not call callback.onCompletion()
> ------------------------------------------------------------
>
> Key: KAFKA-2200
> URL: https://issues.apache.org/jira/browse/KAFKA-2200
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.10.1.0
> Reporter: Jiangjie Qin
> Priority: Major
> Labels: newbie
>
> KafkaProducer.send() should not call callback.onCompletion() because this
> might break the callback firing order.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)