Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 2a824c024 -> b2f38ef17 refs/heads/trunk c79878355 -> b63e6eef5
Properly handle hinted handoff after topology changes patch by Branimir Lambov; reviewed by Aleksey Yeschenko for CASSANDRA-5902 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b2f38ef1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b2f38ef1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b2f38ef1 Branch: refs/heads/cassandra-3.0 Commit: b2f38ef177b5c19288f96d9bfe5304ec94391f73 Parents: 2a824c0 Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Wed Nov 18 16:26:37 2015 +0200 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Thu Feb 11 13:06:16 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/batchlog/BatchlogManager.java | 3 +- .../apache/cassandra/hints/HintVerbHandler.java | 22 +++- .../cassandra/hints/HintsDispatchExecutor.java | 28 ++++- .../apache/cassandra/hints/HintsDispatcher.java | 7 +- .../apache/cassandra/hints/HintsService.java | 20 +++- .../apache/cassandra/service/StorageProxy.java | 20 +++- .../cassandra/service/StorageService.java | 8 ++ .../org/apache/cassandra/hints/HintTest.java | 109 +++++++++++++++++++ 9 files changed, 199 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 828ba21..e723aba 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.4 + * Properly handle hinted handoff after topology changes (CASSANDRA-5902) * AssertionError when listing sstable files on inconsistent disk state (CASSANDRA-11156) * Fix wrong rack counting and invalid conditions check for TokenAllocation (CASSANDRA-11139) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/batchlog/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java index 7ccc6f8..f5133bb 100644 --- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java +++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java @@ -420,8 +420,7 @@ public class BatchlogManager implements BatchlogManagerMBean String ks = mutation.getKeyspaceName(); Token tk = mutation.key().getToken(); - for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk), - StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks))) + for (InetAddress endpoint : StorageService.instance.getNaturalAndPendingEndpoints(ks, tk)) { if (endpoint.equals(FBUtilities.getBroadcastAddress())) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/hints/HintVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintVerbHandler.java b/src/java/org/apache/cassandra/hints/HintVerbHandler.java index b2c7b6a..d8838a9 100644 --- a/src/java/org/apache/cassandra/hints/HintVerbHandler.java +++ b/src/java/org/apache/cassandra/hints/HintVerbHandler.java @@ -29,6 +29,7 @@ import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.serializers.MarshalException; +import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; /** @@ -72,12 +73,23 @@ public final class HintVerbHandler implements IVerbHandler<HintMessage> return; } - // Apply the hint if this node is the destination, store for future dispatch if this node isn't (must have gotten - // 
it from a decommissioned node that had streamed it before going out). - if (hostId.equals(StorageService.instance.getLocalHostUUID())) - hint.apply(); - else + if (!hostId.equals(StorageService.instance.getLocalHostUUID())) + { + // the node is not the final destination of the hint (must have gotten it from a decommissioning node), + // so just store it locally, to be delivered later. HintsService.instance.write(hostId, hint); + } + else if (!StorageProxy.instance.appliesLocally(hint.mutation)) + { + // the topology has changed, and we are no longer a replica of the mutation - since we don't know which node(s) + // it has been handed over to, re-address the hint to all replicas; see CASSANDRA-5902. + HintsService.instance.writeForAllReplicas(hint); + } + else + { + // the common path - the node is both the destination and a valid replica for the hint. + hint.apply(); + } reply(id, message.from); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java index 7782d5d..5292dc1 100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java +++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java @@ -18,6 +18,7 @@ package org.apache.cassandra.hints; import java.io.File; +import java.net.InetAddress; import java.util.Map; import java.util.UUID; import java.util.concurrent.*; @@ -236,10 +237,21 @@ final class HintsDispatchExecutor { logger.trace("Dispatching hints file {}", descriptor.fileName()); + InetAddress address = StorageService.instance.getEndpointForHostId(hostId); + if (address != null) + return deliver(descriptor, address); + + // address == null means the target no longer exists; find a new home for each hint entry. 
+ convert(descriptor); + return true; + } + + private boolean deliver(HintsDescriptor descriptor, InetAddress address) + { File file = new File(hintsDirectory, descriptor.fileName()); Long offset = store.getDispatchOffset(descriptor).orElse(null); - try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, hostId, descriptor.hostId, isPaused)) + try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, address, descriptor.hostId, isPaused)) { if (offset != null) dispatcher.seek(offset); @@ -260,5 +272,19 @@ final class HintsDispatchExecutor } } } + + // for each hint in the hints file for a node that isn't part of the ring anymore, write RF hints for each replica + private void convert(HintsDescriptor descriptor) + { + File file = new File(hintsDirectory, descriptor.fileName()); + + try (HintsReader reader = HintsReader.open(file, rateLimiter)) + { + reader.forEach(page -> page.hintsIterator().forEachRemaining(HintsService.instance::writeForAllReplicas)); + store.delete(descriptor); + store.cleanUp(descriptor); + logger.info("Finished converting hints file {}", descriptor.fileName()); + } + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/hints/HintsDispatcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java index 94a6669..e582d88 100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java +++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java @@ -32,7 +32,6 @@ import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.concurrent.SimpleCondition; /** @@ -64,11 +63,10 @@ final class 
HintsDispatcher implements AutoCloseable this.isPaused = isPaused; } - static HintsDispatcher create(File file, RateLimiter rateLimiter, UUID hostId, UUID hintFor, AtomicBoolean isPaused) + static HintsDispatcher create(File file, RateLimiter rateLimiter, InetAddress address, UUID hostId, AtomicBoolean isPaused) { - InetAddress address = StorageService.instance.getEndpointForHostId(hostId); int messagingVersion = MessagingService.instance().getVersion(address); - return new HintsDispatcher(HintsReader.open(file, rateLimiter), hintFor, address, messagingVersion, isPaused); + return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, isPaused); } public void close() @@ -152,6 +150,7 @@ final class HintsDispatcher implements AutoCloseable /* * Sending hints in compatibility mode. */ + private <T> Action sendHints(Iterator<T> hints, Collection<Callback> callbacks, Function<T, Callback> sendFunction) { while (hints.hasNext()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/hints/HintsService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java index 5001af4..5a32786 100644 --- a/src/java/org/apache/cassandra/hints/HintsService.java +++ b/src/java/org/apache/cassandra/hints/HintsService.java @@ -22,7 +22,6 @@ import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collections; -import java.util.Map; import java.util.UUID; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -40,9 +39,11 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.metrics.HintedHandoffMetrics; import org.apache.cassandra.metrics.StorageMetrics; -import 
org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; +import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Iterables.transform; import static com.google.common.collect.Iterables.size; @@ -167,6 +168,21 @@ public final class HintsService implements HintsServiceMBean } /** + * Write a hint for all replicas. Used to re-dispatch hints whose destination is either missing or no longer correct. + */ + void writeForAllReplicas(Hint hint) + { + String keyspaceName = hint.mutation.getKeyspaceName(); + Token token = hint.mutation.key().getToken(); + + Iterable<UUID> hostIds = + transform(filter(StorageService.instance.getNaturalAndPendingEndpoints(keyspaceName, token), StorageProxy::shouldHint), + StorageService.instance::getHostIdForEndpoint); + + write(hostIds, hint); + } + + /** * Flush the buffer pool for the selected target nodes, then fsync their writers. 
* * @param hostIds host ids of the nodes to flush and fsync hints for http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 8fa2082..5cebf27 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -71,6 +71,8 @@ import org.apache.cassandra.triggers.TriggerExecutor; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.AbstractIterator; +import static com.google.common.collect.Iterables.contains; + public class StorageProxy implements StorageProxyMBean { public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy"; @@ -670,12 +672,10 @@ public class StorageProxy implements StorageProxyMBean private static void hintMutation(Mutation mutation) { - Token tk = DatabaseDescriptor.getPartitioner().getToken(mutation.key().getKey()); - List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk); - Collection<InetAddress> pendingEndpoints = - StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName()); + String keyspaceName = mutation.getKeyspaceName(); + Token token = mutation.key().getToken(); - Iterable<InetAddress> endpoints = Iterables.concat(naturalEndpoints, pendingEndpoints); + Iterable<InetAddress> endpoints = StorageService.instance.getNaturalAndPendingEndpoints(keyspaceName, token); ArrayList<InetAddress> endpointsToHint = new ArrayList<>(Iterables.size(endpoints)); // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and CASSANDRA-6510), @@ -687,6 +687,16 @@ public class StorageProxy implements StorageProxyMBean submitHint(mutation, endpointsToHint, null); } + public 
boolean appliesLocally(Mutation mutation) + { + String keyspaceName = mutation.getKeyspaceName(); + Token token = mutation.key().getToken(); + InetAddress local = FBUtilities.getBroadcastAddress(); + + return StorageService.instance.getNaturalEndpoints(keyspaceName, token).contains(local) + || StorageService.instance.getTokenMetadata().pendingEndpointsFor(token, keyspaceName).contains(local); + } + /** * Use this method to have these Mutations applied * across all replicas. http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 259bd10..4cdeeb0 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3301,6 +3301,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } /** + * Returns the endpoints currently responsible for storing the token plus pending ones + */ + public Iterable<InetAddress> getNaturalAndPendingEndpoints(String keyspaceName, Token token) + { + return Iterables.concat(getNaturalEndpoints(keyspaceName, token), tokenMetadata.pendingEndpointsFor(token, keyspaceName)); + } + + /** * This method attempts to return N endpoints that are responsible for storing the * specified key i.e for replication. 
* http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/test/unit/org/apache/cassandra/hints/HintTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintTest.java b/test/unit/org/apache/cassandra/hints/HintTest.java index 4c7ec70..1d486e1 100644 --- a/test/unit/org/apache/cassandra/hints/HintTest.java +++ b/test/unit/org/apache/cassandra/hints/HintTest.java @@ -18,6 +18,11 @@ package org.apache.cassandra.hints; import java.io.IOException; +import java.net.InetAddress; +import java.util.Collections; +import java.util.UUID; + +import com.google.common.collect.ImmutableList; import org.junit.Before; import org.junit.BeforeClass; @@ -26,17 +31,24 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.FilteredPartition; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.dht.BootStrapper; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.TableParams; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import static junit.framework.Assert.*; @@ -67,6 +79,12 @@ public class HintTest @Before public void resetGcGraceSeconds() { + TokenMetadata 
tokenMeta = StorageService.instance.getTokenMetadata(); + InetAddress local = FBUtilities.getBroadcastAddress(); + tokenMeta.clearUnsafe(); + tokenMeta.updateHostId(UUID.randomUUID(), local); + tokenMeta.updateNormalTokens(BootStrapper.getRandomTokens(tokenMeta, 1), local); + for (CFMetaData table : Schema.instance.getTablesAndViews(KEYSPACE)) table.gcGraceSeconds(TableParams.DEFAULT_GC_GRACE_SECONDS); } @@ -185,6 +203,97 @@ public class HintTest assertNoPartitions(key, TABLE2); } + @SuppressWarnings("unchecked") + @Test + public void testChangedTopology() throws Exception + { + // create a hint + long now = FBUtilities.timestampMicros(); + String key = "testChangedTopology"; + Mutation mutation = createMutation(key, now); + Hint hint = Hint.create(mutation, now / 1000); + + // Prepare metadata with injected stale endpoint serving the mutation key. + TokenMetadata tokenMeta = StorageService.instance.getTokenMetadata(); + InetAddress local = FBUtilities.getBroadcastAddress(); + InetAddress endpoint = InetAddress.getByName("1.1.1.1"); + UUID localId = StorageService.instance.getLocalHostUUID(); + UUID targetId = UUID.randomUUID(); + tokenMeta.updateHostId(targetId, endpoint); + tokenMeta.updateNormalTokens(ImmutableList.of(mutation.key().getToken()), endpoint); + + // sanity check that there is no data inside yet + assertNoPartitions(key, TABLE0); + assertNoPartitions(key, TABLE1); + assertNoPartitions(key, TABLE2); + + assert StorageProxy.instance.getHintsInProgress() == 0; + long totalHintCount = StorageProxy.instance.getTotalHints(); + // Process hint message. 
+ HintMessage message = new HintMessage(localId, hint); + MessagingService.instance().getVerbHandler(MessagingService.Verb.HINT).doVerb( + MessageIn.create(local, message, Collections.emptyMap(), MessagingService.Verb.HINT, MessagingService.current_version), + -1); + + // hint should not be applied as we no longer are a replica + assertNoPartitions(key, TABLE0); + assertNoPartitions(key, TABLE1); + assertNoPartitions(key, TABLE2); + + // Attempt to send to new endpoint should have been made. Node is not live hence it should now be a hint. + assertEquals(totalHintCount + 1, StorageProxy.instance.getTotalHints()); + } + + @SuppressWarnings("unchecked") + @Test + public void testChangedTopologyNotHintable() throws Exception + { + // create a hint + long now = FBUtilities.timestampMicros(); + String key = "testChangedTopology"; + Mutation mutation = createMutation(key, now); + Hint hint = Hint.create(mutation, now / 1000); + + // Prepare metadata with injected stale endpoint. + TokenMetadata tokenMeta = StorageService.instance.getTokenMetadata(); + InetAddress local = FBUtilities.getBroadcastAddress(); + InetAddress endpoint = InetAddress.getByName("1.1.1.1"); + UUID localId = StorageService.instance.getLocalHostUUID(); + UUID targetId = UUID.randomUUID(); + tokenMeta.updateHostId(targetId, endpoint); + tokenMeta.updateNormalTokens(ImmutableList.of(mutation.key().getToken()), endpoint); + + // sanity check that there is no data inside yet + assertNoPartitions(key, TABLE0); + assertNoPartitions(key, TABLE1); + assertNoPartitions(key, TABLE2); + + try + { + DatabaseDescriptor.setHintedHandoffEnabled(false); + + assert StorageMetrics.totalHintsInProgress.getCount() == 0; + long totalHintCount = StorageMetrics.totalHints.getCount(); + // Process hint message. 
+ HintMessage message = new HintMessage(localId, hint); + MessagingService.instance().getVerbHandler(MessagingService.Verb.HINT).doVerb( + MessageIn.create(local, message, Collections.emptyMap(), MessagingService.Verb.HINT, MessagingService.current_version), + -1); + + // hint should not be applied as we no longer are a replica + assertNoPartitions(key, TABLE0); + assertNoPartitions(key, TABLE1); + assertNoPartitions(key, TABLE2); + + // Attempt to send to new endpoint should not have been made. + assertEquals(totalHintCount, StorageMetrics.totalHints.getCount()); + } + finally + { + DatabaseDescriptor.setHintedHandoffEnabled(true); + } + } + private static Mutation createMutation(String key, long now) { Mutation mutation = new Mutation(KEYSPACE, dk(key));