[
https://issues.apache.org/jira/browse/KAFKA-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15823583#comment-15823583
]
Jiangjie Qin commented on KAFKA-3190:
-------------------------------------
[~ewencp] This is still an issue and confuses some applications from time
to time.
We had some discussion on the PR. The main reason we fire the callback in
producer.send() is that this function can throw API exceptions, and,
generally speaking, all ApiExceptions are supposed to be handled by the
callback. Here the two possible ApiExceptions are RecordTooLargeException and
TimeoutException.
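To make the ordering concern concrete, here is a minimal sketch of the current pattern, assuming a toy producer. {{ToyProducer}}, its {{send(record, fail, callback)}} and {{flush()}} methods, and the simplified {{ApiException}}/{{Callback}} types are hypothetical stand-ins, not the real client classes (the real {{Callback#onCompletion}} receives a {{RecordMetadata}}, not the record). An ApiException raised inside send() is handed straight to the callback, so that callback fires ahead of the callbacks for records accepted earlier.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified stand-ins for the real client classes (hypothetical, for illustration only).
class ApiException extends RuntimeException {
    ApiException(String message) { super(message); }
}

interface Callback {
    // Same idea as the real Callback#onCompletion, but takes the record for simplicity.
    void onCompletion(String record, Exception exception);
}

class ToyProducer {
    private final Deque<Runnable> pending = new ArrayDeque<>();

    // Current-style send(): an ApiException raised while preparing the record
    // is passed to the callback immediately instead of being thrown.
    void send(String record, boolean fail, Callback callback) {
        try {
            if (fail) {
                throw new ApiException("failed before enqueueing: " + record);
            }
            pending.add(() -> callback.onCompletion(record, null));
        } catch (ApiException e) {
            callback.onCompletion(record, e); // fires before earlier records complete
        }
    }

    // Completes accepted records in the order they were sent.
    void flush() {
        while (!pending.isEmpty()) {
            pending.poll().run();
        }
    }
}

final class CurrentOrderingDemo {
    static List<String> run() {
        List<String> order = new ArrayList<>();
        ToyProducer producer = new ToyProducer();
        Callback cb = (record, exception) -> order.add(record);
        producer.send("r1", false, cb);
        producer.send("r2", true, cb);
        producer.send("r3", false, cb);
        producer.flush();
        return order; // [r2, r1, r3]: r2's callback jumped ahead of r1's
    }
}
```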
There are actually two issues.
1. We throw {{RecordTooLargeException}} directly in the producer once the
serialized size of the ProducerRecord is greater than maxRequestSize or greater
than totalMemorySize. This is a little weird because
{{RecordTooLargeException}} is an API exception and ideally should only be
returned by the broker. Technically speaking, the serialized bytes could
compress well enough to end up below the max message size limit, so we are
essentially throwing an API exception based on an estimate. It may be better
to throw IllegalArgumentException instead.
2. We are actually misusing o.a.k.common.errors.TimeoutException in many
places, including the producer, the consumer and the broker. This exception was
originally mapped to error code 7 in the ProduceResponse, indicating that
replication did not finish within the timeout specified in the ProduceRequest
when acks=-1. Now it is used everywhere in the producer and consumer to indicate
any kind of timeout, regardless of whether it was returned by the broker.
Similarly, on the broker side, it is used by DelayedCreateTopic and
DelayedDeleteTopic. I think this is a serious problem that has to be fixed.
Unfortunately, it will require an API change, so I will probably open another
ticket and write a KIP. The simplest solution I can think of now is to create a
new {{RequestTimeoutException}} extending ApiException and map it to error
code 7 (we can discuss later whether to split that into two separate
exceptions, ReplicationTimeoutException and RequestTimeoutException), and to
change the current TimeoutException to extend KafkaException directly. This way
we do not need to change the many existing usages of TimeoutException.
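A minimal sketch of the proposed hierarchy, assuming simplified stand-ins for the real classes in o.a.k.common.errors (the constructors shown and the {{ErrorCodes.forCode}} helper are hypothetical). The point is that only the broker-returned timeout remains an ApiException, while the general-purpose TimeoutException becomes a plain KafkaException.

```java
// Simplified stand-ins for the proposed hierarchy (hypothetical; not the real client classes).
class KafkaException extends RuntimeException {
    KafkaException(String message) { super(message); }
}

class ApiException extends KafkaException {
    ApiException(String message) { super(message); }
}

// Proposed: the broker-returned timeout, mapped to error code 7.
class RequestTimeoutException extends ApiException {
    RequestTimeoutException(String message) { super(message); }
}

// Proposed: the general-purpose timeout no longer extends ApiException.
class TimeoutException extends KafkaException {
    TimeoutException(String message) { super(message); }
}

final class ErrorCodes {
    private ErrorCodes() {}

    static final short REQUEST_TIMED_OUT = 7;

    // Maps a broker error code to an exception; only code 7 is modeled in this sketch.
    static KafkaException forCode(short code) {
        if (code == REQUEST_TIMED_OUT) {
            return new RequestTimeoutException("Request timed out (error code 7)");
        }
        return new KafkaException("Unmodeled error code " + code);
    }
}
```

With this split, a client-side TimeoutException would no longer be caught by {{instanceof ApiException}} checks.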
So, to summarize, the solution I have in mind is the following:
1. Throw IllegalArgumentException instead of RecordTooLargeException when the
serialized record size is greater than the maxRequestSize or totalMemorySize.
2. Add a new RequestTimeoutException extending ApiException and map that
new exception to error code 7.
3. Change the current o.a.k.common.errors.TimeoutException to extend from
KafkaException.
4. Remove the ApiException handling logic in producer.send().
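A rough sketch of items 1 and 4 together, again assuming a hypothetical toy producer: {{ProposedToyProducer}}, {{ProposedOrderingDemo}} and the simplified {{Callback}} are illustrative names, and the string length stands in for the serialized size. The size check throws IllegalArgumentException straight out of send(), and the callback only fires on completion, so callbacks stay in the order the records were accepted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified callback shape; the real Callback#onCompletion takes RecordMetadata.
interface Callback {
    void onCompletion(String record, Exception exception);
}

// Hypothetical toy producer sketching proposal items 1 and 4 (not the real KafkaProducer).
class ProposedToyProducer {
    private final Deque<Runnable> pending = new ArrayDeque<>();
    private final int maxRequestSize;
    private final long totalMemorySize;

    ProposedToyProducer(int maxRequestSize, long totalMemorySize) {
        this.maxRequestSize = maxRequestSize;
        this.totalMemorySize = totalMemorySize;
    }

    // Item 1: an oversized record is a bad argument detected on the client,
    // so it is reported with IllegalArgumentException rather than an ApiException.
    private void ensureValidRecordSize(int size) {
        if (size > maxRequestSize || size > totalMemorySize) {
            throw new IllegalArgumentException("Serialized size " + size
                    + " exceeds maxRequestSize " + maxRequestSize
                    + " or totalMemorySize " + totalMemorySize);
        }
    }

    // Item 4: no ApiException handling here; errors propagate to the caller,
    // and the callback only fires on completion, in the order records were accepted.
    void send(String record, Callback callback) {
        ensureValidRecordSize(record.length()); // length stands in for serialized size
        pending.add(() -> callback.onCompletion(record, null));
    }

    // Completes accepted records in FIFO order.
    void flush() {
        while (!pending.isEmpty()) {
            pending.poll().run();
        }
    }
}

final class ProposedOrderingDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        ProposedToyProducer producer = new ProposedToyProducer(4, 1024L);
        Callback cb = (record, exception) -> events.add("callback:" + record);
        producer.send("r1", cb);
        try {
            producer.send("too-large", cb); // 9 > maxRequestSize of 4
        } catch (IllegalArgumentException ex) {
            events.add("thrown:too-large");
        }
        producer.send("r3", cb);
        producer.flush();
        return events; // [thrown:too-large, callback:r1, callback:r3]
    }
}
```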
Given that the KIP deadline for the 0.10.2.0 release has already passed, I am
not sure whether you want to get this into 0.10.2.0. We can probably get the
fix into 0.10.2.0 if people generally agree on the approach; otherwise we may
have to delay it to the next release.
[~guozhang] [~ijuma] [~junrao] Since you reviewed the original PR, what
do you think about the above solution? [~hachikuji] It would also be good if
you could comment on whether you have any concerns.
Thanks.
> KafkaProducer should not invoke callback in send()
> --------------------------------------------------
>
> Key: KAFKA-3190
> URL: https://issues.apache.org/jira/browse/KAFKA-3190
> Project: Kafka
> Issue Type: Bug
> Components: clients, producer
> Affects Versions: 0.9.0.0
> Reporter: Jiangjie Qin
> Assignee: Jiangjie Qin
> Priority: Critical
> Fix For: 0.10.2.0
>
>
> Currently KafkaProducer will invoke callback.onCompletion() if it receives an
> ApiException during send(). This breaks the guarantee that callbacks will be
> invoked in order. It seems an ApiException in send() only comes from metadata
> refresh. If so, we can probably simply throw it instead of invoking the
> callback.