Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 19c2f54b3 -> 98bcf4022
Fix hint replay with many accumulated expired hints patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for CASSANDRA-6998 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/98bcf402 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/98bcf402 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/98bcf402 Branch: refs/heads/cassandra-2.0 Commit: 98bcf40226db7c8dbafa6eacee9ce5ef3feaeb1b Parents: 19c2f54 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Thu Oct 16 19:04:33 2014 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Thu Oct 16 19:05:59 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/HintedHandOffManager.java | 56 +++++++++++--------- .../cassandra/service/StorageService.java | 2 +- .../apache/cassandra/db/HintedHandOffTest.java | 7 +-- 4 files changed, 33 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/98bcf402/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 967b90c..158a48b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.11: + * Fix hint replay with many accumulated expired hints (CASSANDRA-6998) * Fix duplicate results in DISTINCT queries on static columns with query paging (CASSANDRA-8108) * Properly validate ascii and utf8 string literals in CQL queries (CASSANDRA-8101) http://git-wip-us.apache.org/repos/asf/cassandra/blob/98bcf402/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index f6bb033..a6b6d4c 100644 --- 
a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -216,6 +216,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean { logger.info("Deleting any stored hints for {}", endpoint); rm.apply(); + hintStore.forceBlockingFlush(); compact(); } catch (Exception e) @@ -250,13 +251,20 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean } @VisibleForTesting - protected Future<?> compact() + protected void compact() { - hintStore.forceBlockingFlush(); - ArrayList<Descriptor> descriptors = new ArrayList<Descriptor>(); + ArrayList<Descriptor> descriptors = new ArrayList<>(); for (SSTable sstable : hintStore.getDataTracker().getUncompactingSSTables()) descriptors.add(sstable.descriptor); - return CompactionManager.instance.submitUserDefined(hintStore, descriptors, (int) (System.currentTimeMillis() / 1000)); + + try + { + CompactionManager.instance.submitUserDefined(hintStore, descriptors, (int) (System.currentTimeMillis() / 1000)).get(); + } + catch (InterruptedException | ExecutionException e) + { + throw new RuntimeException(e); + } } private static boolean pagingFinished(ColumnFamily hintColumnFamily, ByteBuffer startColumn) @@ -333,9 +341,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean /* * 1. Get the key of the endpoint we need to handoff * 2. For each column, deserialize the mutation and send it to the endpoint - * 3. Delete the subcolumn if the write was successful + * 3. Delete the column if the write was successful * 4. Force a flush - * 5. Do major compaction to clean up all deletes etc. */ private void doDeliverHintsToEndpoint(InetAddress endpoint) { @@ -357,7 +364,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean / (StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1); RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? 
Double.MAX_VALUE : throttleInKB * 1024); - boolean finished = false; delivery: while (true) { @@ -375,7 +381,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean if (pagingFinished(hintsPage, startColumn)) { logger.info("Finished hinted handoff of {} rows to endpoint {}", rowsReplayed, endpoint); - finished = true; break; } @@ -469,17 +474,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean } } - if (finished || rowsReplayed.get() >= DatabaseDescriptor.getTombstoneWarnThreshold()) - { - try - { - compact().get(); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } + // Flush all the tombstones to disk + hintStore.forceBlockingFlush(); } // read less columns (mutations) per page if they are very large @@ -503,8 +499,12 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean */ private void scheduleAllDeliveries() { - if (logger.isDebugEnabled()) - logger.debug("Started scheduleAllDeliveries"); + logger.debug("Started scheduleAllDeliveries"); + + // Force a major compaction to get rid of the tombstones and expired hints. Do it once, before we schedule any + // individual replay, to avoid N - 1 redundant individual compactions (when N is the number of nodes with hints + // to deliver to). 
+ compact(); IPartitioner p = StorageService.getPartitioner(); RowPosition minPos = p.getMinimumToken().minKeyBound(); @@ -517,11 +517,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean InetAddress target = StorageService.instance.getTokenMetadata().getEndpointForHostId(hostId); // token may have since been removed (in which case we have just read back a tombstone) if (target != null) - scheduleHintDelivery(target); + scheduleHintDelivery(target, false); } - if (logger.isDebugEnabled()) - logger.debug("Finished scheduleAllDeliveries"); + logger.debug("Finished scheduleAllDeliveries"); } /* @@ -529,7 +528,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean * When we learn that some endpoint is back up we deliver the data * to him via an event driven mechanism. */ - public void scheduleHintDelivery(final InetAddress to) + public void scheduleHintDelivery(final InetAddress to, final boolean precompact) { // We should not deliver hints to the same host in 2 different threads if (!queuedDeliveries.add(to)) @@ -543,6 +542,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean { try { + // If it's an individual node hint replay (triggered by Gossip or via JMX), and not the global scheduled replay + // (every 10 minutes), force a major compaction to get rid of the tombstones and expired hints. 
+ if (precompact) + compact(); + deliverHintsToEndpoint(to); } finally @@ -555,7 +559,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean public void scheduleHintDelivery(String to) throws UnknownHostException { - scheduleHintDelivery(InetAddress.getByName(to)); + scheduleHintDelivery(InetAddress.getByName(to), true); } public void pauseHintsDelivery(boolean b) http://git-wip-us.apache.org/repos/asf/cassandra/blob/98bcf402/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 0cd4be5..cca6f79 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1977,7 +1977,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (tokenMetadata.isMember(endpoint)) { - HintedHandOffManager.instance.scheduleHintDelivery(endpoint); + HintedHandOffManager.instance.scheduleHintDelivery(endpoint, true); for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) subscriber.onUp(endpoint); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98bcf402/test/unit/org/apache/cassandra/db/HintedHandOffTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java index 9ffd702..302a1e7 100644 --- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java +++ b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java @@ -23,7 +23,6 @@ package org.apache.cassandra.db; import java.net.InetAddress; import java.util.Map; import java.util.UUID; -import java.util.concurrent.TimeUnit; import org.junit.Test; @@ -31,12 +30,10 @@ import com.google.common.collect.Iterators; import 
org.apache.cassandra.SchemaLoader; import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; import static org.junit.Assert.assertEquals; import static org.apache.cassandra.cql3.QueryProcessor.processInternal; @@ -75,9 +72,7 @@ public class HintedHandOffTest extends SchemaLoader assertEquals(1, hintStore.getSSTables().size()); // submit compaction - FBUtilities.waitOnFuture(HintedHandOffManager.instance.compact()); - while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0) - TimeUnit.SECONDS.sleep(1); + HintedHandOffManager.instance.compact(); // single row should not be removed because of gc_grace_seconds // is 10 hours and there are no any tombstones in sstable