Repository: cassandra
Updated Branches:
  refs/heads/trunk 127f7c584 -> dc6f5bdb0
In mutateMV, if not yet gossiping, write all mutations to batchlog Patch by Joel Knighton; reviewed by tjake for CASSANDRA-10413 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c3fa8e3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c3fa8e3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c3fa8e3 Branch: refs/heads/trunk Commit: 6c3fa8e30de21aecce35032762470bfa0fb3cb5e Parents: e777301 Author: Joel Knighton <joel.knigh...@datastax.com> Authored: Wed Sep 30 04:50:19 2015 +0000 Committer: T Jake Luciani <j...@apache.org> Committed: Wed Oct 7 15:46:56 2015 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/view/ViewUtils.java | 2 +- .../apache/cassandra/service/StorageProxy.java | 91 +++++++++++--------- 3 files changed, 52 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c3fa8e3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ba0012e..0bac64e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * If node is not yet gossiping write all MV updates to batchlog only (CASSANDRA-10413) * Re-populate token metadata after commit log recovery (CASSANDRA-10293) * Provide additional metrics for materialized views (CASSANDRA-10323) * Flush system schema tables after local schema changes (CASSANDRA-10429) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c3fa8e3/src/java/org/apache/cassandra/db/view/ViewUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java index 628142d..ebbae65 100644 --- a/src/java/org/apache/cassandra/db/view/ViewUtils.java +++ 
b/src/java/org/apache/cassandra/db/view/ViewUtils.java @@ -94,7 +94,7 @@ public final class ViewUtils if (StorageService.instance.getTokenMetadata().pendingEndpointsFor(viewToken, keyspaceName).size() > 0) { - //Since there are pending endpoints we are going to store hints this in the batchlog regardless. + //Since there are pending endpoints we are going to write to the batchlog regardless. //So we can pretend we are the views endpoint. return FBUtilities.getBroadcastAddress(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c3fa8e3/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index d1142fc..f210951 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -660,55 +660,64 @@ public class StorageProxy implements StorageProxyMBean final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); long startTime = System.nanoTime(); - List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size()); + try { - Token baseToken = StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey); - - ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE; - - //Since the base -> view replication is 1:1 we only need to store the BL locally - final Collection<InetAddress> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddress()); + // if we haven't joined the ring, write everything to batchlog because paired replicas may be stale final UUID batchUUID = UUIDGen.getTimeUUID(); - BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), - () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); - // add a handler for each mutation - 
includes checking availability, but doesn't initiate any writes, yet - for (Mutation mutation : mutations) + if (!Gossiper.instance.isEnabled()) + { + BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), + mutations), + writeCommitLog); + } + else { - String keyspaceName = mutation.getKeyspaceName(); - Token tk = mutation.key().getToken(); - InetAddress pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk); - List<InetAddress> naturalEndpoints = Lists.newArrayList(pairedEndpoint); - - WriteResponseHandlerWrapper wrapper = wrapViewBatchResponseHandler(mutation, - consistencyLevel, - consistencyLevel, - naturalEndpoints, - baseComplete, - WriteType.BATCH, - cleanup); - - // When local node is the endpoint and there are no pending nodes we can - // Just apply the mutation locally. - if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) && wrapper.handler.pendingEndpoints.isEmpty()) + List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size()); + Token baseToken = StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey); + + ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE; + + //Since the base -> view replication is 1:1 we only need to store the BL locally + final Collection<InetAddress> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddress()); + BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), + () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); + + // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet + for (Mutation mutation : mutations) { - mutation.apply(writeCommitLog); - viewWriteMetrics.viewReplicasSuccess.inc(); + String keyspaceName = mutation.getKeyspaceName(); + Token tk = mutation.key().getToken(); + InetAddress pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk); + List<InetAddress> 
naturalEndpoints = Lists.newArrayList(pairedEndpoint); + + WriteResponseHandlerWrapper wrapper = wrapViewBatchResponseHandler(mutation, + consistencyLevel, + consistencyLevel, + naturalEndpoints, + baseComplete, + WriteType.BATCH, + cleanup); + + // When local node is the endpoint and there are no pending nodes we can + // Just apply the mutation locally. + if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) && wrapper.handler.pendingEndpoints.isEmpty() && StorageService.instance.isJoined()) + mutation.apply(writeCommitLog); + else + wrappers.add(wrapper); } - else - wrappers.add(wrapper); - } - if (!wrappers.isEmpty()) - { - // Apply to local batchlog memtable in this thread - BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), Lists.transform(wrappers, w -> w.mutation)), - writeCommitLog); + if (!wrappers.isEmpty()) + { + // Apply to local batchlog memtable in this thread + BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), Lists.transform(wrappers, w -> w.mutation)), + writeCommitLog); - // now actually perform the writes and wait for them to complete - asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.VIEW_MUTATION); + // now actually perform the writes and wait for them to complete + asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.VIEW_MUTATION); + } } } finally @@ -1081,7 +1090,7 @@ public class StorageProxy implements StorageProxyMBean * | off | ANY | --> DO NOT fire hints. And DO NOT wait for them to complete. * } * </pre> - * + * * @throws OverloadedException if the hints cannot be written/enqueued */ public static void sendToHintedEndpoints(final Mutation mutation, @@ -2250,7 +2259,7 @@ public class StorageProxy implements StorageProxyMBean } Set<InetAddress> allEndpoints = Gossiper.instance.getLiveTokenOwners(); - + int blockFor = allEndpoints.size(); final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor);