Author: jbellis
Date: Mon Jul 26 04:39:42 2010
New Revision: 979156

URL: http://svn.apache.org/viewvc?rev=979156&view=rev
Log:
add ack to Binary write verb.  patch by jbellis; reviewed by Toby Jungen for 
CASSANDRA-1093

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    
cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/BinaryVerbHandler.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=979156&r1=979155&r2=979156&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Mon Jul 26 04:39:42 2010
@@ -17,6 +17,8 @@
  * cassandra-cli.bat works on windows (CASSANDRA-1236)
  * pre-emptively drop requests that cannot be processed within RPCTimeout
    (CASSANDRA-685)
+ * add ack to Binary write verb and update CassandraBulkLoader
+   to wait for acks for each row (CASSANDRA-1093)
 
 
 0.6.3

Modified: 
cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java?rev=979156&r1=979155&r2=979156&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java 
(original)
+++ 
cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java 
Mon Jul 26 04:39:42 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,7 +17,7 @@
  */
  
  /**
-  * Cassandra has a backdoor called the Binary Memtable. The purpose of this 
backdoor is to
+  * Cassandra has a back door called the Binary Memtable. The purpose of this 
backdoor is to
   * mass import large amounts of data, without using the Thrift interface.
   *
   * Inserting data through the binary memtable, allows you to skip the commit 
log overhead, and an ack
@@ -36,6 +36,12 @@
   * in the mapper, so that the end result generates the data set into a column 
oriented subset. Once you get to the
   * reduce aspect, you can generate the ColumnFamilies you want inserted, and 
send it to your nodes.
   *
+  * For Cassandra 0.6.4, we modified this example to wait for acks from all 
Cassandra nodes for each row
+  * before proceeding to the next.  This means to keep Cassandra similarly 
busy you can either
+  * 1) add more reducer tasks,
+  * 2) remove the "wait for acks" block of code,
+  * 3) parallelize the writing of rows to Cassandra, e.g. with an Executor.
+  *
   * THIS CANNOT RUN ON THE SAME IP ADDRESS AS A CASSANDRA INSTANCE.
   */
   
@@ -60,7 +66,10 @@ import org.apache.cassandra.dht.BigInteg
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
@@ -173,10 +182,24 @@ public class CassandraBulkLoader {
 
             /* Get serialized message to send to cluster */
             message = createMessage(keyspace, key.toString(), cfName, 
columnFamilies);
+            List<IAsyncResult> results = new ArrayList<IAsyncResult>();
             for (InetAddress endpoint: 
StorageService.instance.getNaturalEndpoints(keyspace, key.toString()))
             {
                 /* Send message to end point */
-                MessagingService.instance.sendOneWay(message, endpoint);
+                results.add(MessagingService.instance.sendRR(message, 
endpoint));
+            }
+            /* wait for acks */
+            for (IAsyncResult result : results)
+            {
+                try
+                {
+                    result.get(DatabaseDescriptor.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
+                }
+                catch (TimeoutException e)
+                {
+                    // you should probably add retry logic here
+                    throw new RuntimeException(e);
+                }
             }
             
             output.collect(key, new Text(" inserted into Cassandra node(s)"));

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=979156&r1=979155&r2=979156&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
 Mon Jul 26 04:39:42 2010
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
 
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
 
 import org.apache.log4j.Logger;
 
@@ -40,11 +41,16 @@ public class BinaryVerbHandler implement
             RowMutationMessage rmMsg = 
RowMutationMessage.serializer().deserialize(new DataInputStream(buffer));
             RowMutation rm = rmMsg.getRowMutation();
             rm.applyBinary();
+
+            WriteResponse response = new WriteResponse(rm.getTable(), 
rm.key(), true);
+            Message responseMessage = 
WriteResponse.makeWriteResponseMessage(message, response);
+            if (logger_.isDebugEnabled())
+              logger_.debug("binary " + rm + " applied.  Sending response to " 
+ message.getMessageId() + "@" + message.getFrom());
+            MessagingService.instance.sendOneWay(responseMessage, 
message.getFrom());
         }
         catch (Exception e)
         {
             throw new RuntimeException(e);
         }
     }
-
 }


Reply via email to