Author: ivank
Date: Mon Dec 3 15:27:32 2012
New Revision: 1416560
URL: http://svn.apache.org/viewvc?rev=1416560&view=rev
Log:
BOOKKEEPER-442: Failed to deliver messages due to inconsistency between
SubscriptionState and LedgerRanges. (jiannan via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1416560&r1=1416559&r2=1416560&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Dec 3 15:27:32 2012
@@ -160,6 +160,8 @@ Trunk (unreleased changes)
BOOKKEEPER-482: Precommit is reporting findbugs errors in trunk (ivank
via sijie)
+ BOOKKEEPER-442: Failed to deliver messages due to inconsistency
between SubscriptionState and LedgerRanges. (jiannan via ivank)
+
IMPROVEMENTS:
BOOKKEEPER-467: Allocate ports for testing dynamically (ivank)
Modified:
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=1416560&r1=1416559&r2=1416560&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
(original)
+++
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
Mon Dec 3 15:27:32 2012
@@ -231,6 +231,19 @@ public class BookkeeperPersistenceManage
}
protected void read(final InMemoryLedgerRange imlr, final long
startSeqId, final long endSeqId) {
+ // Verify whether startSeqId falls in ledger range.
+ // Only the left endpoint of range needs to be checked.
+ if (imlr.getStartSeqIdIncluded() > startSeqId) {
+ logger.error(
+ "Invalid RangeScan read, startSeqId {} doesn't fall in
ledger range [{} ~ {}]",
+ va(startSeqId, imlr.getStartSeqIdIncluded(),
imlr.range.hasEndSeqIdIncluded() ? imlr.range
+ .getEndSeqIdIncluded().getLocalComponent() :
""));
+ request.callback.scanFailed(request.ctx, new
PubSubException.UnexpectedConditionException("Scan request is out of range"));
+
+ // try to release the topic to reset its state
+ lostTopic(topic);
+ return;
+ }
if (imlr.handle == null) {
Modified:
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1416560&r1=1416559&r2=1416560&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
(original)
+++
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
Mon Dec 3 15:27:32 2012
@@ -129,24 +129,24 @@ public abstract class AbstractSubscripti
long minConsumedMessage = Long.MAX_VALUE;
boolean hasBound = true;
- // Loop through all subscribers to the current topic to find
the
- // minimum consumed message id. The consume pointers are
- // persisted lazily so we'll use the stale in-memory value
- // instead. This keeps things consistent in case of a server
- // crash.
+ // Loop through all subscribers on the current topic to find the
+ // minimum persisted message id. The in-memory consumed message id
+ // is not used because LedgerRanges and InMemorySubscriptionState
+ // may be inconsistent in case of a server crash.
for (InMemorySubscriptionState curSubscription :
topicSubscriptions.values()) {
- if
(curSubscription.getSubscriptionState().getMsgId().getLocalComponent() <
minConsumedMessage)
- minConsumedMessage =
curSubscription.getSubscriptionState().getMsgId().getLocalComponent();
+ if (curSubscription.getLastPersistedSeqId() <
minConsumedMessage) {
+ minConsumedMessage =
curSubscription.getLastPersistedSeqId();
+ }
hasBound = hasBound &&
curSubscription.getSubscriptionPreferences().hasMessageBound();
}
boolean callPersistenceManager = true;
- // Don't call the PersistenceManager if nobody is subscribed to
- // the topic yet, or the consume pointer has not changed since
- // the last time, or if this is the initial subscription.
+ // Call the PersistenceManager if nobody subscribes to the topic
+ // yet, if the minimum consume pointer has moved ahead since the
+ // last time, or if no minimum has been recorded for this topic
+ // yet and subscribers have consumed past seq id 0.
Long minConsumedFromMap =
topic2MinConsumedMessagesMap.get(topic);
if (topicSubscriptions.isEmpty()
- || (minConsumedFromMap != null &&
minConsumedFromMap.equals(minConsumedMessage))
- || minConsumedMessage == 0) {
+ || (minConsumedFromMap != null && minConsumedFromMap <
minConsumedMessage)
+ || (minConsumedFromMap == null && minConsumedMessage !=
0)) {
topic2MinConsumedMessagesMap.put(topic,
minConsumedMessage);
pm.consumedUntil(topic, minConsumedMessage);
} else if (hasBound) {
@@ -547,14 +547,25 @@ public abstract class AbstractSubscripti
return;
}
- InMemorySubscriptionState subState = topicSubs.get(subscriberId);
+ final InMemorySubscriptionState subState =
topicSubs.get(subscriberId);
if (subState == null) {
cb.operationFinished(ctx, null);
return;
}
if (subState.setLastConsumeSeqId(consumeSeqId,
cfg.getConsumeInterval())) {
- updateSubscriptionState(topic, subscriberId, subState, cb,
ctx);
+ updateSubscriptionState(topic, subscriberId, subState, new
Callback<Void>() {
+ @Override
+ public void operationFinished(Object ctx, Void
resultOfOperation) {
+
subState.setLastPersistedSeqId(consumeSeqId.getLocalComponent());
+ cb.operationFinished(ctx, resultOfOperation);
+ }
+
+ @Override
+ public void operationFailed(Object ctx, PubSubException
exception) {
+ cb.operationFailed(ctx, exception);
+ }
+ }, ctx);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Only advanced consume pointer in memory,
will persist later, topic: "
Modified:
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java?rev=1416560&r1=1416559&r2=1416560&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
(original)
+++
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
Mon Dec 3 15:27:32 2012
@@ -34,7 +34,8 @@ public class InMemorySubscriptionState {
SubscriptionState subscriptionState;
SubscriptionPreferences subscriptionPreferences;
MessageSeqId lastConsumeSeqId;
- Version version;
+ Version version;
+ long lastPersistedSeqId;
public InMemorySubscriptionState(SubscriptionData subscriptionData,
Version version, MessageSeqId lastConsumeSeqId) {
this.subscriptionState = subscriptionData.getState();
@@ -50,6 +51,7 @@ public class InMemorySubscriptionState {
}
this.lastConsumeSeqId = lastConsumeSeqId;
this.version = version;
+ this.lastPersistedSeqId =
subscriptionState.getMsgId().getLocalComponent();
}
public InMemorySubscriptionState(SubscriptionData subscriptionData,
Version version) {
@@ -125,6 +127,14 @@ public class InMemorySubscriptionState {
return true;
}
+ public long getLastPersistedSeqId() {
+ return lastPersistedSeqId;
+ }
+
+ public void setLastPersistedSeqId(long lastPersistedSeqId) {
+ this.lastPersistedSeqId = lastPersistedSeqId;
+ }
+
/**
* Update preferences.
*
Modified:
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java?rev=1416560&r1=1416559&r2=1416560&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
(original)
+++
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
Mon Dec 3 15:27:32 2012
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Iterator;
+import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
@@ -36,19 +37,27 @@ import org.apache.bookkeeper.versioning.
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.hedwig.HelperMethods;
+import org.apache.hedwig.StubCallback;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import
org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.meta.MetadataManagerFactory;
import org.apache.hedwig.server.meta.SubscriptionDataManager;
import org.apache.hedwig.server.meta.TopicOwnershipManager;
import org.apache.hedwig.server.meta.TopicPersistenceManager;
+import org.apache.hedwig.server.subscriptions.MMSubscriptionManager;
import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.Either;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
@@ -77,6 +86,7 @@ public class TestBookKeeperPersistenceMa
PubSubException failureException = null;
TestMetadataManagerFactory metadataManagerFactory;
TopicPersistenceManager tpManager;
+ MMSubscriptionManager sm;
boolean removeStartSeqId;
@@ -160,7 +170,66 @@ public class TestBookKeeperPersistenceMa
@Override
public SubscriptionDataManager newSubscriptionDataManager() {
- return factory.newSubscriptionDataManager();
+ final SubscriptionDataManager sdm =
factory.newSubscriptionDataManager();
+ return new SubscriptionDataManager() {
+ @Override
+ public void close() throws IOException {
+ sdm.close();
+ }
+
+ @Override
+ public void createSubscriptionData(ByteString topic,
ByteString subscriberId, SubscriptionData data,
+ Callback<Version> callback, Object ctx) {
+ sdm.createSubscriptionData(topic, subscriberId, data,
callback, ctx);
+ }
+
+ @Override
+ public boolean isPartialUpdateSupported() {
+ return sdm.isPartialUpdateSupported();
+ }
+
+ @Override
+ public void updateSubscriptionData(ByteString topic,
ByteString subscriberId,
+ SubscriptionData dataToUpdate, Version version,
Callback<Version> callback, Object ctx) {
+ if (serviceDownCount > 0) {
+ --serviceDownCount;
+ callback.operationFailed(ctx,
+ new
PubSubException.ServiceDownException("Metadata Store is down"));
+ return;
+ }
+ sdm.updateSubscriptionData(topic, subscriberId,
dataToUpdate, version, callback, ctx);
+ }
+
+ @Override
+ public void replaceSubscriptionData(ByteString topic,
ByteString subscriberId,
+ SubscriptionData dataToReplace, Version version,
Callback<Version> callback, Object ctx) {
+ if (serviceDownCount > 0) {
+ --serviceDownCount;
+ callback.operationFailed(ctx,
+ new
PubSubException.ServiceDownException("Metadata Store is down"));
+ return;
+ }
+ sdm.replaceSubscriptionData(topic, subscriberId,
dataToReplace, version, callback, ctx);
+ }
+
+ @Override
+ public void deleteSubscriptionData(ByteString topic,
ByteString subscriberId, Version version,
+ Callback<Void> callback, Object ctx) {
+ sdm.deleteSubscriptionData(topic, subscriberId, version,
callback, ctx);
+ }
+
+ @Override
+ public void readSubscriptionData(ByteString topic, ByteString
subscriberId,
+ Callback<Versioned<SubscriptionData>> callback, Object
ctx) {
+ sdm.readSubscriptionData(topic, subscriberId, callback,
ctx);
+ }
+
+ @Override
+ public void readSubscriptions(ByteString topic,
+ Callback<Map<ByteString, Versioned<SubscriptionData>>>
cb, Object ctx) {
+ sdm.readSubscriptions(topic, cb, ctx);
+ }
+ };
}
@Override
@@ -194,7 +263,16 @@ public class TestBookKeeperPersistenceMa
bktb = new BookKeeperTestBase(numBookies, readDelay);
bktb.setUp();
- conf = new ServerConfiguration();
+ conf = new ServerConfiguration() {
+ @Override
+ public int getMessagesConsumedThreadRunInterval() {
+ return 2000;
+ }
+ @Override
+ public int getConsumeInterval() {
+ return 0;
+ }
+ };
org.apache.bookkeeper.conf.ClientConfiguration bkClientConf =
new org.apache.bookkeeper.conf.ClientConfiguration();
bkClientConf.setNumWorkerThreads(1).setReadTimeout(9999)
@@ -208,6 +286,7 @@ public class TestBookKeeperPersistenceMa
tm = new TrivialOwnAllTopicManager(conf, scheduler);
manager = new BookkeeperPersistenceManager(bktb.bk,
metadataManagerFactory,
tm, conf, scheduler);
+ sm = new MMSubscriptionManager(conf, metadataManagerFactory, tm,
manager, null, scheduler);
}
@Override
@@ -215,6 +294,7 @@ public class TestBookKeeperPersistenceMa
public void tearDown() throws Exception {
tm.stop();
manager.stop();
+ sm.stop();
tpManager.close();
metadataManagerFactory.shutdown();
scheduler.shutdown();
@@ -464,6 +544,97 @@ public class TestBookKeeperPersistenceMa
}
}
+ @Test
+ public void testInconsistentSubscriptionStateAndLedgerRanges1() throws
Exception {
+ // See the comment of inconsistentSubscriptionStateAndLedgerRanges.
+ // For this case, Step (2) failed to update subscription state
metadata,
+ // but LedgerRanges is updated successfully.
+ // Result: the scan of messages 1 to 4 takes place on ledger L2.
+ inconsistentSubscriptionStateAndLedgerRanges(1);
+ }
+
+ @Test
+ public void testInconsistentSubscriptionStateAndLedgerRanges2() throws
Exception {
+ // See the comment of inconsistentSubscriptionStateAndLedgerRanges.
+ // For this case, step (2) failed to update subscription state
metadata,
+ // step (3) successfully deletes L1 but fails to update LedgerRanges.
+ // Result: the scan of messages 1 to 4 spans L1 and L2,
+ // but BookKeeper may complain that L1 is not found.
+ inconsistentSubscriptionStateAndLedgerRanges(2);
+ }
+
+ /**
+ * Since InMemorySubscriptionState and LedgerRanges are maintained
+ * separately, the following inconsistent state may arise:
+ * (1). Topic ledgers: L1 [1 ~ 2], L2 [3 ~ ]
+ * (2). The subscriber consumes up to 2; InMemorySubscriptionState is
+ * updated successfully, but updating the subscription state metadata fails
+ * (3). AbstractSubscriptionManager#MessagesConsumedTask uses
+ * InMemorySubscriptionState to do garbage collection
+ * and L1 is deleted
+ * (4). If the hub restarts at this time, the old subscription state is
+ * read and the hub will try to deliver messages from 1
+ */
+ public void inconsistentSubscriptionStateAndLedgerRanges(int failedCount)
throws Exception {
+ final ByteString topic =
ByteString.copyFromUtf8("inconsistentSubscriptionStateAndLedgerRanges");
+ final ByteString subscriberId = ByteString.copyFromUtf8("subId");
+ LinkedList<Message> msgs = new LinkedList<Message>();
+
+ // make ledger L1 [1 ~ 2]
+ acquireTopic(topic);
+ msgs.addAll(publishMessages(topic, 2));
+ releaseTopic(topic);
+
+ // acquire topic again to force a new ledger L2 [3 ~ ]
+ acquireTopic(topic);
+ msgs.addAll(publishMessages(topic, 2));
+
+ StubCallback<Void> voidCb = new StubCallback<Void>();
+ StubCallback<SubscriptionData> subDataCb = new
StubCallback<SubscriptionData>();
+ Either<Void, PubSubException> voidResult;
+ Either<SubscriptionData, PubSubException> subDataResult;
+
+ // prepare for subscription
+ sm.acquiredTopic(topic, voidCb, null);
+ voidResult = ConcurrencyUtils.take(voidCb.queue);
+ assertNull(voidResult.right()); // no exception
+
+ // Do subscription
+ SubscribeRequest subRequest =
SubscribeRequest.newBuilder().setSubscriberId(subscriberId)
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ sm.serveSubscribeRequest(topic, subRequest,
MessageSeqId.newBuilder().setLocalComponent(0).build(), subDataCb,
+ null);
+ subDataResult = ConcurrencyUtils.take(subDataCb.queue);
+ assertNotNull(subDataResult.left()); // serveSubscribeRequest success
+ // and return a SubscriptionData
+ // object
+ assertNull(subDataResult.right()); // no exception
+
+ // simulate inconsistent situation between InMemorySubscriptionState
and
+ // LedgerRanges
+ metadataManagerFactory.setServiceDownCount(failedCount);
+ sm.setConsumeSeqIdForSubscriber(topic, subscriberId,
MessageSeqId.newBuilder().setLocalComponent(2).build(),
+ voidCb, null);
+ voidResult = ConcurrencyUtils.take(voidCb.queue);
+ assertNotNull(voidResult.right()); // update subscription state failed
+ // and an exception is expected
+
+ // wait for AbstractSubscriptionManager#MessagesConsumedTask to garbage
+ // collect ledger L1
+ Thread.sleep(conf.getMessagesConsumedThreadRunInterval() * 2);
+
+ // simulate hub restart: read old subscription state metadata and
deliver
+ // messages from 1
+ LinkedBlockingQueue<Boolean> statusQueue = new
LinkedBlockingQueue<Boolean>();
+ RangeScanRequest scan = new RangeScanRequest(topic, 1, 4,
Long.MAX_VALUE, new RangeScanVerifier(msgs, null),
+ statusQueue);
+ manager.scanMessages(scan);
+ Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
+ if (b == null) {
+ fail("Scan request doesn't finish");
+ }
+ }
+
class TestCallback implements Callback<PubSubProtocol.MessageSeqId> {
@Override