[
https://issues.apache.org/jira/browse/KAFKA-12558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alan updated KAFKA-12558:
-------------------------
Description:
There is a race condition in {{MirrorSourceTask}} where certain partition
offsets may never be sent. The bug occurs when the [outstandingOffsetSync
semaphore is
full|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L207].
In that case, {{sendOffsetSync}} [fails
silently|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L207].
This failure is normally acceptable, since the offset sync retries frequently.
However, {{maybeSyncOffsets}} has a bug where it [mutates the partition
state|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L199]
before confirming the result of {{sendOffsetSync}}. The end result is that the
partition state is mutated prematurely, which prevents future offset syncs from
recovering.
Since {{MAX_OUTSTANDING_OFFSET_SYNCS}} is 10, this bug occurs whenever a task
is assigned more than 10 partitions.
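For context, the problematic ordering looks roughly like this (a simplified sketch of the current flow, not a verbatim copy of the trunk source):
{code:java}
// Simplified sketch of the current (buggy) flow in MirrorSourceTask.
private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
    PartitionState partitionState =
            partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
    // update() mutates the partition state as a side effect of deciding whether a sync is due...
    if (partitionState.update(upstreamOffset, downstreamOffset)) {
        // ...but sendOffsetSync() silently drops the sync when the outstandingOffsetSync
        // semaphore (10 permits) cannot be acquired, so the already-mutated state now
        // claims a sync happened that was never actually sent.
        sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset);
    }
}
{code}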
In my test cases, where I had over 100 partitions per task, the majority of the
offsets were wrong. Here's an example of such a failure:
https://issues.apache.org/jira/browse/KAFKA-12468?focusedCommentId=17308308&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17308308
During my troubleshooting, I customized {{MirrorSourceTask}} and confirmed that
every partition with a wrong offset had failed to acquire the semaphore. The
condition [can be trapped
here|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L208].
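For anyone reproducing this, the condition can be made visible with a log line at the point where the semaphore acquisition fails (an illustrative sketch assuming the class's slf4j {{log}} field; the surrounding code is simplified):
{code:java}
// Inside MirrorSourceTask#sendOffsetSync, simplified sketch for debugging only.
if (!outstandingOffsetSyncs.tryAcquire()) {
    // This is the silent-failure branch: logging it shows which partitions are affected.
    log.warn("Dropping offset sync for {}: too many outstanding offset syncs", topicPartition);
    return;
}
{code}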
*Possible Fix:*
A possible fix is to add a {{shouldUpdate}} method to {{PartitionState}}. This
method should be read-only and return true if a {{sendOffsetSync}} is needed.
Only once {{sendOffsetSync}} succeeds should {{update}} be called.
Here's some pseudocode:
{code:java}
private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
    PartitionState partitionState =
            partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
    // Read-only check: does this partition need an offset sync at all?
    if (partitionState.shouldUpdate(upstreamOffset, downstreamOffset)) {
        // Only mutate the partition state once the sync was actually sent
        // (this assumes sendOffsetSync is changed to report success).
        if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
            partitionState.update(upstreamOffset, downstreamOffset);
        }
    }
}
{code}
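For illustration, the split inside {{PartitionState}} could look roughly like the sketch below. The field names and the lag-based check are assumptions made for the example; the real class may track additional state:
{code:java}
static class PartitionState {
    long previousUpstreamOffset = -1L;
    long previousDownstreamOffset = -1L;
    final long maxOffsetLag;

    PartitionState(long maxOffsetLag) {
        this.maxOffsetLag = maxOffsetLag;
    }

    // Read-only check: true if an offset sync should be emitted. No state is mutated here.
    boolean shouldUpdate(long upstreamOffset, long downstreamOffset) {
        long expectedDownstreamOffset = previousDownstreamOffset + (upstreamOffset - previousUpstreamOffset);
        return previousUpstreamOffset < 0
                || downstreamOffset - expectedDownstreamOffset >= maxOffsetLag;
    }

    // Called only after sendOffsetSync succeeds, so a dropped sync leaves the
    // state untouched and the next call will retry.
    void update(long upstreamOffset, long downstreamOffset) {
        previousUpstreamOffset = upstreamOffset;
        previousDownstreamOffset = downstreamOffset;
    }
}
{code}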
*Workaround:*
For those experiencing this issue, the workaround is to make sure each task is
assigned at most 10 partitions. Set your {{tasks.max}} value accordingly (see
the example below).
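For example, assuming a replication flow that mirrors 40 topic-partitions in total (an illustrative number), the Connect/MM2 configuration would need at least 4 tasks:
{code}
# 40 replicated partitions / 10 partitions per task => at least 4 tasks
tasks.max = 4
{code}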
> MM2 may not sync partition offsets correctly
> --------------------------------------------
>
> Key: KAFKA-12558
> URL: https://issues.apache.org/jira/browse/KAFKA-12558
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.7.0, 2.6.1
> Reporter: Alan
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)