C0urante commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1092418826
########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ##########

```diff
@@ -711,9 +752,32 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }

-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     */
+    private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)));
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues) throws ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            for (Future<RecordMetadata> future : producerFutures) {
+                future.get(READ_WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
```

Review Comment:
This gives us 30 seconds for each write instead of 30 seconds total, and it doesn't take into account reading to the end of the log after writes have finished. Considering this is all taking place on the herder's tick thread, we should probably care about the difference.
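The single-budget approach the comment describes can be sketched with plain `System.nanoTime()` arithmetic (Kafka's `Timer` utility wraps essentially this bookkeeping). `SharedDeadlineDemo` and `awaitAll` are hypothetical names used only for illustration here, not part of the PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SharedDeadlineDemo {

    // Wait for every future to complete, charging all of them against one
    // shared time budget instead of granting each future its own full timeout.
    static <T> void awaitAll(List<Future<T>> futures, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        for (Future<T> future : futures) {
            long remainingNs = deadlineNs - System.nanoTime();
            if (remainingNs <= 0)
                throw new TimeoutException("Timed out before all records were acknowledged");
            // Each get() only receives whatever time is left on the shared deadline
            future.get(remainingNs, TimeUnit.NANOSECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        List<Future<Void>> sends = new ArrayList<>();
        for (int i = 0; i < 3; i++)
            sends.add(CompletableFuture.completedFuture(null));
        awaitAll(sends, 30_000);
        System.out.println("all sends completed within the shared deadline");
    }
}
```

Because each `future.get` is bounded by the time remaining rather than the full timeout, a batch of N sends still completes (or fails) within one overall budget, and the same remaining-time value could also bound the subsequent read to the end of the log.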
We might be able to use the [Timer class](https://github.com/apache/kafka/blob/eb7f490159c924ca0f21394d58366c257998f52e/clients/src/main/java/org/apache/kafka/common/utils/Timer.java) to simplify some of this logic.

########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ##########

```diff
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
     }

     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback<Void> callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
```

Review Comment:
Oof, that's a lot of context 😄

Thinking about it some more, I'm hesitant to make significant changes to the exception mapper without a KIP, since it's a crucial part of our API and there may be automated tooling (like K8s operators) built around the current behavior, and adding full stack traces and/or caused-by chains could make things less readable and human-friendly, especially for new users.

Asking people to check the worker logs isn't a terrible solution, though it might be a bit tricky to make sure that that message reaches users regardless of whether EOS source support is enabled or disabled.

An alternative could be to handle this case specially by instantiating an exception whose message contains information on its cause. For example, the message could be `"Failed to write task configs to Kafka. Caused by org.apache.kafka.common.errors.AuthorizationException: Not authorized to access topics: connect-configs"`.

Regardless, this can and probably should be left as a follow-up since it's its own can of worms and doesn't have to block this PR.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.