yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1050765139


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
     }
 
     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback<Void> callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);

Review Comment:
   Thanks, that's a good observation. Currently, the pause / resume APIs (which call `putTargetState`) don't use any callback mechanism, and they don't have documented response bodies either (they currently send back a `202` response with an empty body), so they will need a bit more refactoring. I feel like that should be done in a follow-up / separate PR, WDYT?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) {
 
         try {
             fencableProducer.beginTransaction();
-            fencableProducer.send(new ProducerRecord<>(topic, key, value));
+            fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {

Review Comment:
   > I believe here we're intentionally relying on this behavior of commitTransaction to propagate errors:
   
   
   Thanks for pointing this out!
   
   As per 
https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-
   
   ```
   When used as part of a transaction, it is not necessary to define a callback 
or check the result of the future in order to detect errors from send. If any 
of the send calls failed with an irrecoverable error, the final 
[commitTransaction()](https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#commitTransaction--)
 call will fail and throw the exception from the last failed send. When this 
happens, your application should call 
[abortTransaction()](https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction--)
 to reset the state and continue to send data.
   ```
   
   So what you're saying definitely does make sense, and I tried this scenario out manually. For an EOS-enabled worker, the response from the `POST /connectors` API when the worker's principal doesn't have a `WRITE` ACL on the config topic is (without the changes from this PR):
   
   ```
   {
     "error_code": 500,
     "message": "Failed to write to config topic; this may be due to a transient error and the request can be safely retried"
   }
   ```
   
   While the worker logs do have the exact root cause as well (the 
`TopicAuthorizationException`), I believe the REST API response in this case 
isn't all that helpful to the user. With the changes from this PR, the response 
in the same scenario looks like:
   
   ```
   {
     "error_code": 500,
     "message": "Not authorized to access topics: [connect-configs]"
   }
   ```
   
   > I'm not sure if we're in danger of double-completing the callback
   
   I don't believe we are in danger of double-completing the callback, although you're right that we are, in a way, unnecessarily handling the error in two places.
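   
   For reference, if double completion ever did become a concern, one common guard is a one-shot wrapper around the callback. This is only a minimal sketch: the nested `Callback` interface is a stand-in modeled on Connect's `Callback<V>`, and `once` and the class name are hypothetical, not something in this PR:
   
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.atomic.AtomicInteger;

   public class OnceCallbackSketch {
       // Stand-in for Connect's Callback<V>; assumed shape, kept local so the sketch is self-contained.
       interface Callback<V> {
           void onCompletion(Throwable error, V result);
       }

       // Hypothetical helper: forwards only the first completion to the delegate and drops the rest.
       static <V> Callback<V> once(Callback<V> delegate) {
           AtomicBoolean completed = new AtomicBoolean(false);
           return (error, result) -> {
               if (completed.compareAndSet(false, true)) {
                   delegate.onCompletion(error, result);
               }
           };
       }

       public static void main(String[] args) {
           AtomicInteger invocations = new AtomicInteger();
           Callback<Void> guarded = once((error, result) -> invocations.incrementAndGet());

           // Simulate the same failure being reported from two places,
           // e.g. a send callback and a catch block around the commit.
           guarded.onCompletion(new RuntimeException("send failed"), null);
           guarded.onCompletion(new RuntimeException("commit failed"), null);

           System.out.println("delegate invoked " + invocations.get() + " time(s)");
       }
   }
   ```
   
   With a wrapper like this, a failure reported by both the send callback and the `commitTransaction()` catch block would reach the caller only once.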
   
   We could avoid the use of callbacks in the producer send when using the transactional producer (although I couldn't find anything that explicitly warns against using them) and instead refactor [this](https://github.com/apache/kafka/blob/e3585a4cd5ddb5b8475a49c38143d18e7c640bfe/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494) / [this](https://github.com/apache/kafka/blob/e3585a4cd5ddb5b8475a49c38143d18e7c640bfe/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L731) so that the final exception that is thrown is the producer exception itself rather than the (doubly) wrapped one. WDYT?
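   
   To illustrate what surfacing the producer exception rather than the doubly wrapped one could mean, here is a rough sketch that walks the cause chain down to the innermost exception. It uses plain `RuntimeException`s as stand-ins for the real Connect and Kafka exception types, and `rootCause` is a hypothetical helper, not existing code in either linked file:
   
   ```java
   public class RootCauseSketch {
       // Hypothetical helper: follow getCause() until the innermost exception is reached.
       static Throwable rootCause(Throwable t) {
           Throwable current = t;
           while (current.getCause() != null && current.getCause() != current) {
               current = current.getCause();
           }
           return current;
       }

       public static void main(String[] args) {
           // Stand-in for the producer-side failure (e.g. an authorization error).
           RuntimeException producerError =
                   new RuntimeException("Not authorized to access topics: [connect-configs]");
           // Stand-ins for the two layers of wrapping described in the comment above.
           RuntimeException wrappedOnce = new RuntimeException("Failed to write to config topic", producerError);
           RuntimeException wrappedTwice = new RuntimeException(
                   "Failed to write to config topic; this may be due to a transient error "
                           + "and the request can be safely retried",
                   wrappedOnce);

           // The innermost message is the one that would be useful in the REST response.
           System.out.println(rootCause(wrappedTwice).getMessage());
       }
   }
   ```
   
   Whether unwrapping like this or removing the extra wrapping at the source is preferable is exactly the open question here; the sketch only shows the effect on the message.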



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
