gharris1727 commented on code in PR #13181:
URL: https://github.com/apache/kafka/pull/13181#discussion_r1092643889


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -87,6 +90,7 @@ public MirrorSourceTask() {}
     @Override
     public void start(Map<String, String> props) {
         MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(props);
+        pendingOffsetSyncs.clear();

Review Comment:
   nit: Does findbugs not complain about this `clear()` call not being
synchronized? You must be on better terms with it than I am.
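
   For reference, a minimal sketch of what guarding the `clear()` with the same monitor as the other accesses to the map might look like. Everything here is illustrative and not the code in this PR: the class `PendingSyncsSketch` and its methods are hypothetical, and `String`/`Long` stand in for `TopicPartition`/`OffsetSync`.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the task; this is NOT the real MirrorSourceTask.
class PendingSyncsSketch {
    // Assumption: String and Long stand in for TopicPartition and OffsetSync.
    private final Map<String, Long> pendingOffsetSyncs = new LinkedHashMap<>();

    // Clears leftover state under the same lock that queue() and pollOne() use,
    // so every access to the map is consistently guarded by `this`.
    void start() {
        synchronized (this) {
            pendingOffsetSyncs.clear();
        }
    }

    // Queues (or overwrites) the pending entry for a partition.
    void queue(String topicPartition, long upstreamOffset) {
        synchronized (this) {
            pendingOffsetSyncs.put(topicPartition, upstreamOffset);
        }
    }

    // Removes and returns one pending entry, or null if nothing is pending.
    Long pollOne() {
        synchronized (this) {
            Iterator<Long> it = pendingOffsetSyncs.values().iterator();
            if (!it.hasNext()) {
                return null;
            }
            Long next = it.next();
            it.remove();
            return next;
        }
    }

    synchronized int pendingCount() {
        return pendingOffsetSyncs.size();
    }
}
```

   Whether the lock is actually needed in `start()` depends on whether any other thread can touch the map at that point; the sketch only shows the consistently guarded variant.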



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -194,41 +200,63 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) {
         TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
         long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
         long downstreamOffset = metadata.offset();
-        maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        // We may be able to immediately publish an offset sync that we've queued up here
+        firePendingOffsetSyncs();
     }
 
-    // updates partition state and sends OffsetSync if necessary
-    private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset,
-            long downstreamOffset) {
+    // updates partition state and queues up OffsetSync if necessary
+    private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset,
+                                       long downstreamOffset) {
         PartitionState partitionState =
             partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
         if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
-                partitionState.reset();
+            OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+            synchronized (this) {
+                pendingOffsetSyncs.put(topicPartition, offsetSync);
             }
+            partitionState.reset();
         }
     }
 
-    // sends OffsetSync record upstream to internal offsets topic
-    private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
-            long downstreamOffset) {
-        if (!outstandingOffsetSyncs.tryAcquire()) {
-            // Too many outstanding offset syncs.
-            return false;
+    private void firePendingOffsetSyncs() {
+        while (true) {
+            OffsetSync pendingOffsetSync;
+            synchronized (this) {
+                Iterator<OffsetSync> syncIterator = pendingOffsetSyncs.values().iterator();
+                if (!syncIterator.hasNext()) {
+                    // Nothing to sync
+                    log.debug("No more pending offset syncs");

Review Comment:
   nit: these log statements could be spammy if they're hit on every
`commitRecord` call. What do you think about logging them at `trace` instead?


