C0urante commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1092418826


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -711,9 +752,32 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     */
+    private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)));
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues) throws ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                    keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            for (Future<RecordMetadata> future : producerFutures) {
+                future.get(READ_WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS);

Review Comment:
   This gives us 30 seconds for each write instead of 30 seconds total, and it 
doesn't take into account reading to the end of the log after writes have 
finished. Considering this is all taking place on the herder's tick thread, we 
should probably care about the difference.
   
   We might be able to use the
   [Timer class](https://github.com/apache/kafka/blob/eb7f490159c924ca0f21394d58366c257998f52e/clients/src/main/java/org/apache/kafka/common/utils/Timer.java)
   to simplify some of this logic.
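
   A rough sketch of the "30 seconds total" idea, written against plain JDK
   classes rather than Kafka's Timer so that it stays self-contained. The names
   `SharedDeadline` and `awaitAll` are hypothetical and not part of this PR; the
   point is only that each `get` receives whatever is left of one shared budget.
   Reading to the end of the log afterwards would draw on the same remaining time.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration only; not part of KafkaConfigBackingStore.
final class SharedDeadline {

    // Waits for every future to complete, spending at most totalTimeoutMs overall
    // instead of totalTimeoutMs per future.
    static <T> void awaitAll(List<? extends Future<T>> futures, long totalTimeoutMs)
            throws ExecutionException, InterruptedException, TimeoutException {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalTimeoutMs);
        for (Future<T> future : futures) {
            // Only the time left in the shared budget is granted to this future.
            // Once the budget is spent, the wait is zero, so an already-completed
            // future still returns and a pending one throws TimeoutException.
            long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
            future.get(remainingNanos, TimeUnit.NANOSECONDS);
        }
    }
}
```

   With this shape, three writes that never complete fail after roughly one
   timeout period rather than three.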



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
     }
 
     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback<Void> callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);

Review Comment:
   Oof, that's a lot of context 😄
   
   Thinking about it some more, I'm hesitant to make significant changes to the 
exception mapper without a KIP, since it's a crucial part of our API and there 
may be automated tooling (like K8s operators) built around the current 
behavior. Adding full stack traces and/or caused-by chains could also make 
things less readable and human-friendly, especially for new users.
   
   Asking people to check the worker logs isn't a terrible solution, though it 
might be a bit tricky to make sure that that message reaches users regardless 
of whether EOS source support is enabled or disabled.
   
   An alternative could be to handle this case specially by instantiating an 
exception whose message contains information on its cause. For example, the 
message could be `"Failed to write task configs to Kafka. Caused by 
org.apache.kafka.common.errors.AuthorizationException: Not authorized to access 
topics: connect-configs"`.
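
   A minimal sketch of that alternative, assuming a small helper that folds the
   cause's class name and message into the new exception's own message.
   `CauseInMessage` and `withCauseInMessage` are hypothetical names, and
   `RuntimeException` stands in here for whichever Connect exception type the
   store would actually throw.

```java
// Hypothetical helper for illustration only; not part of the Connect runtime.
final class CauseInMessage {

    // Builds an exception whose message carries a one-line summary of its cause,
    // so the information survives even if only getMessage() is surfaced to users.
    static RuntimeException withCauseInMessage(String message, Throwable cause) {
        String detail = message + " Caused by " + cause.getClass().getName()
                + ": " + cause.getMessage();
        // The original cause is still attached for anyone reading the worker logs.
        return new RuntimeException(detail, cause);
    }
}
```

   This keeps the response to a single readable line while still pointing at
   the underlying failure; the full stack trace stays in the worker logs.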
   
   Regardless, this can and probably should be left as a follow-up since it's 
its own can of worms and doesn't have to block this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
