Author: jbellis Date: Tue Feb 8 19:16:21 2011 New Revision: 1068532 URL: http://svn.apache.org/viewvc?rev=1068532&view=rev Log: enable #1530 only after cluster is all on 0.7.1 > patch by jbellis; reviewed by brandonwilliams for CASSANDRA-2138
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/NEWS.txt cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1068532&r1=1068531&r2=1068532&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Feb 8 19:16:21 2011 @@ -7,7 +7,7 @@ * buffer network stack to avoid inefficient small TCP messages while avoiding the nagle/delayed ack problem (CASSANDRA-1896) * check log4j configuration for changes every 10s (CASSANDRA-1525, 1907) - * more-efficient cross-DC replication (CASSANDRA-1530, -2051) + * more-efficient cross-DC replication (CASSANDRA-1530, -2051, -2138) * avoid polluting page cache with commitlog or sstable writes and seq scan operations (CASSANDRA-1470) * add RMI authentication options to nodetool (CASSANDRA-1921) Modified: cassandra/branches/cassandra-0.7/NEWS.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/NEWS.txt?rev=1068532&r1=1068531&r2=1068532&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/NEWS.txt (original) +++ cassandra/branches/cassandra-0.7/NEWS.txt Tue Feb 8 19:16:21 2011 @@ -1,3 +1,19 @@ +0.7.1 +===== + +Upgrading +-------- + - 0.7.1 is completely backwards compatible
with 0.7.0. Just restart + each node with the new version, one at a time. (The cluster does + not all need to be upgraded simultaneously.) + +Features +-------- + - Cassandra can perform writes efficiently across datacenters by + sending a single copy of the mutation and having the recipient + forward that to other replicas in its datacenter. + + 0.7.0 ===== Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=1068532&r1=1068531&r2=1068532&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java Tue Feb 8 19:16:21 2011 @@ -28,6 +28,7 @@ public enum ApplicationState SCHEMA, DC, RACK, + RELEASE_VERSION, // pad to allow adding new states to existing cluster X1, X2, Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1068532&r1=1068531&r2=1068532&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java Tue Feb 8 19:16:21 2011 @@ -467,6 +467,11 @@ public class Gossiper implements IFailur return endpointStateMap_.get(ep); } + public Set<Entry<InetAddress, EndpointState>> getEndpointStates() + { + return endpointStateMap_.entrySet(); + } + EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version) { EndpointState epState = endpointStateMap_.get(forEndpoint); Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1068532&r1=1068531&r2=1068532&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java Tue Feb 8 19:16:21 2011 @@ -26,6 +26,7 @@ import java.util.UUID; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.utils.FBUtilities; /** @@ -142,6 +143,10 @@ public class VersionedValue implements C return new VersionedValue(rackId); } + public VersionedValue releaseVersion() + { + return new VersionedValue(FBUtilities.getReleaseVersionString()); + } } private static class VersionedValueSerializer implements ICompactSerializer<VersionedValue> Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1068532&r1=1068531&r2=1068532&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Tue Feb 8 19:16:21 2011 @@ -225,9 +225,9 @@ public class StorageProxy implements Sto // from previous loop iterations message.removeHeader(RowMutation.FORWARD_HEADER); - if (dataCenter.equals(localDataCenter)) + if (dataCenter.equals(localDataCenter) || StorageService.instance.useEfficientCrossDCWrites()) { - // direct writes to local DC + // direct writes to local DC 
or old Cassandra versions for (InetAddress destination : messages.getValue()) MessagingService.instance().sendRR(message, destination, handler); } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1068532&r1=1068531&r2=1068532&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Tue Feb 8 19:16:21 2011 @@ -183,6 +183,8 @@ public class StorageService implements I private boolean initialized; private volatile boolean joined = false; private String operationMode; + + private volatile boolean efficientCrossDCWrites; private MigrationManager migrationManager = new MigrationManager(); /* Used for tracking drain progress */ @@ -421,6 +423,7 @@ public class StorageService implements I MessagingService.instance().listen(FBUtilities.getLocalAddress()); StorageLoadBalancer.instance.startBroadcasting(); MigrationManager.announce(DatabaseDescriptor.getDefsVersion(), DatabaseDescriptor.getSeeds()); + Gossiper.instance.addLocalApplicationState(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion()); HintedHandOffManager.instance.registerMBean(); @@ -628,23 +631,46 @@ public class StorageService implements I */ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) { - if (state != ApplicationState.STATUS) - return; + switch (state) + { + case RELEASE_VERSION: + updateEfficientCrossDCWriteMode(); + break; + case STATUS: + String apStateValue = value.value; + String[] pieces = apStateValue.split(VersionedValue.DELIMITER_STR, -1); + assert (pieces.length > 0); + + String moveName = pieces[0]; + + if
(moveName.equals(VersionedValue.STATUS_BOOTSTRAPPING)) + handleStateBootstrap(endpoint, pieces); + else if (moveName.equals(VersionedValue.STATUS_NORMAL)) + handleStateNormal(endpoint, pieces); + else if (moveName.equals(VersionedValue.STATUS_LEAVING)) + handleStateLeaving(endpoint, pieces); + else if (moveName.equals(VersionedValue.STATUS_LEFT)) + handleStateLeft(endpoint, pieces); + } + } + + /** + * We can remove this in 0.8, since mixing 0.7.0 with 0.8 is not supported (0.7.1 is required) + */ + private void updateEfficientCrossDCWriteMode() + { + for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates()) + { + VersionedValue version = entry.getValue().getApplicationState(ApplicationState.RELEASE_VERSION); - String apStateValue = value.value; - String[] pieces = apStateValue.split(VersionedValue.DELIMITER_STR, -1); - assert (pieces.length > 0); - - String moveName = pieces[0]; - - if (moveName.equals(VersionedValue.STATUS_BOOTSTRAPPING)) - handleStateBootstrap(endpoint, pieces); - else if (moveName.equals(VersionedValue.STATUS_NORMAL)) - handleStateNormal(endpoint, pieces); - else if (moveName.equals(VersionedValue.STATUS_LEAVING)) - handleStateLeaving(endpoint, pieces); - else if (moveName.equals(VersionedValue.STATUS_LEFT)) - handleStateLeft(endpoint, pieces); + // no version means it's old code that doesn't gossip version, < 0.7.1. 
+ if (version == null) + { + efficientCrossDCWrites = false; + return; + } + } + efficientCrossDCWrites = true; } /** @@ -2145,4 +2171,9 @@ public class StorageService implements I if (oldSnitch instanceof DynamicEndpointSnitch) ((DynamicEndpointSnitch)oldSnitch).unregisterMBean(); } + + public boolean useEfficientCrossDCWrites() + { + return efficientCrossDCWrites; + } } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1068532&r1=1068531&r2=1068532&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java Tue Feb 8 19:16:21 2011 @@ -37,6 +37,8 @@ import java.util.concurrent.atomic.Atomi import com.google.common.base.Charsets; import com.google.common.base.Joiner; +import com.google.common.base.Strings; +import com.google.common.primitives.Ints; import org.apache.commons.collections.iterators.CollatingIterator; import org.apache.commons.lang.ArrayUtils; import org.slf4j.Logger;