[ROCKETMQ-175] Consumer may miss messages because of inconsistent sub⦠closes apache/incubator-rocketmq#92
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/82803889 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/82803889 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/82803889 Branch: refs/heads/master Commit: 8280388917c466d030ffb774a2474ca8e4144811 Parents: 42826c4 Author: vsair <liuxue...@gmail.com> Authored: Fri May 26 15:13:29 2017 +0800 Committer: dongeforever <zhendongli...@yeah.net> Committed: Fri May 26 15:13:29 2017 +0800 ---------------------------------------------------------------------- .../rocketmq/client/impl/consumer/RebalancePushImpl.java | 11 +++++++++++ 1 file changed, 11 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/82803889/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index 1730c99..509c9a4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -30,6 +30,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; public class RebalancePushImpl extends RebalanceImpl { private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000")); @@ -47,6 +48,16 @@ public class RebalancePushImpl extends RebalanceImpl { @Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { + /** + * When rebalance result changed, should update subscription's version to notify broker. + * Fix: inconsistency subscription may lead to consumer miss messages. + */ + SubscriptionData subscriptionData = this.subscriptionInner.get(topic); + long newVersion = System.currentTimeMillis(); + log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion); + subscriptionData.setSubVersion(newVersion); + // notify broker + this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock(); } @Override