Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 19c2f54b3 -> 98bcf4022


Fix hint replay with many accumulated expired hints

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6998


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/98bcf402
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/98bcf402
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/98bcf402

Branch: refs/heads/cassandra-2.0
Commit: 98bcf40226db7c8dbafa6eacee9ce5ef3feaeb1b
Parents: 19c2f54
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Thu Oct 16 19:04:33 2014 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Thu Oct 16 19:05:59 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/HintedHandOffManager.java      | 56 +++++++++++---------
 .../cassandra/service/StorageService.java       |  2 +-
 .../apache/cassandra/db/HintedHandOffTest.java  |  7 +--
 4 files changed, 33 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/98bcf402/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 967b90c..158a48b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.11:
+ * Fix hint replay with many accumulated expired hints (CASSANDRA-6998)
  * Fix duplicate results in DISTINCT queries on static columns with query
    paging (CASSANDRA-8108)
  * Properly validate ascii and utf8 string literals in CQL queries (CASSANDRA-8101)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98bcf402/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index f6bb033..a6b6d4c 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -216,6 +216,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 {
                     logger.info("Deleting any stored hints for {}", endpoint);
                     rm.apply();
+                    hintStore.forceBlockingFlush();
                     compact();
                 }
                 catch (Exception e)
@@ -250,13 +251,20 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
     }
 
     @VisibleForTesting
-    protected Future<?> compact()
+    protected void compact()
     {
-        hintStore.forceBlockingFlush();
-        ArrayList<Descriptor> descriptors = new ArrayList<Descriptor>();
+        ArrayList<Descriptor> descriptors = new ArrayList<>();
         for (SSTable sstable : hintStore.getDataTracker().getUncompactingSSTables())
             descriptors.add(sstable.descriptor);
-        return CompactionManager.instance.submitUserDefined(hintStore, descriptors, (int) (System.currentTimeMillis() / 1000));
+
+        try
+        {
+            CompactionManager.instance.submitUserDefined(hintStore, descriptors, (int) (System.currentTimeMillis() / 1000)).get();
+        }
+        catch (InterruptedException | ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     private static boolean pagingFinished(ColumnFamily hintColumnFamily, ByteBuffer startColumn)
@@ -333,9 +341,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
     /*
      * 1. Get the key of the endpoint we need to handoff
      * 2. For each column, deserialize the mutation and send it to the endpoint
-     * 3. Delete the subcolumn if the write was successful
+     * 3. Delete the column if the write was successful
      * 4. Force a flush
-     * 5. Do major compaction to clean up all deletes etc.
      */
     private void doDeliverHintsToEndpoint(InetAddress endpoint)
     {
@@ -357,7 +364,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                            / (StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
         RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
 
-        boolean finished = false;
         delivery:
         while (true)
         {
@@ -375,7 +381,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
             if (pagingFinished(hintsPage, startColumn))
             {
                 logger.info("Finished hinted handoff of {} rows to endpoint {}", rowsReplayed, endpoint);
-                finished = true;
                 break;
             }
 
@@ -469,17 +474,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
             }
         }
 
-        if (finished || rowsReplayed.get() >= DatabaseDescriptor.getTombstoneWarnThreshold())
-        {
-            try
-            {
-                compact().get();
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
+        // Flush all the tombstones to disk
+        hintStore.forceBlockingFlush();
     }
 
     // read less columns (mutations) per page if they are very large
@@ -503,8 +499,12 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
      */
     private void scheduleAllDeliveries()
     {
-        if (logger.isDebugEnabled())
-          logger.debug("Started scheduleAllDeliveries");
+        logger.debug("Started scheduleAllDeliveries");
+
+        // Force a major compaction to get rid of the tombstones and expired hints. Do it once, before we schedule any
+        // individual replay, to avoid N - 1 redundant individual compactions (when N is the number of nodes with hints
+        // to deliver to).
+        compact();
 
         IPartitioner p = StorageService.getPartitioner();
         RowPosition minPos = p.getMinimumToken().minKeyBound();
@@ -517,11 +517,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
             InetAddress target = StorageService.instance.getTokenMetadata().getEndpointForHostId(hostId);
             // token may have since been removed (in which case we have just read back a tombstone)
             if (target != null)
-                scheduleHintDelivery(target);
+                scheduleHintDelivery(target, false);
         }
 
-        if (logger.isDebugEnabled())
-          logger.debug("Finished scheduleAllDeliveries");
+        logger.debug("Finished scheduleAllDeliveries");
     }
 
     /*
@@ -529,7 +528,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
      * When we learn that some endpoint is back up we deliver the data
      * to him via an event driven mechanism.
     */
-    public void scheduleHintDelivery(final InetAddress to)
+    public void scheduleHintDelivery(final InetAddress to, final boolean precompact)
     {
         // We should not deliver hints to the same host in 2 different threads
         if (!queuedDeliveries.add(to))
@@ -543,6 +542,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
             {
                 try
                 {
+                    // If it's an individual node hint replay (triggered by Gossip or via JMX), and not the global scheduled replay
+                    // (every 10 minutes), force a major compaction to get rid of the tombstones and expired hints.
+                    if (precompact)
+                        compact();
+
                     deliverHintsToEndpoint(to);
                 }
                 finally
@@ -555,7 +559,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
     public void scheduleHintDelivery(String to) throws UnknownHostException
     {
-        scheduleHintDelivery(InetAddress.getByName(to));
+        scheduleHintDelivery(InetAddress.getByName(to), true);
     }
 
     public void pauseHintsDelivery(boolean b)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98bcf402/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 0cd4be5..cca6f79 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1977,7 +1977,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         if (tokenMetadata.isMember(endpoint))
         {
-            HintedHandOffManager.instance.scheduleHintDelivery(endpoint);
+            HintedHandOffManager.instance.scheduleHintDelivery(endpoint, true);
             for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
                 subscriber.onUp(endpoint);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98bcf402/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
index 9ffd702..302a1e7 100644
--- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
+++ b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
@@ -23,7 +23,6 @@ package org.apache.cassandra.db;
 import java.net.InetAddress;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 
@@ -31,12 +30,10 @@ import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
 import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
@@ -75,9 +72,7 @@ public class HintedHandOffTest extends SchemaLoader
         assertEquals(1, hintStore.getSSTables().size());
 
         // submit compaction
-        FBUtilities.waitOnFuture(HintedHandOffManager.instance.compact());
-        while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0)
-            TimeUnit.SECONDS.sleep(1);
+        HintedHandOffManager.instance.compact();
 
         // single row should not be removed because of gc_grace_seconds
         // is 10 hours and there are no any tombstones in sstable

Reply via email to