Author: jbellis
Date: Tue Feb  8 19:16:21 2011
New Revision: 1068532

URL: http://svn.apache.org/viewvc?rev=1068532&view=rev
Log:
enable #1530 only after cluster is all on 0.7.1
> patch by jbellis; reviewed by brandonwilliams for CASSANDRA-2138

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/NEWS.txt
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1068532&r1=1068531&r2=1068532&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Feb  8 19:16:21 2011
@@ -7,7 +7,7 @@
  * buffer network stack to avoid inefficient small TCP messages while avoiding
    the nagle/delayed ack problem (CASSANDRA-1896)
  * check log4j configuration for changes every 10s (CASSANDRA-1525, 1907)
- * more-efficient cross-DC replication (CASSANDRA-1530, -2051)
+ * more-efficient cross-DC replication (CASSANDRA-1530, -2051, -2138)
  * avoid polluting page cache with commitlog or sstable writes
    and seq scan operations (CASSANDRA-1470)
  * add RMI authentication options to nodetool (CASSANDRA-1921)

Modified: cassandra/branches/cassandra-0.7/NEWS.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/NEWS.txt?rev=1068532&r1=1068531&r2=1068532&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/NEWS.txt (original)
+++ cassandra/branches/cassandra-0.7/NEWS.txt Tue Feb  8 19:16:21 2011
@@ -1,3 +1,19 @@
+0.7.1
+=====
+
+Upgrading
+---------
+    - 0.7.1 is completely backwards compatible with 0.7.0.  Just restart
+      each node with the new version, one at a time.  (The cluster does
+      not all need to be upgraded simultaneously.)
+
+Features
+--------
+    - Cassandra can perform writes efficiently across datacenters by
+      sending a single copy of the mutation and having the recipient
+      forward that to other replicas in its datacenter.
+
+
 0.7.0
 =====
 

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=1068532&r1=1068531&r2=1068532&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/ApplicationState.java
 Tue Feb  8 19:16:21 2011
@@ -28,6 +28,7 @@ public enum ApplicationState
     SCHEMA,
     DC,
     RACK,
+    RELEASE_VERSION,
     // pad to allow adding new states to existing cluster
     X1,
     X2,

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1068532&r1=1068531&r2=1068532&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
 Tue Feb  8 19:16:21 2011
@@ -467,6 +467,11 @@ public class Gossiper implements IFailur
         return endpointStateMap_.get(ep);
     }
 
+    public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
+    {
+        return endpointStateMap_.entrySet();
+    }
+
     EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int 
version)
     {
         EndpointState epState = endpointStateMap_.get(forEndpoint);

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1068532&r1=1068531&r2=1068532&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/VersionedValue.java
 Tue Feb  8 19:16:21 2011
@@ -26,6 +26,7 @@ import java.util.UUID;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.utils.FBUtilities;
 
 
 /**
@@ -142,6 +143,10 @@ public class VersionedValue implements C
             return new VersionedValue(rackId);
         }
 
+        public VersionedValue releaseVersion()
+        {
+            return new VersionedValue(FBUtilities.getReleaseVersionString());
+        }
     }
 
     private static class VersionedValueSerializer implements 
ICompactSerializer<VersionedValue>

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1068532&r1=1068531&r2=1068532&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
 Tue Feb  8 19:16:21 2011
@@ -225,9 +225,9 @@ public class StorageProxy implements Sto
                 // from previous loop iterations
                 message.removeHeader(RowMutation.FORWARD_HEADER);
 
-                if (dataCenter.equals(localDataCenter))
+                if (dataCenter.equals(localDataCenter) || 
StorageService.instance.useEfficientCrossDCWrites())
                 {
-                    // direct writes to local DC
+                    // direct writes to local DC or old Cassandra versions
                     for (InetAddress destination : messages.getValue())
                         MessagingService.instance().sendRR(message, 
destination, handler);
                 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1068532&r1=1068531&r2=1068532&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
 Tue Feb  8 19:16:21 2011
@@ -183,6 +183,8 @@ public class StorageService implements I
     private boolean initialized;
     private volatile boolean joined = false;
     private String operationMode;
+
+    private volatile boolean efficientCrossDCWrites;
     private MigrationManager migrationManager = new MigrationManager();
 
     /* Used for tracking drain progress */
@@ -421,6 +423,7 @@ public class StorageService implements I
         MessagingService.instance().listen(FBUtilities.getLocalAddress());
         StorageLoadBalancer.instance.startBroadcasting();
         MigrationManager.announce(DatabaseDescriptor.getDefsVersion(), 
DatabaseDescriptor.getSeeds());
+        
Gossiper.instance.addLocalApplicationState(ApplicationState.RELEASE_VERSION, 
valueFactory.releaseVersion());
 
         HintedHandOffManager.instance.registerMBean();
 
@@ -628,23 +631,46 @@ public class StorageService implements I
      */
     public void onChange(InetAddress endpoint, ApplicationState state, 
VersionedValue value)
     {
-        if (state != ApplicationState.STATUS)
-            return;
+        switch (state)
+        {
+            case RELEASE_VERSION:
+                updateEfficientCrossDCWriteMode();
+                break;
+            case STATUS:
+                String apStateValue = value.value;
+                String[] pieces = 
apStateValue.split(VersionedValue.DELIMITER_STR, -1);
+                assert (pieces.length > 0);
+
+                String moveName = pieces[0];
+
+                if (moveName.equals(VersionedValue.STATUS_BOOTSTRAPPING))
+                    handleStateBootstrap(endpoint, pieces);
+                else if (moveName.equals(VersionedValue.STATUS_NORMAL))
+                    handleStateNormal(endpoint, pieces);
+                else if (moveName.equals(VersionedValue.STATUS_LEAVING))
+                    handleStateLeaving(endpoint, pieces);
+                else if (moveName.equals(VersionedValue.STATUS_LEFT))
+                    handleStateLeft(endpoint, pieces);
+        }
+    }
+
+    /**
+     * We can remove this in 0.8, since mixing 0.7.0 with 0.8 is not supported 
(0.7.1 is required)
+     */
+    private void updateEfficientCrossDCWriteMode()
+    {
+        for (Map.Entry<InetAddress, EndpointState> entry : 
Gossiper.instance.getEndpointStates())
+        {
+            VersionedValue version = 
entry.getValue().getApplicationState(ApplicationState.RELEASE_VERSION);
 
-        String apStateValue = value.value;
-        String[] pieces = apStateValue.split(VersionedValue.DELIMITER_STR, -1);
-        assert (pieces.length > 0);
-
-        String moveName = pieces[0];
-
-        if (moveName.equals(VersionedValue.STATUS_BOOTSTRAPPING))
-            handleStateBootstrap(endpoint, pieces);
-        else if (moveName.equals(VersionedValue.STATUS_NORMAL))
-            handleStateNormal(endpoint, pieces);
-        else if (moveName.equals(VersionedValue.STATUS_LEAVING))
-            handleStateLeaving(endpoint, pieces);
-        else if (moveName.equals(VersionedValue.STATUS_LEFT))
-            handleStateLeft(endpoint, pieces);
+            // no version means it's old code that doesn't gossip version, < 
0.7.1.
+            if (version == null)
+            {
+                efficientCrossDCWrites = false;
+                return;
+            }
+        }
+        efficientCrossDCWrites = true;
     }
 
     /**
@@ -2145,4 +2171,9 @@ public class StorageService implements I
         if (oldSnitch instanceof DynamicEndpointSnitch)
             ((DynamicEndpointSnitch)oldSnitch).unregisterMBean();
     }
+
+    public boolean useEfficientCrossDCWrites()
+    {
+        return efficientCrossDCWrites;
+    }
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1068532&r1=1068531&r2=1068532&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java
 Tue Feb  8 19:16:21 2011
@@ -37,6 +37,8 @@ import java.util.concurrent.atomic.Atomi
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.primitives.Ints;
 import org.apache.commons.collections.iterators.CollatingIterator;
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;


Reply via email to