Updated Branches: refs/heads/cassandra-1.2 b2dfaed31 -> de72e7fc0
Use real node messaging versions for schema exchange decisions patch by Aleksey Yeschenko; reviewed by Piotr Kołaczkowski for CASSANDRA-6700 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de72e7fc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de72e7fc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de72e7fc Branch: refs/heads/cassandra-1.2 Commit: de72e7fc0a750fdb2fcd752092e5a07a7f47046e Parents: b2dfaed Author: Aleksey Yeschenko <alek...@apache.org> Authored: Thu Feb 13 19:26:48 2014 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Thu Feb 13 19:26:48 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/net/IncomingTcpConnection.java | 4 +-- .../apache/cassandra/net/MessagingService.java | 28 +++++++++++++------- .../cassandra/service/MigrationManager.java | 4 +-- 4 files changed, 23 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/de72e7fc/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index de7c307..872934a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ * Compact hints after partial replay to clean out tombstones (CASSANDRA-6666) * Log USING TTL/TIMESTAMP in a counter update warning (CASSANDRA-6649) * Don't exchange schema between nodes with different versions (CASSANDRA-6695) + * Use real node messaging versions for schema exchange decisions (CASSANDRA-6700) 1.2.15 http://git-wip-us.apache.org/repos/asf/cassandra/blob/de72e7fc/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java 
b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index 3b24a7f..d0126c7 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -130,8 +130,8 @@ public class IncomingTcpConnection extends Thread logger.info("Received messages from newer protocol version {}. Ignoring", version); return; } - MessagingService.instance().setVersion(from, Math.min(MessagingService.current_version, maxVersion)); - logger.debug("set version for {} to {}", from, Math.min(MessagingService.current_version, maxVersion)); + MessagingService.instance().setVersion(from, maxVersion); + logger.debug("Set version for {} to {} (will use {})", from, maxVersion, Math.min(MessagingService.current_version, maxVersion)); // outbound side will reconnect if necessary to upgrade version while (true) http://git-wip-us.apache.org/repos/asf/cassandra/blob/de72e7fc/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index bfc3957..09fa272 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -800,10 +800,10 @@ public final class MessagingService implements MessagingServiceMBean /** * @return the last version associated with address, or @param version if this is the first such version */ - public int setVersion(InetAddress address, int version) + public int setVersion(InetAddress endpoint, int version) { - logger.debug("Setting version {} for {}", version, address); - Integer v = versions.put(address, version); + logger.debug("Setting version {} for {}", version, endpoint); + Integer v = versions.put(endpoint, version); return v == null ? 
version : v; } @@ -813,27 +813,35 @@ public final class MessagingService implements MessagingServiceMBean versions.remove(endpoint); } - public Integer getVersion(InetAddress address) + public int getVersion(InetAddress endpoint) { - Integer v = versions.get(address); + Integer v = versions.get(endpoint); if (v == null) { // we don't know the version. assume current. we'll know soon enough if that was incorrect. - logger.trace("Assuming current protocol version for {}", address); + logger.trace("Assuming current protocol version for {}", endpoint); return MessagingService.current_version; } else - return v; + return Math.min(v, MessagingService.current_version); } - public int getVersion(String address) throws UnknownHostException + public int getVersion(String endpoint) throws UnknownHostException { - return getVersion(InetAddress.getByName(address)); + return getVersion(InetAddress.getByName(endpoint)); + } + + public int getRawVersion(InetAddress endpoint) + { + Integer v = versions.get(endpoint); + if (v == null) + throw new IllegalStateException("getRawVersion() was called without checking knowsVersion() result first"); + return v; } public boolean knowsVersion(InetAddress endpoint) { - return versions.get(endpoint) != null; + return versions.containsKey(endpoint); } public void incrementDroppedMessages(Verb verb) http://git-wip-us.apache.org/repos/asf/cassandra/blob/de72e7fc/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index 68d0bad..584415d 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -138,7 +138,7 @@ public class MigrationManager * Don't request schema from fat clients */ return MessagingService.instance().knowsVersion(endpoint) - && 
MessagingService.instance().getVersion(endpoint) == MessagingService.current_version + && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version && !Gossiper.instance.isFatClient(endpoint); } @@ -292,7 +292,7 @@ public class MigrationManager // only push schema to nodes with known and equal versions if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && MessagingService.instance().knowsVersion(endpoint) && - MessagingService.instance().getVersion(endpoint) == MessagingService.current_version) + MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version) pushSchemaMutation(endpoint, schema); }