Merge branch 'cassandra-1.2' into trunk

Conflicts:
        src/java/org/apache/cassandra/db/HintedHandOffManager.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/076a0e4e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/076a0e4e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/076a0e4e

Branch: refs/heads/trunk
Commit: 076a0e4e78793a11b5be78e3eb592514b7fc0d79
Parents: 08721da d224c2b
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Wed Mar 20 20:09:31 2013 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Wed Mar 20 20:09:31 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/db/HintedHandOffManager.java  |  178 ++++++++-------
 2 files changed, 97 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/076a0e4e/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 681fb44,3eb85d6..3309ab9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -22,9 -2,9 +22,10 @@@
   * Fix mixing prepared statements between keyspaces (CASSANDRA-5352)
   * Fix consistency level during bootstrap - strike 3 (CASSANDRA-5354)
   * Fix transposed arguments in AlreadyExistsException (CASSANDRA-5362)
+  * Improve asynchronous hint delivery (CASSANDRA-5179)
  
  
 +
  1.2.3
   * add check for sstable overlap within a level on startup (CASSANDRA-5327)
   * replace ipv6 colons in jmx object names (CASSANDRA-5298, 5328)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/076a0e4e/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/HintedHandOffManager.java
index db85efb,53411f5..cb306ae
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@@ -57,8 -58,7 +59,6 @@@ import org.apache.cassandra.metrics.Hin
  import org.apache.cassandra.net.MessageOut;
  import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.service.*;
--import org.apache.cassandra.thrift.*;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.UUIDGen;
@@@ -108,31 -107,11 +107,34 @@@ public class HintedHandOffManager imple
                                                                                
   Integer.MAX_VALUE,
                                                                                
   TimeUnit.SECONDS,
                                                                                
   new LinkedBlockingQueue<Runnable>(),
-                                                                               
   new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY), "internal");
+                                                                               
   new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY),
+                                                                               
   "internal");
+ 
+     private final ColumnFamilyStore hintStore = 
Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF);
  
 +    /**
 +     * Returns a mutation representing a Hint to be sent to <code>targetId</code>
 +     * as soon as it becomes available again.
 +     */
 +    public static RowMutation hintFor(RowMutation mutation, UUID targetId) 
throws IOException
 +    {
 +        UUID hintId = UUIDGen.getTimeUUID();
 +
 +        // The hint TTL is set at the smallest GCGraceSeconds for any of the CFs in the RM;
 +        // this ensures that deletes aren't "undone" by delivery of an old hint
 +        int ttl = Integer.MAX_VALUE;
 +        for (ColumnFamily cf : mutation.getColumnFamilies())
 +            ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds());
 +
 +        // serialize the hint with id and version as a composite column name
 +        ByteBuffer name = comparator.decompose(hintId, 
MessagingService.current_version);
 +        ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, 
RowMutation.serializer, MessagingService.current_version));
 +        ColumnFamily cf = 
ColumnFamily.create(Schema.instance.getCFMetaData(Table.SYSTEM_KS, 
SystemTable.HINTS_CF));
 +        cf.addColumn(name, value, System.currentTimeMillis(), ttl);
 +
 +        return new RowMutation(Table.SYSTEM_KS, 
UUIDType.instance.decompose(targetId), cf);
 +    }
 +
      public void start()
      {
          MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@@ -157,10 -136,10 +159,10 @@@
          StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, 
TimeUnit.MINUTES);
      }
  
-     private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer 
columnName, long timestamp) throws IOException
+     private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer 
columnName, long timestamp)
      {
          RowMutation rm = new RowMutation(Table.SYSTEM_KS, tokenBytes);
 -        rm.delete(new QueryPath(SystemTable.HINTS_CF, null, columnName), 
timestamp);
 +        rm.delete(SystemTable.HINTS_CF, columnName, timestamp);
          rm.applyUnsafe(); // don't bother with commitlog since we're going to 
flush as soon as we're done with delivery
      }
  
@@@ -339,37 -302,40 +325,40 @@@
          int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB();
          RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? 
Double.MAX_VALUE : throttleInKB * 1024);
  
+         delivery:
          while (true)
          {
-             // check if hints delivery has been paused during the process
-             if (hintedHandOffPaused)
-             {
-                 logger.debug("Hints delivery process is paused, aborting");
-                 break;
-             }
+             QueryFilter filter = QueryFilter.getSliceFilter(epkey,
 -                                                            new 
QueryPath(SystemTable.HINTS_CF),
++                                                            
SystemTable.HINTS_CF,
+                                                             startColumn,
+                                                             
ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                             false,
+                                                             pageSize);
  
-             QueryFilter filter = QueryFilter.getSliceFilter(epkey, 
SystemTable.HINTS_CF, startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 
pageSize);
+             ColumnFamily hintsPage = 
ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter),
+                                                                      (int) 
(System.currentTimeMillis() / 1000));
  
-             ColumnFamily hintsPage = 
ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), 
(int)(System.currentTimeMillis() / 1000));
              if (pagingFinished(hintsPage, startColumn))
-             {
-                 if (ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(startColumn))
-                 {
-                     // we've started from the beginning and could not find anything (only maybe tombstones)
-                     break;
-                 }
-                 else
-                 {
-                     // restart query from the first column until we read an empty row;
-                     // that will tell us everything was delivered successfully with no timeouts
-                     startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-                     continue;
-                 }
+                 break;
  
+             // check if node is still alive and we should continue delivery process
+             if (!FailureDetector.instance.isAlive(endpoint))
+             {
+                 logger.info("Endpoint {} died during hint delivery; aborting 
({} delivered)", endpoint, rowsReplayed);
+                 return;
              }
  
+             List<WriteResponseHandler> responseHandlers = 
Lists.newArrayList();
+ 
 -            for (final IColumn hint : hintsPage.getSortedColumns())
 +            for (final Column hint : hintsPage.getSortedColumns())
              {
+                 // check if hints delivery has been paused during the process
+                 if (hintedHandOffPaused)
+                 {
+                     logger.debug("Hints delivery process is paused, 
aborting");
+                     break delivery;
+                 }
+ 
                  // Skip tombstones:
                  // if we iterate quickly enough, it's possible that we could 
request a new page in the same millisecond
                  // in which the local deletion timestamp was generated on the 
last column in the old page, in which

Reply via email to