[ https://issues.apache.org/jira/browse/KAFKA-14138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sagar Rao reassigned KAFKA-14138:
---------------------------------

    Assignee:     (was: Sagar Rao)

> The Exception Throwing Behavior of Transactional Producer is Inconsistent
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-14138
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14138
>             Project: Kafka
>          Issue Type: Improvement
>          Components: producer
>            Reporter: Guozhang Wang
>            Priority: Critical
>
> The Kafka Producer throws errors inconsistently when transactions are
> enabled. In short, there are two places where an error code received from
> the brokers is recorded before it is eventually thrown to the caller:
> * Recorded on the batch's metadata, via "Sender#failBatch"
> * Recorded on the txn manager, via "txnManager#handleFailedBatch".
> The former is thrown from 1) the `Future<RecordMetadata>` returned from
> `send`; or 2) the `callback` passed to `send(record, callback)`. The latter
> is thrown from `producer.send()` directly, where we call
> `txnManager.maybeAddPartition -> maybeFailWithError`. However, when thrown
> from the former, the error is not wrapped, so the caller sees the direct
> exception (e.g. ClusterAuthorizationException), whereas in the latter it is
> wrapped as, e.g., KafkaException(ClusterAuthorizationException). Which one
> is thrown depends on a race condition, because we cannot control whether the
> previous produceRequest's error has already been sent back by the time the
> caller thread calls `txnManager.maybeAddPartition`.
> For example, consider the following sequence for an idempotent producer:
> 1. caller thread: within future = producer.send(), call
> recordAccumulator.append
> 2. sender thread: drain the accumulator, send the produceRequest and get the
> error back.
> 3. caller thread: within future = producer.send(), call
> txnManager.maybeAddPartition, in which we would check `maybeFailWithError`
> before `isTransactional`.
> 4. caller thread: future.get()
> In a sequence where 3) happens before 2), we only get the raw exception at
> step 4; in a sequence where 2) happens before 3), we throw the wrapped
> exception immediately at 3).
> This inconsistent error throwing is pretty annoying for users, since they
> need to handle both cases, and many of them are not aware of this subtlety.
> We should make the error throwing consistent, e.g. we should consider:
> 1) which errors are thrown from the callback / future.get and which are
> thrown from the `send` call directly, where these two sets of errors should
> preferably be non-overlapping; 2) whether we should wrap the raw error or
> not, and whichever we choose, we should do so consistently.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
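
The two error paths described in the issue can be sketched as a small, self-contained Java simulation. Everything here is a hypothetical stand-in, not Kafka code: `WrapperException` plays the role of KafkaException, `AuthException` plays the role of ClusterAuthorizationException, and `send(boolean)` imitates the race by either throwing the wrapped error directly or returning a future that fails with the raw error. Note that `Future.get()` itself reports a failed future as an `ExecutionException` whose cause is the raw error. The sketch shows the defensive handling a caller would currently need in order to see the same underlying error on both paths.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class SendErrorPaths {
    // Hypothetical stand-in for KafkaException (the wrapper seen on the direct-throw path).
    static class WrapperException extends RuntimeException {
        WrapperException(Throwable cause) { super(cause); }
    }

    // Hypothetical stand-in for ClusterAuthorizationException (the raw broker error).
    static class AuthException extends RuntimeException {
        AuthException(String message) { super(message); }
    }

    // Hypothetical stand-in for producer.send(). The flag models the race:
    // true  -> the earlier error was already recorded, so send() throws it wrapped;
    // false -> the error arrives later and only fails the returned future, unwrapped.
    static Future<String> send(boolean errorAlreadyRecorded) {
        AuthException raw = new AuthException("not authorized");
        if (errorAlreadyRecorded) {
            throw new WrapperException(raw);
        }
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(raw);
        return future;
    }

    // Caller-side handling that covers both paths and returns the underlying
    // error, or null if the send succeeded.
    static Throwable sendAndCollectError(boolean errorAlreadyRecorded) {
        try {
            send(errorAlreadyRecorded).get();
            return null;
        } catch (WrapperException e) {
            // Direct-throw path: unwrap the wrapper to reach the raw error.
            return e.getCause() != null ? e.getCause() : e;
        } catch (ExecutionException e) {
            // Future path: Future.get() wraps the raw error in ExecutionException.
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndCollectError(true).getClass().getSimpleName());
        System.out.println(sendAndCollectError(false).getClass().getSimpleName());
    }
}
```

With this handling, both orderings of the race surface the same `AuthException` to the caller; without it, code that only inspects `future.get()` would miss the error thrown directly from `send`.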