[ https://issues.apache.org/jira/browse/BOOKKEEPER-442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiannan Wang updated BOOKKEEPER-442:
------------------------------------

    Attachment: BOOKKEEPER-442.diff

Change follows Ivan's comment: log the error instead of throwing a runtime exception.

> Failed to deliver messages due to inconsistency between SubscriptionState and LedgerRanges.
> -------------------------------------------------------------------------------------------
>
>                 Key: BOOKKEEPER-442
>                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-442
>             Project: Bookkeeper
>          Issue Type: Bug
>          Components: hedwig-server
>    Affects Versions: 4.1.0, 4.2.0
>            Reporter: Sijie Guo
>            Assignee: Jiannan Wang
>            Priority: Critical
>             Fix For: 4.2.0
>
>         Attachments: BOOKKEEPER-442.diff, BOOKKEEPER-442.diff
>
>
> The problem occurs when the subscription state update fails but the consumed ledgers have already been deleted.
> The issue is as follows:
> 1) A subscriber calls setLastConsumeSeqId to move its consume pointer. If the pointer has moved past the consume interval, an update-subscription-state operation is issued to persist the new state to ZooKeeper.
> {code}
> // AbstractSubscriptionManager:
> if (subState.setLastConsumeSeqId(consumeSeqId, cfg.getConsumeInterval())) {
>     updateSubscriptionState(topic, subscriberId, subState, cb, ctx);
> }
> {code}
> 2) When the consume pointer moves, the in-memory subscription state is also changed, before the new state has been persisted to ZooKeeper.
> {code}
> public boolean setLastConsumeSeqId(MessageSeqId lastConsumeSeqId, int consumeInterval) {
>     long interval = lastConsumeSeqId.getLocalComponent()
>             - subscriptionState.getMsgId().getLocalComponent();
>     if (interval <= 0) {
>         return false;
>     }
>     // set consume seq id when it is larger
>     this.lastConsumeSeqId = lastConsumeSeqId;
>     if (interval < consumeInterval) {
>         return false;
>     }
>     // subscription state will be updated, mark it as clean
>     subscriptionState = SubscriptionState.newBuilder(subscriptionState)
>             .setMsgId(lastConsumeSeqId).build();
>     return true;
> }
> {code}
> 3) MessageConsumedTask runs periodically to delete consumed ledgers, and it uses the in-memory subscription state to decide what to delete. So if a ledger is deleted first and the subscription state update then fails, the state becomes inconsistent: when the hub restarts and the subscriber reconnects, delivery resumes from the old (persisted) seq id, but the ledger holding the messages with that seq id has already been deleted.
> {code}
> for (InMemorySubscriptionState curSubscription : topicSubscriptions.values()) {
>     if (curSubscription.getSubscriptionState().getMsgId().getLocalComponent() < minConsumedMessage) {
>         minConsumedMessage = curSubscription.getSubscriptionState().getMsgId().getLocalComponent();
>     }
>     hasBound = hasBound && curSubscription.getSubscriptionPreferences().hasMessageBound();
> }
> {code}
> The fix would be to let the message-consumed task use only the persisted state when performing deletions.
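A minimal Java sketch of the proposed fix follows. It is an illustration, not the code in BOOKKEEPER-442.diff. It assumes a hypothetical `SubscriptionModel` that tracks two values per subscriber: the in-memory consume pointer, which is advanced immediately (as `setLastConsumeSeqId` does), and the last seq id confirmed persisted in ZooKeeper, which is advanced only from a successful update callback. The hypothetical helpers `minPersistedSeqId` and `deletableLedgers` stand in for the bound computation in MessageConsumedTask. Ledgers are reduced to their end seq ids, and a ledger is treated as deletable when its end seq id is at or below the bound.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of the BOOKKEEPER-442 race; not Hedwig's real classes.
class ConsumedLedgerGc {

    // Per-subscriber state (hypothetical).
    static final class SubscriptionModel {
        long inMemorySeqId;   // advanced as soon as the subscriber consumes
        long persistedSeqId;  // advanced only after the ZooKeeper write succeeds

        SubscriptionModel(long seqId) {
            this.inMemorySeqId = seqId;
            this.persistedSeqId = seqId;
        }

        // Move the in-memory consume pointer forward (never backward).
        void consume(long seqId) {
            if (seqId > inMemorySeqId) {
                inMemorySeqId = seqId;
            }
        }

        // To be called only from a successful subscription-state update callback.
        void onPersisted(long seqId) {
            if (seqId > persistedSeqId) {
                persistedSeqId = seqId;
            }
        }
    }

    // Smallest persisted seq id over all subscriptions.
    // Returns Long.MAX_VALUE when there are none; this sketch does not decide that case.
    static long minPersistedSeqId(Map<String, SubscriptionModel> subs) {
        long min = Long.MAX_VALUE;
        for (SubscriptionModel s : subs.values()) {
            min = Math.min(min, s.persistedSeqId);
        }
        return min;
    }

    // End seq ids of ledgers whose messages are all at or below the consumed bound.
    static List<Long> deletableLedgers(List<Long> ledgerEndSeqIds, long consumedBound) {
        List<Long> result = new ArrayList<>();
        for (long end : ledgerEndSeqIds) {
            if (end <= consumedBound) {
                result.add(end);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, SubscriptionModel> subs = new TreeMap<>();
        SubscriptionModel sub = new SubscriptionModel(5);
        subs.put("sub-1", sub);

        sub.consume(25);  // in-memory pointer moves to 25
        // The ZooKeeper update fails, so onPersisted(25) is never called.

        List<Long> ledgerEnds = List.of(10L, 20L, 30L);
        System.out.println("unsafe (in-memory): " + deletableLedgers(ledgerEnds, sub.inMemorySeqId));
        System.out.println("safe (persisted):   " + deletableLedgers(ledgerEnds, minPersistedSeqId(subs)));
    }
}
```

Running `main` prints `unsafe (in-memory): [10, 20]` followed by `safe (persisted):   []`. With the consume pointer at 25 but only 5 persisted, a bound taken from memory would delete the ledgers ending at 10 and 20, even though a restarted hub would resume delivery from seq id 6. The persisted bound deletes nothing until the update actually succeeds.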