yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1050765139
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
     }
     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback<Void> callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);

Review Comment:
   Thanks, that's a good observation. Currently, the pause / resume APIs (which call `putTargetState`) don't use callback mechanisms whatsoever and they also don't have any documented response bodies (they send back a `202` response with an empty body right now) and will hence need a bit more refactoring. I feel like that should be done in a follow-up / separate PR, WDYT?

##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) {
         try {
             fencableProducer.beginTransaction();
-            fencableProducer.send(new ProducerRecord<>(topic, key, value));
+            fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {

Review Comment:
   > I believe here we're intentionally relying on this behavior of commitTransaction to propagate errors:

   Thanks for pointing this out! As per https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-

   ```
   When used as part of a transaction, it is not necessary to define a callback or check the result of the future in order to detect errors from send. If any of the send calls failed with an irrecoverable error, the final [commitTransaction()](https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#commitTransaction--) call will fail and throw the exception from the last failed send. When this happens, your application should call [abortTransaction()](https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction--) to reset the state and continue to send data.
   ```

   So what you're saying definitely does make sense, and I tried this scenario out manually. For an EOS-enabled worker, the response from the `POST /connectors` API when the worker's principal doesn't have a `WRITE` ACL on the config topic is (without the changes from this PR):

   ```
   {
       "error_code": 500,
       "message": "Failed to write to config topic; this may be due to a transient error and the request can be safely retried"
   }
   ```

   While the worker logs do have the exact root cause as well (the `TopicAuthorizationException`), I believe the REST API response in this case isn't all that helpful to the user. With the changes from this PR, the response in the same scenario looks like:

   ```
   {
       "error_code": 500,
       "message": "Not authorized to access topics: [connect-configs]"
   }
   ```

   > I'm not sure if we're in danger of double-completing the callback

   I don't believe we are in danger of double-completing the callback, although you're right in that we are unnecessarily handling the error in two places in a way. We could avoid the use of callbacks in the producer send (although I couldn't find anything that explicitly warns against doing so) when using the transactional producer and instead refactor [this](https://github.com/apache/kafka/blob/e3585a4cd5ddb5b8475a49c38143d18e7c640bfe/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494) / [this](https://github.com/apache/kafka/blob/e3585a4cd5ddb5b8475a49c38143d18e7c640bfe/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L731) so that the final exception that is thrown is the producer exception itself rather than the (doubly) wrapped one. WDYT?
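For illustration only, a rough sketch of the idea behind surfacing the producer exception instead of the doubly wrapped one. It uses plain Java rather than the Kafka classes: `RootCauseSketch`, `rootCause`, and the three `RuntimeException` stand-ins are hypothetical and only mirror the messages quoted above. It is not the code in `DistributedHerder` or `KafkaConfigBackingStore`, and the middle-layer message is invented for the example.

```java
final class RootCauseSketch {

    // Hypothetical helper (not part of Kafka Connect): walks the cause chain
    // and returns the innermost throwable, or the argument itself if it has no cause.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in for the producer-side failure; a plain RuntimeException is used
        // here instead of the real TopicAuthorizationException class.
        RuntimeException producerError =
                new RuntimeException("Not authorized to access topics: [connect-configs]");

        // Stand-ins for the two layers of wrapping discussed in the comment.
        // The inner layer's message is made up for this sketch.
        RuntimeException storeError =
                new RuntimeException("Failed to write to config topic", producerError);
        RuntimeException herderError = new RuntimeException(
                "Failed to write to config topic; this may be due to a transient error "
                        + "and the request can be safely retried",
                storeError);

        // The outer message is generic; the root cause carries the actionable detail.
        System.out.println("wrapped: " + herderError.getMessage());
        System.out.println("root:    " + rootCause(herderError).getMessage());
    }
}
```

Whether unwrapping at the REST layer or throwing the producer exception directly is preferable depends on how the callers handle retriable errors, so treat this purely as a way to reason about the two response messages.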