Author: jbellis Date: Mon Jul 26 04:39:42 2010 New Revision: 979156 URL: http://svn.apache.org/viewvc?rev=979156&view=rev Log: add ack to Binary write verb. patch by jbellis; reviewed by Toby Jungen for CASSANDRA-1093
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/BinaryVerbHandler.java Modified: cassandra/branches/cassandra-0.6/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=979156&r1=979155&r2=979156&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.6/CHANGES.txt Mon Jul 26 04:39:42 2010 @@ -17,6 +17,8 @@ * cassandra-cli.bat works on windows (CASSANDRA-1236) * pre-emptively drop requests that cannot be processed within RPCTimeout (CASSANDRA-685) + * add ack to Binary write verb and update CassandraBulkLoader + to wait for acks for each row (CASSANDRA-1093) 0.6.3 Modified: cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java?rev=979156&r1=979155&r2=979156&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java (original) +++ cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java Mon Jul 26 04:39:42 2010 @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,7 +17,7 @@ */ /** - * Cassandra has a backdoor called the Binary Memtable. The purpose of this backdoor is to + * Cassandra has a back door called the Binary Memtable. The purpose of this backdoor is to * mass import large amounts of data, without using the Thrift interface. * * Inserting data through the binary memtable, allows you to skip the commit log overhead, and an ack @@ -36,6 +36,12 @@ * in the mapper, so that the end result generates the data set into a column oriented subset. Once you get to the * reduce aspect, you can generate the ColumnFamilies you want inserted, and send it to your nodes. * + * For Cassandra 0.6.4, we modified this example to wait for acks from all Cassandra nodes for each row + * before proceeding to the next. This means to keep Cassandra similarly busy you can either + * 1) add more reducer tasks, + * 2) remove the "wait for acks" block of code, + * 3) parallelize the writing of rows to Cassandra, e.g. with an Executor. + * * THIS CANNOT RUN ON THE SAME IP ADDRESS AS A CASSANDRA INSTANCE. */ @@ -60,7 +66,10 @@ import org.apache.cassandra.dht.BigInteg import org.apache.cassandra.io.util.DataOutputBuffer; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.cassandra.net.IAsyncResult; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; @@ -173,10 +182,24 @@ public class CassandraBulkLoader { /* Get serialized message to send to cluster */ message = createMessage(keyspace, key.toString(), cfName, columnFamilies); + List<IAsyncResult> results = new ArrayList<IAsyncResult>(); for (InetAddress endpoint: StorageService.instance.getNaturalEndpoints(keyspace, key.toString())) { /* Send message to end point */ - MessagingService.instance.sendOneWay(message, endpoint); + results.add(MessagingService.instance.sendRR(message, endpoint)); + } + /* wait for acks */ + for (IAsyncResult result : results) + { + try + { + result.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) + { + // you should probably add retry logic here + throw new RuntimeException(e); + } } output.collect(key, new Text(" inserted into Cassandra node(s)")); Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/BinaryVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=979156&r1=979155&r2=979156&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/BinaryVerbHandler.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/BinaryVerbHandler.java Mon Jul 26 04:39:42 2010 @@ -23,6 +23,7 @@ import java.io.DataInputStream; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; import org.apache.log4j.Logger; @@ -40,11 +41,16 @@ public class BinaryVerbHandler implement RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(new DataInputStream(buffer)); RowMutation rm = rmMsg.getRowMutation(); rm.applyBinary(); + + WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true); + Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response); + if (logger_.isDebugEnabled()) + logger_.debug("binary " + rm + " applied. Sending response to " + message.getMessageId() + "@" + message.getFrom()); + MessagingService.instance.sendOneWay(responseMessage, message.getFrom()); } catch (Exception e) { throw new RuntimeException(e); } } - }