C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r720389261



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -291,6 +286,13 @@ private void maybeThrowProducerSendException() {
         }
     }
 
+    private void updateCommittableOffsets() {
+        Map<Map<String, Object>, Map<String, Object>> newOffsets = 
submittedRecords.committableOffsets();
+        synchronized (this) {
+            offsets.putAll(newOffsets);
+        }

Review comment:
       Yep, can do. I initially held off because there's only two synchronized 
blocks in the entire class now and they should both take very little time, but 
after checking out the parent `WorkerTask` it looks like we're doing some calls 
to the status listener in synchronized blocks as well. Seems worth the extra 
few lines here to avoid unnecessary synchronization. 👍 

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -94,14 +95,9 @@
     private final TopicCreation topicCreation;
 
     private List<SourceRecord> toSend;
-    private boolean lastSendFailed; // Whether the last send failed 
*synchronously*, i.e. never made it into the producer's RecordAccumulator
-    // Use IdentityHashMap to ensure correctness with duplicate records. This 
is a HashMap because
-    // there is no IdentityHashSet.
-    private IdentityHashMap<ProducerRecord<byte[], byte[]>, 
ProducerRecord<byte[], byte[]>> outstandingMessages;
-    // A second buffer is used while an offset flush is running
-    private IdentityHashMap<ProducerRecord<byte[], byte[]>, 
ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
-    private boolean flushing;
-    private CountDownLatch stopRequestedLatch;
+    private volatile Map<Map<String, Object>, Map<String, Object>> offsets;

Review comment:
       Sure, fine by me 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to