Author: jbellis
Date: Tue Jan 25 17:09:43 2011
New Revision: 1063361

URL: http://svn.apache.org/viewvc?rev=1063361&view=rev
Log:
fix bugs in multi-DC replication
patch by ivancso; reviewed by jbellis for CASSANDRA-2051

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1063361&r1=1063360&r2=1063361&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Jan 25 17:09:43 2011 @@ -2,7 +2,7 @@ * buffer network stack to avoid inefficient small TCP messages while avoiding the nagle/delayed ack problem (CASSANDRA-1896) * check log4j configuration for changes every 10s (CASSANDRA-1525, 1907) - * more-efficient cross-DC replication (CASSANDRA-1530) + * more-efficient cross-DC replication (CASSANDRA-1530, -2051) * upgrade to TFastFramedTransport (CASSANDRA-1743) * avoid polluting page cache with commitlog or sstable writes and seq scan operations (CASSANDRA-1470) Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1063361&r1=1063360&r2=1063361&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Tue Jan 25 17:09:43 2011 @@ -90,7 +90,7 @@ public class 
RowMutationVerbHandler impl private void forwardToLocalNodes(Message message, byte[] forwardBytes) throws UnknownHostException { // remove fwds from message to avoid infinite loop - message.setHeader(RowMutation.FORWARD_HEADER, null); + message.removeHeader(RowMutation.FORWARD_HEADER); int bytesPerInetAddress = FBUtilities.getLocalAddress().getAddress().length; assert forwardBytes.length >= bytesPerInetAddress; @@ -110,7 +110,7 @@ public class RowMutationVerbHandler impl // Send the original message to the address specified by the FORWARD_HINT // Let the response go back to the coordinator - MessagingService.instance().sendOneWay(message, message.getFrom()); + MessagingService.instance().sendOneWay(message, address); offset += bytesPerInetAddress; } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java?rev=1063361&r1=1063360&r2=1063361&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java Tue Jan 25 17:09:43 2011 @@ -97,6 +97,11 @@ public class Header { details_.put(key, value); } + + void removeDetail(String key) + { + details_.remove(key); + } } class HeaderSerializer implements ICompactSerializer<Header> Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java?rev=1063361&r1=1063360&r2=1063361&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java Tue Jan 
25 17:09:43 2011 @@ -68,6 +68,11 @@ public class Message { header_.setDetail(key, value); } + + public void removeHeader(String key) + { + header_.removeDetail(key); + } public byte[] getMessageBody() { Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1063361&r1=1063360&r2=1063361&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java Tue Jan 25 17:09:43 2011 @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.Atomi import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Table; +import org.apache.cassandra.db.ReadResponse; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.apache.cassandra.net.Message; @@ -40,7 +41,7 @@ import org.apache.cassandra.utils.FBUtil public class DatacenterReadCallback<T> extends ReadCallback<T> { private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress()); + private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress()); private AtomicInteger localResponses; public DatacenterReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table) @@ -54,8 +55,7 @@ public class DatacenterReadCallback<T> e { resolver.preprocess(message); - int n; - n = localdc.equals(snitch.getDatacenter(message.getFrom())) + int n = localdc.equals(snitch.getDatacenter(message.getFrom())) ? 
localResponses.decrementAndGet() : localResponses.get(); @@ -66,6 +66,19 @@ public class DatacenterReadCallback<T> e } @Override + public void response(ReadResponse result) + { + ((ReadResponseResolver) resolver).injectPreProcessed(result); + + int n = localResponses.decrementAndGet(); + + if (n == 0 && resolver.isDataPresent()) + { + condition.signal(); + } + } + + @Override public int determineBlockFor(ConsistencyLevel consistency_level, String table) { NetworkTopologyStrategy stategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy(); Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1063361&r1=1063360&r2=1063361&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Tue Jan 25 17:09:43 2011 @@ -234,10 +234,9 @@ public class StorageProxy implements Sto Message message = messages.getKey(); Iterator<InetAddress> iter = messages.getValue().iterator(); assert iter.hasNext(); - + // First endpoint in list is the destination for this group InetAddress target = iter.next(); - // Add all the other destinations that are bound for the same dataCenter as a header in the primary message. while (iter.hasNext()) @@ -382,7 +381,7 @@ public class StorageProxy implements Sto { Message message = command.makeReadMessage(); if (logger.isDebugEnabled()) - logger.debug("reading digest for " + command + " from " + message.getMessageId() + "@" + dataPoint); + logger.debug("reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint); MessagingService.instance().sendRR(message, dataPoint, handler); }