yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1093090686


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -711,9 +752,32 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     */
+    private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)));
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues) throws ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                    keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            for (Future<RecordMetadata> future : producerFutures) {
+                future.get(READ_WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS);

Review Comment:
   > Considering this is all taking place on the herder's tick thread, we should probably care about the difference.
   
   Makes sense.
   
   > We might be able to use the [Timer class](https://github.com/apache/kafka/blob/eb7f490159c924ca0f21394d58366c257998f52e/clients/src/main/java/org/apache/kafka/common/utils/Timer.java) to simplify some of this logic.
   
   Thanks, that's a great suggestion, and it looks like a perfect fit for the use case here. One thing I'd like to call out: the existing `putTaskConfigs` implementation performed three reads to the end of the log, each with its own 30-second timeout, so it could block for up to 90 seconds in total. With the latest changes pushed to this PR, a single 30-second timeout now covers all of the reads and writes done by `putTaskConfigs`. I believe this is the right thing to do, but I wanted to call it out explicitly to make sure we're on the same page!
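
To make the difference between per-call timeouts and one overall timeout concrete, here is a minimal sketch of the shared-budget idea. It deliberately uses only `java.util.concurrent` and a `System.nanoTime()` deadline instead of Kafka's `Timer` class, and it is not the code in this PR: `SharedDeadlineSketch` and `awaitAll` are hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; not part of KafkaConfigBackingStore.
final class SharedDeadlineSketch {

    // Waits for every future in order, charging each wait against ONE shared
    // budget, so the total blocking time is bounded by totalTimeoutMs rather
    // than by futures.size() * totalTimeoutMs.
    public static void awaitAll(List<? extends Future<?>> futures, long totalTimeoutMs)
            throws ExecutionException, InterruptedException, TimeoutException {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalTimeoutMs);
        for (Future<?> future : futures) {
            // Whatever is left of the shared budget; clamped at zero so that
            // an already-completed future still returns immediately.
            long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
            future.get(remainingNanos, TimeUnit.NANOSECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        // Both futures are already complete, so this returns right away.
        awaitAll(List.of(CompletableFuture.completedFuture("a"),
                         CompletableFuture.completedFuture("b")), 1_000L);

        // A future that never completes exhausts the shared budget.
        try {
            awaitAll(List.of(new CompletableFuture<String>()), 50L);
        } catch (TimeoutException e) {
            System.out.println("Timed out after the shared budget was used up");
        }
    }
}
```

With a per-call timeout, three sequential 30-second waits could add up to 90 seconds; with a shared deadline as above, the same three waits are capped at 30 seconds in total, which is the behavior change described for `putTaskConfigs`.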


