[ 
https://issues.apache.org/jira/browse/KAFKA-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-4521:
----------------------------
    Description: 
To ensure that messages from a given partition in the source cluster are 
mirrored to the same partition in the destination cluster in the *same* 
order, MirrorMaker needs to produce and flush all messages that its consumer 
has received from the source cluster before giving up ownership of that 
partition in the source cluster. 

However, the current implementation of Apache Kafka does not guarantee this, 
which can cause out-of-order message delivery in the following scenario.

- MirrorMaker process 1 fetches messages 1, 2, 3 from the source cluster in 
one FetchRequest.
- MirrorMaker process 2 starts up and triggers a rebalance.
- `InternalRebalanceListenerForOldConsumer` is executed by the ZooKeeper 
listener thread, which calls producer.flush() and commits the offsets of this 
consumer. However, at this moment messages 1, 2, 3 have not yet been produced.
- The consumer of MirrorMaker process 1 releases ownership of this partition.
- The consumer of MirrorMaker process 2 gets ownership of this partition.
- MirrorMaker process 2 fetches messages 4, 5, 6 from the source cluster.
- Messages 4, 5, 6 can then be produced before messages 1, 2, 3. 
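
The interleaving above can be replayed deterministically. The following is 
only an illustrative sketch: it uses plain Java lists as stand-ins for the 
consumer's fetched messages, the producer buffer, and the destination 
partition, and the class and variable names are made up for this example. It 
does not use any Kafka classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RebalanceReorderDemo {

    /**
     * Replays the scenario step by step and returns the order in which
     * messages end up in the (simulated) destination partition.
     */
    static List<Integer> replay() {
        // Stand-in for the destination partition.
        List<Integer> destination = new ArrayList<>();

        // Process 1 has fetched 1, 2, 3 but has not handed them to its producer.
        List<Integer> fetchedByProcess1 = new ArrayList<>(Arrays.asList(1, 2, 3));
        List<Integer> producerBufferOfProcess1 = new ArrayList<>();

        // The rebalance listener of process 1 flushes the producer and commits.
        // The producer buffer is still empty, so the flush sends nothing.
        destination.addAll(producerBufferOfProcess1);
        producerBufferOfProcess1.clear();

        // Ownership moves to process 2, which fetches and produces 4, 5, 6.
        destination.addAll(Arrays.asList(4, 5, 6));

        // Only now does process 1 produce the messages it fetched earlier.
        producerBufferOfProcess1.addAll(fetchedByProcess1);
        destination.addAll(producerBufferOfProcess1);

        return destination;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints [4, 5, 6, 1, 2, 3]
    }
}
```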

To fix this problem, the rebalance listener callback should signal 
MirrorMakerThread to get all messages from the consumer, produce them to the 
destination cluster, flush the producer, and commit offsets. The callback 
should wait for MirrorMakerThread to finish these steps before it allows 
ownership of this partition to be released.
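
One possible shape for this handshake is sketched below. It is not the actual 
MirrorMaker code or patch. The class, field, and method names 
(DrainBeforeReleaseSketch, drainRequest, onPartitionsRevoked, and so on) are 
hypothetical, a queue stands in for the consumer, and a list stands in for the 
destination partition. It also assumes only one rebalance callback is in 
progress at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class DrainBeforeReleaseSketch {
    // Messages already fetched by the consumer but not yet produced.
    private final BlockingQueue<Integer> fetched = new LinkedBlockingQueue<>();
    // Stand-in for the destination partition.
    private final List<Integer> destination =
            Collections.synchronizedList(new ArrayList<Integer>());
    // Set by the rebalance callback; counted down by the worker after draining.
    private final AtomicReference<CountDownLatch> drainRequest = new AtomicReference<>();
    private volatile boolean running = true;

    /** Loop of the worker thread (the role MirrorMakerThread plays in the text). */
    private void workerLoop() {
        try {
            while (running) {
                CountDownLatch request = drainRequest.getAndSet(null);
                if (request != null) {
                    // Take everything the consumer has, "produce and flush" it.
                    List<Integer> pending = new ArrayList<>();
                    fetched.drainTo(pending);
                    destination.addAll(pending);
                    // A real implementation would commit offsets here.
                    request.countDown();
                    continue;
                }
                Integer message = fetched.poll(10, TimeUnit.MILLISECONDS);
                if (message != null) {
                    destination.add(message);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Rebalance callback: signal the worker, then block until it has drained. */
    boolean onPartitionsRevoked(long timeoutMs) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        drainRequest.set(done);
        return done.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Runs the scenario with the handshake and returns the destination order. */
    static List<Integer> run() {
        DrainBeforeReleaseSketch process1 = new DrainBeforeReleaseSketch();
        process1.fetched.addAll(Arrays.asList(1, 2, 3));
        Thread worker = new Thread(process1::workerLoop, "mirrormaker-thread-sketch");
        worker.setDaemon(true);
        worker.start();
        try {
            if (!process1.onPartitionsRevoked(5000)) {
                throw new IllegalStateException("drain did not finish in time");
            }
            // Ownership is released only now; process 2 produces 4, 5, 6.
            process1.destination.addAll(Arrays.asList(4, 5, 6));
            process1.running = false;
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            process1.running = false;
        }
        return new ArrayList<>(process1.destination);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3, 4, 5, 6]
    }
}
```

Because a single worker thread does all producing for process 1, and the 
callback does not return until that thread has counted down the latch, 
messages 1, 2, 3 reach the destination before ownership can move. The timeout 
on the wait is a design choice of this sketch, so that a stuck worker cannot 
block the rebalance indefinitely.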



  was:
In order to ensure that messages from a given partition in the source cluster 
are mirrored to the same partition in the destination cluster in the *same* 
order, MirrorMaker needs to produce and flush all messages that its consumer 
has received from source cluster before giving up partition in the cluster. 

However, as of current implementation of Apache Kafka, this is not guaranteed 
and will cause out-of-order message delivery in the following scenario.

- mirror maker process 1 fetches messages 1, 2, 3, from source cluster in one 
FetchRequest
- mirror maker process 2 starts up and triggers rebalance.
- `InternalRebalanceListenerForOldConsumer` is executed by zookeeper listener 
thread, which does producer.flush() and commit offset of this consumer. 
However, at this moment messages 1, 2, 3 haven't even been produced.
- consumer of mirror maker process 1 releases ownership of this partition
- consumer of mirror maker process 2 gets ownership of this partition.
- mirror maker process 2 fetches messages 4, 5, 6 from source cluster.
- messages 4, 5, 6 can be produced before messages 1, 2, 3. 

To avoid this problem, the rebalance listener callback function should signal 
MirrorMakerThread to get all messages from consumer, produce these messages to 
destination cluster, flush producer, and commit offset. rebalance listener 
callback function waits for MirrorMakerThread to finish these steps before it 
allows ownership of this partition to be released.




> MirrorMaker should flush all messages before releasing partition ownership 
> during rebalance
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4521
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4521
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Dong Lin
>            Assignee: Dong Lin



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
