Author: ivank
Date: Mon Dec  3 15:27:32 2012
New Revision: 1416560

URL: http://svn.apache.org/viewvc?rev=1416560&view=rev
Log:
BOOKKEEPER-442: Failed to deliver messages due to inconsistency between 
SubscriptionState and LedgerRanges. (jiannan via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
    
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
    
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
    
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1416560&r1=1416559&r2=1416560&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Dec  3 15:27:32 2012
@@ -160,6 +160,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-482: Precommit is reporting findbugs errors in trunk (ivank 
via sijie)
 
+        BOOKKEEPER-442: Failed to deliver messages due to inconsistency 
between SubscriptionState and LedgerRanges. (jiannan via ivank)
+
     IMPROVEMENTS:
 
       BOOKKEEPER-467: Allocate ports for testing dynamically (ivank)

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=1416560&r1=1416559&r2=1416560&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
 Mon Dec  3 15:27:32 2012
@@ -231,6 +231,19 @@ public class BookkeeperPersistenceManage
         }
 
         protected void read(final InMemoryLedgerRange imlr, final long 
startSeqId, final long endSeqId) {
+            // Verify whether startSeqId falls in ledger range.
+            // Only the left endpoint of range needs to be checked.
+            if (imlr.getStartSeqIdIncluded() > startSeqId) {
+                logger.error(
+                        "Invalid RangeScan read, startSeqId {} doesn't fall in 
ledger range [{} ~ {}]",
+                        va(startSeqId, imlr.getStartSeqIdIncluded(), 
imlr.range.hasEndSeqIdIncluded() ? imlr.range
+                                .getEndSeqIdIncluded().getLocalComponent() : 
""));
+                request.callback.scanFailed(request.ctx, new 
PubSubException.UnexpectedConditionException("Scan request is out of range"));
+
+                // try to release the topic to reset its state
+                lostTopic(topic);
+                return;
+            }
 
             if (imlr.handle == null) {
 

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1416560&r1=1416559&r2=1416560&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
 Mon Dec  3 15:27:32 2012
@@ -129,24 +129,24 @@ public abstract class AbstractSubscripti
 
                 long minConsumedMessage = Long.MAX_VALUE;
                 boolean hasBound = true;
-                // Loop through all subscribers to the current topic to find 
the
-                // minimum consumed message id. The consume pointers are
-                // persisted lazily so we'll use the stale in-memory value
-                // instead. This keeps things consistent in case of a server
-                // crash.
+                // Loop through all subscribers on the current topic to find the
+                // minimum persisted message id. The in-memory consumed message id
+                // is not used because it may run ahead of the persisted subscription
+                // state, which would leave LedgerRanges inconsistent with it after a server crash.
                 for (InMemorySubscriptionState curSubscription : 
topicSubscriptions.values()) {
-                    if 
(curSubscription.getSubscriptionState().getMsgId().getLocalComponent() < 
minConsumedMessage)
-                        minConsumedMessage = 
curSubscription.getSubscriptionState().getMsgId().getLocalComponent();
+                    if (curSubscription.getLastPersistedSeqId() < 
minConsumedMessage) {
+                        minConsumedMessage = 
curSubscription.getLastPersistedSeqId();
+                    }
                     hasBound = hasBound && 
curSubscription.getSubscriptionPreferences().hasMessageBound();
                 }
                 boolean callPersistenceManager = true;
-                // Don't call the PersistenceManager if nobody is subscribed to
-                // the topic yet, or the consume pointer has not changed since
-                // the last time, or if this is the initial subscription.
+                // Call the PersistenceManager if nobody subscribes to the topic yet,
+                // if the minimum persisted consume pointer moved ahead since it was
+                // last recorded, or if none was recorded yet and it is non-zero.
                 Long minConsumedFromMap = 
topic2MinConsumedMessagesMap.get(topic);
                 if (topicSubscriptions.isEmpty()
-                    || (minConsumedFromMap != null && 
minConsumedFromMap.equals(minConsumedMessage))
-                    || minConsumedMessage == 0) {
+                    || (minConsumedFromMap != null && minConsumedFromMap < 
minConsumedMessage)
+                    || (minConsumedFromMap == null && minConsumedMessage != 
0)) {
                     topic2MinConsumedMessagesMap.put(topic, 
minConsumedMessage);
                     pm.consumedUntil(topic, minConsumedMessage);
                 } else if (hasBound) {
@@ -547,14 +547,25 @@ public abstract class AbstractSubscripti
                 return;
             }
 
-            InMemorySubscriptionState subState = topicSubs.get(subscriberId);
+            final InMemorySubscriptionState subState = 
topicSubs.get(subscriberId);
             if (subState == null) {
                 cb.operationFinished(ctx, null);
                 return;
             }
 
             if (subState.setLastConsumeSeqId(consumeSeqId, 
cfg.getConsumeInterval())) {
-                updateSubscriptionState(topic, subscriberId, subState, cb, 
ctx);
+                updateSubscriptionState(topic, subscriberId, subState, new 
Callback<Void>() {
+                    @Override
+                    public void operationFinished(Object ctx, Void 
resultOfOperation) {
+                        
subState.setLastPersistedSeqId(consumeSeqId.getLocalComponent());
+                        cb.operationFinished(ctx, resultOfOperation);
+                    }
+
+                    @Override
+                    public void operationFailed(Object ctx, PubSubException 
exception) {
+                        cb.operationFailed(ctx, exception);
+                    }
+                }, ctx);
             } else {
                 if (logger.isDebugEnabled()) {
                     logger.debug("Only advanced consume pointer in memory, 
will persist later, topic: "

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java?rev=1416560&r1=1416559&r2=1416560&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
 Mon Dec  3 15:27:32 2012
@@ -34,7 +34,8 @@ public class InMemorySubscriptionState {
     SubscriptionState subscriptionState;
     SubscriptionPreferences subscriptionPreferences;
     MessageSeqId lastConsumeSeqId;
-       Version version;
+    Version version;
+    long lastPersistedSeqId;
 
     public InMemorySubscriptionState(SubscriptionData subscriptionData, 
Version version, MessageSeqId lastConsumeSeqId) {
         this.subscriptionState = subscriptionData.getState();
@@ -50,6 +51,7 @@ public class InMemorySubscriptionState {
         }
         this.lastConsumeSeqId = lastConsumeSeqId;
         this.version = version;
+        this.lastPersistedSeqId = 
subscriptionState.getMsgId().getLocalComponent();
     }
 
     public InMemorySubscriptionState(SubscriptionData subscriptionData, 
Version version) {
@@ -125,6 +127,14 @@ public class InMemorySubscriptionState {
         return true;
     }
 
+    public long getLastPersistedSeqId() {
+        return lastPersistedSeqId;
+    }
+
+    public void setLastPersistedSeqId(long lastPersistedSeqId) {
+        this.lastPersistedSeqId = lastPersistedSeqId;
+    }
+
     /**
      * Update preferences.
      *

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java?rev=1416560&r1=1416559&r2=1416560&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
 Mon Dec  3 15:27:32 2012
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -36,19 +37,27 @@ import org.apache.bookkeeper.versioning.
 import org.apache.bookkeeper.versioning.Versioned;
 
 import org.apache.hedwig.HelperMethods;
+import org.apache.hedwig.StubCallback;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.protocol.PubSubProtocol;
 import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
 import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import 
org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.meta.MetadataManagerFactory;
 import org.apache.hedwig.server.meta.SubscriptionDataManager;
 import org.apache.hedwig.server.meta.TopicOwnershipManager;
 import org.apache.hedwig.server.meta.TopicPersistenceManager;
+import org.apache.hedwig.server.subscriptions.MMSubscriptionManager;
 import org.apache.hedwig.server.topics.TopicManager;
 import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
 import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.Either;
 import org.apache.zookeeper.ZooKeeper;
 import org.junit.After;
 import org.junit.Before;
@@ -77,6 +86,7 @@ public class TestBookKeeperPersistenceMa
     PubSubException failureException = null;
     TestMetadataManagerFactory metadataManagerFactory;
     TopicPersistenceManager tpManager;
+    MMSubscriptionManager sm;
 
     boolean removeStartSeqId;
 
@@ -160,7 +170,66 @@ public class TestBookKeeperPersistenceMa
 
         @Override
         public SubscriptionDataManager newSubscriptionDataManager() {
-            return factory.newSubscriptionDataManager();
+            final SubscriptionDataManager sdm = 
factory.newSubscriptionDataManager();
+            return new SubscriptionDataManager() { // delegates to sdm; update/replace can be made to fail on demand
+                @Override
+                public void close() throws IOException {
+                    sdm.close();
+                }
+
+                @Override
+                public void createSubscriptionData(ByteString topic, 
ByteString subscriberId, SubscriptionData data,
+                        Callback<Version> callback, Object ctx) {
+                    sdm.createSubscriptionData(topic, subscriberId, data, 
callback, ctx);
+                }
+
+                @Override
+                public boolean isPartialUpdateSupported() {
+                    return sdm.isPartialUpdateSupported();
+                }
+
+                @Override
+                public void updateSubscriptionData(ByteString topic, 
ByteString subscriberId,
+                        SubscriptionData dataToUpdate, Version version, 
Callback<Version> callback, Object ctx) {
+                    if (serviceDownCount > 0) { // injected failure: consume one count and report the store as down
+                        --serviceDownCount;
+                        callback.operationFailed(ctx,
+                                new 
PubSubException.ServiceDownException("Metadata Store is down"));
+                        return;
+                    }
+                    sdm.updateSubscriptionData(topic, subscriberId, 
dataToUpdate, version, callback, ctx);
+                }
+
+                @Override
+                public void replaceSubscriptionData(ByteString topic, 
ByteString subscriberId,
+                        SubscriptionData dataToReplace, Version version, 
Callback<Version> callback, Object ctx) {
+                    if (serviceDownCount > 0) { // injected failure: consume one count and report the store as down
+                        --serviceDownCount;
+                        callback.operationFailed(ctx,
+                                new 
PubSubException.ServiceDownException("Metadata Store is down"));
+                        return;
+                    }
+                    sdm.replaceSubscriptionData(topic, subscriberId, 
dataToReplace, version, callback, ctx);
+                }
+
+                @Override
+                public void deleteSubscriptionData(ByteString topic, 
ByteString subscriberId, Version version,
+                        Callback<Void> callback, Object ctx) {
+                    sdm.deleteSubscriptionData(topic, subscriberId, version, 
callback, ctx);
+                }
+
+                @Override
+                public void readSubscriptionData(ByteString topic, ByteString 
subscriberId,
+                        Callback<Versioned<SubscriptionData>> callback, Object 
ctx) {
+                    sdm.readSubscriptionData(topic, subscriberId, callback, 
ctx);
+                }
+
+                @Override
+                public void readSubscriptions(ByteString topic,
+                        Callback<Map<ByteString, Versioned<SubscriptionData>>> 
cb, Object ctx) {
+                    sdm.readSubscriptions(topic, cb, ctx);
+                }
+            };
         }
 
         @Override
@@ -194,7 +263,16 @@ public class TestBookKeeperPersistenceMa
         bktb = new BookKeeperTestBase(numBookies, readDelay);
         bktb.setUp();
 
-        conf = new ServerConfiguration();
+        conf = new ServerConfiguration() {
+            @Override
+            public int getMessagesConsumedThreadRunInterval() {
+                return 2000;
+            }
+            @Override
+            public int getConsumeInterval() {
+                return 0;
+            }
+        };
         org.apache.bookkeeper.conf.ClientConfiguration bkClientConf =
                 new org.apache.bookkeeper.conf.ClientConfiguration();
         bkClientConf.setNumWorkerThreads(1).setReadTimeout(9999)
@@ -208,6 +286,7 @@ public class TestBookKeeperPersistenceMa
         tm = new TrivialOwnAllTopicManager(conf, scheduler);
         manager = new BookkeeperPersistenceManager(bktb.bk, 
metadataManagerFactory,
                                                    tm, conf, scheduler);
+        sm = new MMSubscriptionManager(conf, metadataManagerFactory, tm, 
manager, null, scheduler);
     }
 
     @Override
@@ -215,6 +294,7 @@ public class TestBookKeeperPersistenceMa
     public void tearDown() throws Exception {
         tm.stop();
         manager.stop();
+        sm.stop();
         tpManager.close();
         metadataManagerFactory.shutdown();
         scheduler.shutdown();
@@ -464,6 +544,97 @@ public class TestBookKeeperPersistenceMa
         }
     }
 
+    @Test
+    public void testInconsistentSubscriptionStateAndLedgerRanges1() throws 
Exception {
+        // See the Javadoc of inconsistentSubscriptionStateAndLedgerRanges.
+        // In this case, step (2) fails to update the subscription state 
metadata,
+        // but LedgerRanges is updated successfully.
+        // Result: the scan of messages 1 to 4 is issued against ledger L2.
+        inconsistentSubscriptionStateAndLedgerRanges(1);
+    }
+
+    @Test
+    public void testInconsistentSubscriptionStateAndLedgerRanges2() throws 
Exception {
+        // See the Javadoc of inconsistentSubscriptionStateAndLedgerRanges.
+        // In this case, step (2) fails to update the subscription state 
metadata,
+        // and step (3) deletes L1 successfully but fails to update LedgerRanges.
+        // Result: the scan of messages 1 to 4 falls in L1 and L2,
+        //         but BookKeeper may complain that L1 is not found.
+        inconsistentSubscriptionStateAndLedgerRanges(2);
+    }
+
+    /**
+     * Since InMemorySubscriptionState and LedgerRanges are maintained
+     * separately, the following inconsistent state may arise:
+     * (1). Topic ledgers: L1 [1 ~ 2], L2 [3 ~ ]
+     * (2). Subscriber consumes to 2; InMemorySubscriptionState is updated,
+     *      but updating the subscription state metadata fails
+     * (3). AbstractSubscriptionManager#MessagesConsumedTask uses
+     *      InMemorySubscriptionState to do garbage collection and L1 is deleted
+     * (4). If the hub restarts at this time, the old subscription state is read
+     *      and the hub tries to deliver messages starting from 1
+     * @param failedCount passed to setServiceDownCount: how many upcoming metadata writes fail
+     */
+    public void inconsistentSubscriptionStateAndLedgerRanges(int failedCount) 
throws Exception {
+        final ByteString topic = 
ByteString.copyFromUtf8("inconsistentSubscriptionStateAndLedgerRanges");
+        final ByteString subscriberId = ByteString.copyFromUtf8("subId");
+        LinkedList<Message> msgs = new LinkedList<Message>();
+
+        // make ledger L1 [1 ~ 2]
+        acquireTopic(topic);
+        msgs.addAll(publishMessages(topic, 2));
+        releaseTopic(topic);
+
+        // acquire the topic again to force a new ledger L2 [3 ~ ]
+        acquireTopic(topic);
+        msgs.addAll(publishMessages(topic, 2));
+
+        StubCallback<Void> voidCb = new StubCallback<Void>();
+        StubCallback<SubscriptionData> subDataCb = new 
StubCallback<SubscriptionData>();
+        Either<Void, PubSubException> voidResult;
+        Either<SubscriptionData, PubSubException> subDataResult;
+
+        // prepare for the subscription
+        sm.acquiredTopic(topic, voidCb, null);
+        voidResult = ConcurrencyUtils.take(voidCb.queue);
+        assertNull(voidResult.right()); // no exception
+
+        // Do the subscription
+        SubscribeRequest subRequest = 
SubscribeRequest.newBuilder().setSubscriberId(subscriberId)
+                .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+        sm.serveSubscribeRequest(topic, subRequest, 
MessageSeqId.newBuilder().setLocalComponent(0).build(), subDataCb,
+                null);
+        subDataResult = ConcurrencyUtils.take(subDataCb.queue);
+        assertNotNull(subDataResult.left()); // serveSubscribeRequest succeeds
+                                             // and returns a SubscriptionData
+                                             // object
+        assertNull(subDataResult.right()); // no exception
+
+        // simulate an inconsistent situation between InMemorySubscriptionState 
and
+        // LedgerRanges
+        metadataManagerFactory.setServiceDownCount(failedCount);
+        sm.setConsumeSeqIdForSubscriber(topic, subscriberId, 
MessageSeqId.newBuilder().setLocalComponent(2).build(),
+                voidCb, null);
+        voidResult = ConcurrencyUtils.take(voidCb.queue);
+        assertNotNull(voidResult.right()); // updating subscription state fails,
+                                           // so an exception is expected
+
+        // wait for AbstractSubscriptionManager#MessagesConsumedTask to garbage
+        // collect ledger L1 (two run intervals)
+        Thread.sleep(conf.getMessagesConsumedThreadRunInterval() * 2);
+
+        // simulate a hub restart: read the old subscription state metadata and 
deliver
+        // messages from 1
+        LinkedBlockingQueue<Boolean> statusQueue = new 
LinkedBlockingQueue<Boolean>();
+        RangeScanRequest scan = new RangeScanRequest(topic, 1, 4, 
Long.MAX_VALUE, new RangeScanVerifier(msgs, null),
+                statusQueue);
+        manager.scanMessages(scan);
+        Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
+        if (b == null) { // only checks that the scan completes, not whether it succeeded
+            fail("Scan request doesn't finish");
+        }
+    }
+
     class TestCallback implements Callback<PubSubProtocol.MessageSeqId> {
 
         @Override


Reply via email to