Author: jbellis
Date: Tue Jan 25 17:09:43 2011
New Revision: 1063361

URL: http://svn.apache.org/viewvc?rev=1063361&view=rev
Log:
fix bugs in multi-DC replication
patch by ivancso; reviewed by jbellis for CASSANDRA-2051

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1063361&r1=1063360&r2=1063361&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Jan 25 17:09:43 2011
@@ -2,7 +2,7 @@
  * buffer network stack to avoid inefficient small TCP messages while avoiding
    the nagle/delayed ack problem (CASSANDRA-1896)
  * check log4j configuration for changes every 10s (CASSANDRA-1525, 1907)
- * more-efficient cross-DC replication (CASSANDRA-1530)
+ * more-efficient cross-DC replication (CASSANDRA-1530, -2051)
  * upgrade to TFastFramedTransport (CASSANDRA-1743)
  * avoid polluting page cache with commitlog or sstable writes
    and seq scan operations (CASSANDRA-1470)

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1063361&r1=1063360&r2=1063361&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Tue Jan 25 17:09:43 2011
@@ -90,7 +90,7 @@ public class RowMutationVerbHandler impl
     private void forwardToLocalNodes(Message message, byte[] forwardBytes) throws UnknownHostException
     {
         // remove fwds from message to avoid infinite loop
-        message.setHeader(RowMutation.FORWARD_HEADER, null);
+        message.removeHeader(RowMutation.FORWARD_HEADER);
 
         int bytesPerInetAddress = FBUtilities.getLocalAddress().getAddress().length;
         assert forwardBytes.length >= bytesPerInetAddress;
@@ -110,7 +110,7 @@ public class RowMutationVerbHandler impl
 
             // Send the original message to the address specified by the FORWARD_HINT
             // Let the response go back to the coordinator
-            MessagingService.instance().sendOneWay(message, message.getFrom());
+            MessagingService.instance().sendOneWay(message, address);
 
             offset += bytesPerInetAddress;
         }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java?rev=1063361&r1=1063360&r2=1063361&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java Tue Jan 25 17:09:43 2011
@@ -97,6 +97,11 @@ public class Header
     {
         details_.put(key, value);
     }
+
+    void removeDetail(String key)
+    {
+        details_.remove(key);
+    }
 }
 
 class HeaderSerializer implements ICompactSerializer<Header>

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java?rev=1063361&r1=1063360&r2=1063361&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java Tue Jan 25 17:09:43 2011
@@ -68,6 +68,11 @@ public class Message
     {
         header_.setDetail(key, value);
     }
+    
+    public void removeHeader(String key)
+    {
+        header_.removeDetail(key);
+    }
 
     public byte[] getMessageBody()
     {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1063361&r1=1063360&r2=1063361&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java Tue Jan 25 17:09:43 2011
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.net.Message;
@@ -40,7 +41,7 @@ import org.apache.cassandra.utils.FBUtil
 public class DatacenterReadCallback<T> extends ReadCallback<T>
 {
     private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-       private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
+    private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
     private AtomicInteger localResponses;
     
     public DatacenterReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
@@ -54,8 +55,7 @@ public class DatacenterReadCallback<T> e
     {
         resolver.preprocess(message);
 
-        int n;
-        n = localdc.equals(snitch.getDatacenter(message.getFrom())) 
+        int n = localdc.equals(snitch.getDatacenter(message.getFrom()))
                 ? localResponses.decrementAndGet()
                 : localResponses.get();
 
@@ -66,6 +66,19 @@ public class DatacenterReadCallback<T> e
     }
     
     @Override
+    public void response(ReadResponse result)
+    {
+        ((ReadResponseResolver) resolver).injectPreProcessed(result);
+
+        int n = localResponses.decrementAndGet();
+
+        if (n == 0 && resolver.isDataPresent())
+        {
+            condition.signal();
+        }
+    }
+    
+    @Override
     public int determineBlockFor(ConsistencyLevel consistency_level, String table)
        {
         NetworkTopologyStrategy stategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1063361&r1=1063360&r2=1063361&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Tue Jan 25 17:09:43 2011
@@ -234,10 +234,9 @@ public class StorageProxy implements Sto
                 Message message = messages.getKey();
                 Iterator<InetAddress> iter = messages.getValue().iterator();
                 assert iter.hasNext();
-                
+
                 // First endpoint in list is the destination for this group
                 InetAddress target = iter.next();
-            
 
                 // Add all the other destinations that are bound for the same dataCenter as a header in the primary message.
                 while (iter.hasNext())
@@ -382,7 +381,7 @@ public class StorageProxy implements Sto
             {
                 Message message = command.makeReadMessage();
                 if (logger.isDebugEnabled())
-                    logger.debug("reading digest for " + command + " from " + message.getMessageId() + "@" + dataPoint);
+                    logger.debug("reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint);
                 MessagingService.instance().sendRR(message, dataPoint, handler);
             }
 


Reply via email to