[ https://issues.apache.org/jira/browse/KAFKA-12558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Egerton resolved KAFKA-12558.
-----------------------------------
    Fix Version/s: 3.5.0
       Resolution: Fixed

> MM2 may not sync partition offsets correctly
> --------------------------------------------
>
>                 Key: KAFKA-12558
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12558
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.7.0, 2.6.1
>            Reporter: Alan Ning
>            Priority: Major
>             Fix For: 3.5.0
>
>
> There is a race condition in {{MirrorSourceTask}} where certain partition
> offsets may never be sent. The bug occurs when the [outstandingOffsetSync
> semaphore is
> full|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L207].
> In this case, {{sendOffsetSync}} [will silently
> fail|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L207].
> This failure is normally acceptable since offset sync will retry frequently.
> However, {{maybeSyncOffsets}} has a bug where it will [mutate the partition
> state|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L199]
> prior to confirming the result of {{sendOffsetSync}}. As a result, the
> partition state is mutated prematurely, which prevents future offset syncs
> from recovering.
> Since {{MAX_OUTSTANDING_OFFSET_SYNCS}} is 10, this bug happens when you
> assign more than 10 partitions to each task.
> In my test cases where I had over 100 partitions per task, the majority of
> the offsets were wrong. Here's an example of such a failure.
> https://issues.apache.org/jira/browse/KAFKA-12468?focusedCommentId=17308308&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17308308
> During my troubleshooting, I customized {{MirrorSourceTask}} to confirm
> that all partitions with the wrong offset were failing to acquire the
> initial semaphore. The condition [can be trapped
> here|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L208].
> *Possible Fix:*
> A possible fix is to create a {{shouldUpdate}} method in {{PartitionState}}.
> This method should be read-only and return true if {{sendOffsetSync}} is
> needed. {{update}} should be called only after {{sendOffsetSync}} succeeds.
> Here's some pseudocode:
> {code:java}
> private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset,
>         long downstreamOffset) {
>     PartitionState partitionState =
>         partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
>     // Read-only check: does this partition need a new offset sync?
>     if (partitionState.shouldUpdate(upstreamOffset, downstreamOffset)) {
>         // Mutate the state only once the sync has actually been sent.
>         if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
>             partitionState.update(upstreamOffset, downstreamOffset);
>         }
>     }
> }
> {code}
>
> *Workaround:*
> For those who are experiencing this issue, the workaround is to make sure you
> have at most 10 partitions per task. Set your `tasks.max` value
> accordingly.
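
The ordering problem described in the report can be illustrated with a small standalone model. This is only a sketch under simplifying assumptions, not the actual MirrorSourceTask code: the class OffsetSyncRaceSketch, the method syncedPartitions, and the two sets standing in for PartitionState are hypothetical, each partition needs exactly one sync, and every in-flight send is assumed to complete between retry passes. The only detail taken from the report is the limit of 10 outstanding syncs.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;

// Hypothetical model of the race; not the real MirrorSourceTask.
class OffsetSyncRaceSketch {
    // Limit named in the report.
    static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;

    /**
     * Runs `passes` retry rounds over `partitions` partitions and returns how
     * many partitions had their offset sync actually sent.
     *
     * @param mutateBeforeSend true models the reported bug (state updated
     *                         before the send result is known); false models
     *                         the proposed fix (state updated after success).
     */
    static int syncedPartitions(boolean mutateBeforeSend, int partitions, int passes) {
        Semaphore outstanding = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
        Set<Integer> markedSynced = new HashSet<>(); // stand-in for PartitionState
        Set<Integer> actuallySent = new HashSet<>();

        for (int pass = 0; pass < passes; pass++) {
            for (int p = 0; p < partitions; p++) {
                if (markedSynced.contains(p)) {
                    continue; // state claims no sync is needed
                }
                if (mutateBeforeSend) {
                    markedSynced.add(p);             // premature mutation
                    if (outstanding.tryAcquire()) {  // fails silently when full
                        actuallySent.add(p);
                    }
                } else if (outstanding.tryAcquire()) {
                    actuallySent.add(p);
                    markedSynced.add(p);             // mutate only after success
                }
            }
            // Assumption: all in-flight sends finish before the next pass.
            outstanding.drainPermits();
            outstanding.release(MAX_OUTSTANDING_OFFSET_SYNCS);
        }
        return actuallySent.size();
    }

    public static void main(String[] args) {
        System.out.println(syncedPartitions(true, 25, 3));  // prints 10
        System.out.println(syncedPartitions(false, 25, 3)); // prints 25
    }
}
```

With 25 partitions on one task, the buggy ordering in this model sends only the first 10 syncs and never retries the other 15, while the fixed ordering catches up over later passes. With 10 or fewer partitions both orderings behave the same, which matches the workaround above.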