[ https://issues.apache.org/jira/browse/BOOKKEEPER-442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiannan Wang updated BOOKKEEPER-442:
------------------------------------

    Attachment: BOOKKEEPER-442.diff

Change follows Ivan's comment: log the error instead of throwing a runtime
exception.
                
> Failed to deliver messages due to inconsistency between SubscriptionState and 
> LedgerRanges.
> -------------------------------------------------------------------------------------------
>
>                 Key: BOOKKEEPER-442
>                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-442
>             Project: Bookkeeper
>          Issue Type: Bug
>          Components: hedwig-server
>    Affects Versions: 4.1.0, 4.2.0
>            Reporter: Sijie Guo
>            Assignee: Jiannan Wang
>            Priority: Critical
>             Fix For: 4.2.0
>
>         Attachments: BOOKKEEPER-442.diff, BOOKKEEPER-442.diff
>
>
> The problem occurs when updateSubscriptionState fails but the consumed
> ledgers have already been deleted.
> The issue is described below:
> 1) A subscriber calls setLastConsumeSeqId to move the consume pointer. If the
> pointer has moved past the consume interval, an update-subscription-state
> operation is issued to persist the new state to ZooKeeper.
> {code}
> AbstractSubscriptionManager:
>
>     if (subState.setLastConsumeSeqId(consumeSeqId, cfg.getConsumeInterval())) {
>         updateSubscriptionState(topic, subscriberId, subState, cb, ctx);
>     }
> {code}
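A simplified model of step 1 may help show how rarely ZooKeeper is actually written. It is a sketch under stated assumptions: sequence ids are plain `long` values (the real code uses `MessageSeqId`), and each would-be `updateSubscriptionState` call is just recorded in a list. `ConsumePointer` and `zkWrites` are hypothetical names, not Hedwig APIs.

```java
import java.util.ArrayList;
import java.util.List;

class ConsumePointerDemo {
    public static void main(String[] args) {
        ConsumePointer ptr = new ConsumePointer(4);
        for (long id = 1; id <= 10; id++) {
            ptr.consume(id);
        }
        // Only every 4th advance results in a (simulated) ZooKeeper update.
        System.out.println(ptr.zkWrites); // [4, 8]
    }
}

// Hypothetical model of the consume-interval check in step 1.
class ConsumePointer {
    private final int consumeInterval;
    private long lastWrittenSeqId = 0;             // seq id carried by the last update
    final List<Long> zkWrites = new ArrayList<>(); // stand-in for updateSubscriptionState calls

    ConsumePointer(int consumeInterval) {
        this.consumeInterval = consumeInterval;
    }

    // Advance the consume pointer; issue an update only once the pointer has
    // moved at least consumeInterval past the last update.
    void consume(long seqId) {
        if (seqId - lastWrittenSeqId >= consumeInterval) {
            lastWrittenSeqId = seqId;
            zkWrites.add(seqId);
        }
    }
}
```

The interval trades ZooKeeper write load against how far the stored pointer may lag behind what the subscriber has really consumed.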
> 2) When the consume pointer moves, the in-memory subscription state is also
> changed before that state is persisted to ZooKeeper.
> {code}
>     public boolean setLastConsumeSeqId(MessageSeqId lastConsumeSeqId, int consumeInterval) {
>         long interval = lastConsumeSeqId.getLocalComponent()
>                 - subscriptionState.getMsgId().getLocalComponent();
>         if (interval <= 0) {
>             return false;
>         }
>         // set consume seq id when it is larger
>         this.lastConsumeSeqId = lastConsumeSeqId;
>         if (interval < consumeInterval) {
>             return false;
>         }
>         // subscription state will be updated, mark it as clean
>         subscriptionState = SubscriptionState.newBuilder(subscriptionState)
>                 .setMsgId(lastConsumeSeqId).build();
>         return true;
>     }
> {code}
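The ordering problem in step 2 can be reproduced with a small, hypothetical model. `SubscriptionModel` keeps the in-memory message id and the id ZooKeeper actually holds side by side, and a `LongPredicate` stands in for the asynchronous ZooKeeper update (returning `false` simulates a failed write). Sequence ids are plain `long`s, and steps 1 and 2 are folded into one method for brevity; none of these names come from Hedwig.

```java
import java.util.function.LongPredicate;

class EagerUpdateDemo {
    public static void main(String[] args) {
        // Simulate a ZooKeeper outage: every persistence attempt fails.
        SubscriptionModel sub = new SubscriptionModel(5, seqId -> false);
        sub.setLastConsumeSeqId(5);
        System.out.println("in-memory=" + sub.inMemoryMsgId + ", persisted=" + sub.persistedMsgId);
        // prints "in-memory=5, persisted=0"
    }
}

// Hypothetical model of InMemorySubscriptionState plus the update it triggers.
class SubscriptionModel {
    private final int consumeInterval;
    private final LongPredicate persist; // true if the ZooKeeper write succeeded
    long lastConsumeSeqId = 0;           // latest consumed id, always advanced
    long inMemoryMsgId = 0;              // plays the role of subscriptionState.getMsgId()
    long persistedMsgId = 0;             // what ZooKeeper actually stores

    SubscriptionModel(int consumeInterval, LongPredicate persist) {
        this.consumeInterval = consumeInterval;
        this.persist = persist;
    }

    boolean setLastConsumeSeqId(long seqId) {
        long interval = seqId - inMemoryMsgId;
        if (interval <= 0) {
            return false;
        }
        lastConsumeSeqId = seqId;
        if (interval < consumeInterval) {
            return false;
        }
        // As in the quoted code, the in-memory state moves before persistence...
        inMemoryMsgId = seqId;
        // ...so a failed write leaves it ahead of ZooKeeper.
        if (persist.test(seqId)) {
            persistedMsgId = seqId;
        }
        return true;
    }
}
```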
> 3) MessageConsumedTask runs periodically to delete consumed ledgers, and it
> uses the in-memory subscription state to decide what to delete. So if a
> ledger is deleted first and the subscription-state update then fails, the
> state becomes inconsistent: when the hub restarts and the subscriber
> reconnects, delivery resumes from the old seq id, but the ledger holding the
> messages with that seq id has already been deleted.
> {code}
> for (InMemorySubscriptionState curSubscription : topicSubscriptions.values()) {
>     if (curSubscription.getSubscriptionState().getMsgId().getLocalComponent()
>             < minConsumedMessage)
>         minConsumedMessage = curSubscription.getSubscriptionState().getMsgId()
>                 .getLocalComponent();
>     hasBound = hasBound
>             && curSubscription.getSubscriptionPreferences().hasMessageBound();
> }
> {code} 
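The effect on deletion can be sketched with hypothetical numbers. The sketch assumes a ledger may be deleted only when every message in it is at or below the minimum consumed sequence id across subscribers (a simplification; the `hasMessageBound` handling is omitted). Computing that minimum from in-memory ids can approve a deletion that the persisted ids would not.

```java
class MinConsumedDemo {
    // Smallest consumed seq id across subscribers, as in the quoted loop.
    static long minConsumed(long[] consumedSeqIds) {
        long min = Long.MAX_VALUE;
        for (long id : consumedSeqIds) {
            if (id < min) {
                min = id;
            }
        }
        return min;
    }

    // Assumed rule: a ledger may be deleted only if its last message has been
    // consumed by every subscriber.
    static boolean canDelete(long ledgerLastSeqId, long minConsumed) {
        return ledgerLastSeqId <= minConsumed;
    }

    public static void main(String[] args) {
        long ledgerLastSeqId = 80;    // hypothetical ledger holding messages 1..80
        long[] inMemory = {100, 120}; // subscriber A's in-memory id ran ahead
        long[] persisted = {50, 120}; // ZooKeeper still holds 50 for subscriber A

        System.out.println(canDelete(ledgerLastSeqId, minConsumed(inMemory)));  // true
        System.out.println(canDelete(ledgerLastSeqId, minConsumed(persisted))); // false
        // After a hub restart, A resumes after 50, inside the deleted ledger.
    }
}
```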
> The fix is to let MessageConsumedTask use only the persisted subscription
> state when deleting ledgers.
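One possible shape for this fix, sketched under the same simplifying assumptions (long sequence ids, a synchronous `LongPredicate` standing in for the ZooKeeper write; `SubscriberState` and `deletionBound` are hypothetical names): keep a separate persisted id that advances only after a successful write, and let the deletion task read only that field. The in-memory pointer may still run ahead for delivery, but it never drives deletion.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.LongPredicate;

class PersistedOnlyGcDemo {
    public static void main(String[] args) {
        SubscriberState a = new SubscriberState(seqId -> seqId <= 50); // A's writes fail after 50
        SubscriberState b = new SubscriberState(seqId -> true);
        a.consumeAndPersist(50);
        a.consumeAndPersist(100); // write fails: persisted id stays at 50
        b.consumeAndPersist(120);
        System.out.println(deletionBound(Arrays.asList(a, b))); // 50
    }

    // The deletion task reads only persisted state.
    static long deletionBound(List<SubscriberState> subs) {
        long min = Long.MAX_VALUE;
        for (SubscriberState s : subs) {
            min = Math.min(min, s.persistedSeqId);
        }
        return min;
    }
}

class SubscriberState {
    private final LongPredicate persist; // stand-in for the ZooKeeper update; true on success
    long inMemorySeqId = 0;              // used for delivery only
    long persistedSeqId = 0;             // used for ledger deletion only

    SubscriberState(LongPredicate persist) {
        this.persist = persist;
    }

    void consumeAndPersist(long seqId) {
        inMemorySeqId = Math.max(inMemorySeqId, seqId);
        // Advance the persisted id only on the success path of the write;
        // on failure it keeps the last value ZooKeeper is known to hold.
        if (persist.test(seqId)) {
            persistedSeqId = Math.max(persistedSeqId, seqId);
        }
    }
}
```

With the numbers from step 3, the bound stays at 50, so a ledger ending at 80 survives until A's state is successfully persisted.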

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
