Fix handling of pending view mutations and clean up the batchlog when there are both local and remote paired mutations
Patch by Paulo Motta; Reviewed by Sylvain Lebresne for CASSANDRA-13069 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/12103653 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/12103653 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/12103653 Branch: refs/heads/cassandra-3.11 Commit: 12103653f313d6f1ef030a535986123ddcffea9c Parents: e86bef4 Author: Paulo Motta <pauloricard...@gmail.com> Authored: Wed Dec 21 20:19:21 2016 -0200 Committer: Paulo Motta <pa...@apache.org> Committed: Tue Sep 12 08:30:24 2017 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/batchlog/BatchlogManager.java | 3 +- .../service/BatchlogResponseHandler.java | 4 +- .../apache/cassandra/service/StorageProxy.java | 82 +++++++++----------- 4 files changed, 41 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/12103653/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 76d155e..26b1794 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.15 + * Fix pending view mutations handling and cleanup batchlog when there are local and remote paired mutations (CASSANDRA-13069) * Range deletes in a CAS batch are ignored (CASSANDRA-13655) * Change repair midpoint logging for tiny ranges (CASSANDRA-13603) * Better handle corrupt final commitlog segment (CASSANDRA-11995) http://git-wip-us.apache.org/repos/asf/cassandra/blob/12103653/src/java/org/apache/cassandra/batchlog/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java index b614fc5..a0b614f 100644 --- 
a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java +++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java @@ -67,6 +67,7 @@ public class BatchlogManager implements BatchlogManagerMBean private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class); public static final BatchlogManager instance = new BatchlogManager(); + public static final long BATCHLOG_REPLAY_TIMEOUT = Long.getLong("cassandra.batchlog.replay_timeout_in_ms", DatabaseDescriptor.getWriteRpcTimeout() * 2); private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread. private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0); @@ -284,7 +285,7 @@ public class BatchlogManager implements BatchlogManagerMBean public static long getBatchlogTimeout() { - return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation + return BATCHLOG_REPLAY_TIMEOUT; // enough time for the actual write + BM removal mutation } private static class ReplayingBatch http://git-wip-us.apache.org/repos/asf/cassandra/blob/12103653/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java index ac44923..a1477e6 100644 --- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java +++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java @@ -50,7 +50,7 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T> { wrapped.response(msg); if (requiredBeforeFinishUpdater.decrementAndGet(this) == 0) - cleanup.run(); + cleanup.ackMutation(); } public boolean isLatencyForSnitch() @@ -107,7 +107,7 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T> this.callback = callback; } - public void 
run() + public void ackMutation() { if (mutationsWaitingForUpdater.decrementAndGet(this) == 0) callback.invoke(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/12103653/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 6610cf7..1ce1bc5 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -729,15 +729,16 @@ public class StorageProxy implements StorageProxyMBean else { List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size()); - List<Mutation> nonPairedMutations = new LinkedList<>(); + //non-local mutations rely on the base mutation commit-log entry for eventual consistency + Set<Mutation> nonLocalMutations = new HashSet<>(mutations); Token baseToken = StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey); ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE; //Since the base -> view replication is 1:1 we only need to store the BL locally final Collection<InetAddress> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddress()); - BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), - () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); + BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); + // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet for (Mutation mutation : mutations) { @@ -746,61 +747,50 @@ public class StorageProxy implements StorageProxyMBean Optional<InetAddress> pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk); 
Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); - if (pairedEndpoint.isPresent()) - { - // When local node is the endpoint and there are no pending nodes we can - // Just apply the mutation locally. - if (pairedEndpoint.get().equals(FBUtilities.getBroadcastAddress()) - && pendingEndpoints.isEmpty() && StorageService.instance.isJoined()) - try - { - mutation.apply(writeCommitLog); - } - catch (Exception exc) - { - logger.error("Error applying local view update to keyspace {}: {}", mutation.getKeyspaceName(), mutation); - throw exc; - } - else - { - wrappers.add(wrapViewBatchResponseHandler(mutation, - consistencyLevel, - consistencyLevel, - Collections.singletonList(pairedEndpoint.get()), - baseComplete, - WriteType.BATCH, - cleanup)); - } - } - else + // if there are no paired endpoints there are probably range movements going on, so we write to the local batchlog to replay later + if (!pairedEndpoint.isPresent()) { - //if there are no paired endpoints there are probably range movements going on, - //so we write to the local batchlog to replay later if (pendingEndpoints.isEmpty()) logger.warn("Received base materialized view mutation for key {} that does not belong " + "to this node. There is probably a range movement happening (move or decommission)," + "but this node hasn't updated its ring metadata yet. Adding mutation to " + "local batchlog to be replayed later.", mutation.key()); - nonPairedMutations.add(mutation); + continue; + } + + // When local node is the paired endpoint just apply the mutation locally. 
+ if (pairedEndpoint.get().equals(FBUtilities.getBroadcastAddress()) && StorageService.instance.isJoined()) + try + { + mutation.apply(writeCommitLog); + nonLocalMutations.remove(mutation); + cleanup.ackMutation(); + } + catch (Exception exc) + { + logger.error("Error applying local view update to keyspace {}: {}", mutation.getKeyspaceName(), mutation); + throw exc; + } + else + { + wrappers.add(wrapViewBatchResponseHandler(mutation, + consistencyLevel, + consistencyLevel, + Collections.singletonList(pairedEndpoint.get()), + baseComplete, + WriteType.BATCH, + cleanup)); } } - if (!wrappers.isEmpty()) - { - // Apply to local batchlog memtable in this thread - BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), Lists.transform(wrappers, w -> w.mutation)), - writeCommitLog); + // Apply to local batchlog memtable in this thread + if (!nonLocalMutations.isEmpty()) + BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), nonLocalMutations), writeCommitLog); - // now actually perform the writes and wait for them to complete + // Perform remote writes + if (!wrappers.isEmpty()) asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.VIEW_MUTATION); - } - - if (!nonPairedMutations.isEmpty()) - { - BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), nonPairedMutations), - writeCommitLog); - } } } finally --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org