Updated Branches: refs/heads/trunk e546e3387 -> 71c94feb0
Fix paxos not always replaying when it should patch by slebresne; reviewed by jbellis for CASSANDRA-6012 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/50c9d77e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/50c9d77e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/50c9d77e Branch: refs/heads/trunk Commit: 50c9d77e1b2be206a765e7f86cd4b8ecf3d232df Parents: fa70e06 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Fri Sep 13 12:06:46 2013 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Sep 13 12:06:46 2013 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/service/StorageProxy.java | 4 ++-- .../service/paxos/PrepareCallback.java | 20 ++++++++++++++++---- 3 files changed, 19 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/50c9d77e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 814029d..ebdfddb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -15,6 +15,7 @@ * Require superuser status for adding triggers (CASSANDRA-5963) * Make standalone scrubber handle old and new style leveled manifest (CASSANDRA-6005) + * Fix paxos not always replaying when it should (CASSANDRA-6012) Merged from 1.2: 1.2.10 * Fix possible divide-by-zero in HHOM (CASSANDRA-5990) http://git-wip-us.apache.org/repos/asf/cassandra/blob/50c9d77e/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 1184bb5..83cb265 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ 
b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -357,7 +357,7 @@ public class StorageProxy implements StorageProxyMBean { long ballotMillis = summary == null ? System.currentTimeMillis() - : Math.max(System.currentTimeMillis(), 1 + UUIDGen.unixTimestamp(summary.inProgressCommit.ballot)); + : Math.max(System.currentTimeMillis(), 1 + UUIDGen.unixTimestamp(summary.mostRecentInProgressCommit.ballot)); UUID ballot = UUIDGen.getTimeUUID(ballotMillis); // prepare @@ -372,7 +372,7 @@ public class StorageProxy implements StorageProxyMBean continue; } - Commit inProgress = summary.inProgressCommit; + Commit inProgress = summary.mostRecentInProgressCommitWithUpdate; Commit mostRecent = summary.mostRecentCommit; // If we have an in-progress ballot greater than the MRC we know, then it's an in-progress round that http://git-wip-us.apache.org/repos/asf/cassandra/blob/50c9d77e/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java index 23d7952..9293254 100644 --- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java +++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java @@ -40,7 +40,8 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> public boolean promised = true; public Commit mostRecentCommit; - public Commit inProgressCommit; + public Commit mostRecentInProgressCommit; + public Commit mostRecentInProgressCommitWithUpdate; private Map<InetAddress, Commit> commitsByReplica = new HashMap<InetAddress, Commit>(); @@ -49,7 +50,8 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> super(targets); // need to inject the right key in the empty commit so comparing with empty commits in the reply works as expected mostRecentCommit = Commit.emptyCommit(key, metadata); - 
inProgressCommit = Commit.emptyCommit(key, metadata); + mostRecentInProgressCommit = Commit.emptyCommit(key, metadata); + mostRecentInProgressCommitWithUpdate = Commit.emptyCommit(key, metadata); } public synchronized void response(MessageIn<PrepareResponse> message) @@ -57,6 +59,12 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> PrepareResponse response = message.payload; logger.debug("Prepare response {} from {}", response, message.from); + // In case of clock skew, another node could be proposing with ballots that are quite a bit + // older than our own. In that case, we record the most recent commit we've received to make + // sure we re-prepare on an older ballot. + if (response.inProgressCommit.isAfter(mostRecentInProgressCommit)) + mostRecentInProgressCommit = response.inProgressCommit; + if (!response.promised) { promised = false; @@ -68,8 +76,12 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> if (response.mostRecentCommit.isAfter(mostRecentCommit)) mostRecentCommit = response.mostRecentCommit; - if (response.inProgressCommit.isAfter(inProgressCommit)) - inProgressCommit = response.inProgressCommit; + // If some response has an update, then we should replay the update with the highest ballot. So find + // the highest commit that actually has an update + if (response.inProgressCommit.isAfter(mostRecentInProgressCommitWithUpdate) && !response.inProgressCommit.update.isEmpty()) + mostRecentInProgressCommitWithUpdate = response.inProgressCommit; + + latch.countDown(); }