C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r720389261

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -291,6 +286,13 @@ private void maybeThrowProducerSendException() {
         }
     }
 
+    private void updateCommittableOffsets() {
+        Map<Map<String, Object>, Map<String, Object>> newOffsets = submittedRecords.committableOffsets();
+        synchronized (this) {
+            offsets.putAll(newOffsets);
+        }

Review comment:
Yep, can do. I initially held off because there's only two synchronized blocks in the entire class now and they should both take very little time, but after checking out the parent `WorkerTask` it looks like we're doing some calls to the status listener in synchronized blocks as well. Seems worth the extra few lines here to avoid unnecessary synchronization. 👍

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -94,14 +95,9 @@
     private final TopicCreation topicCreation;
 
     private List<SourceRecord> toSend;
-    private boolean lastSendFailed; // Whether the last send failed *synchronously*, i.e. never made it into the producer's RecordAccumulator
-    // Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because
-    // there is no IdentityHashSet.
-    private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages;
-    // A second buffer is used while an offset flush is running
-    private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
-    private boolean flushing;
-    private CountDownLatch stopRequestedLatch;
+    private volatile Map<Map<String, Object>, Map<String, Object>> offsets;

Review comment:
Sure, fine by me 👍
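
For readers following the synchronization point in the first comment: the thread does not show what change was agreed on, so the following is only a minimal sketch of one common way to avoid contending on the instance monitor, namely guarding the offsets map with a dedicated lock object. The class `CommittableOffsetsSketch`, the field `offsetsLock`, and the methods `update` and `snapshotAndClear` are hypothetical names for illustration and are not taken from `WorkerSourceTask`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the offset bookkeeping discussed in the review;
// this is NOT the actual WorkerSourceTask implementation.
class CommittableOffsetsSketch {
    // Dedicated lock (assumed name), so updates do not contend with other
    // code that synchronizes on the instance itself.
    private final Object offsetsLock = new Object();
    private final Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();

    // Merge newly committable offsets. The caller is expected to compute
    // newOffsets before calling, so that work happens outside the lock.
    void update(Map<Map<String, Object>, Map<String, Object>> newOffsets) {
        synchronized (offsetsLock) {
            offsets.putAll(newOffsets);
        }
    }

    // Return a copy of the accumulated offsets and reset the internal map,
    // holding the lock only for the copy and the clear.
    Map<Map<String, Object>, Map<String, Object>> snapshotAndClear() {
        synchronized (offsetsLock) {
            Map<Map<String, Object>, Map<String, Object>> snapshot = new HashMap<>(offsets);
            offsets.clear();
            return snapshot;
        }
    }
}
```

The design choice here is that only the map mutation sits inside the critical section, and a slow callback running under `synchronized (this)` elsewhere in the class hierarchy cannot delay it.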