Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 e86bef439 -> 12103653f
  refs/heads/cassandra-3.11 b64a4e4f6 -> eb027a1de
  refs/heads/trunk 471835815 -> 826c9f4a6


Fix pending view mutations handling and cleanup batchlog when there are local and remote paired mutations

Patch by Paulo Motta; Reviewed by Sylvain Lebresne for CASSANDRA-13069


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/12103653
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/12103653
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/12103653

Branch: refs/heads/cassandra-3.0
Commit: 12103653f313d6f1ef030a535986123ddcffea9c
Parents: e86bef4
Author: Paulo Motta <pauloricard...@gmail.com>
Authored: Wed Dec 21 20:19:21 2016 -0200
Committer: Paulo Motta <pa...@apache.org>
Committed: Tue Sep 12 08:30:24 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/batchlog/BatchlogManager.java     |  3 +-
 .../service/BatchlogResponseHandler.java        |  4 +-
 .../apache/cassandra/service/StorageProxy.java  | 82 +++++++++-----------
 4 files changed, 41 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/12103653/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 76d155e..26b1794 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Fix pending view mutations handling and cleanup batchlog when there are local and remote paired mutations (CASSANDRA-13069)
  * Range deletes in a CAS batch are ignored (CASSANDRA-13655)
  * Change repair midpoint logging for tiny ranges (CASSANDRA-13603)
  * Better handle corrupt final commitlog segment (CASSANDRA-11995)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/12103653/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index b614fc5..a0b614f 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -67,6 +67,7 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     private static final Logger logger = 
LoggerFactory.getLogger(BatchlogManager.class);
     public static final BatchlogManager instance = new BatchlogManager();
+    public static final long BATCHLOG_REPLAY_TIMEOUT = Long.getLong("cassandra.batchlog.replay_timeout_in_ms", DatabaseDescriptor.getWriteRpcTimeout() * 2);
 
     private volatile long totalBatchesReplayed = 0; // no concurrency 
protection necessary as only written by replay thread.
     private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
@@ -284,7 +285,7 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     public static long getBatchlogTimeout()
     {
-        return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
+        return BATCHLOG_REPLAY_TIMEOUT; // enough time for the actual write + BM removal mutation
     }
 
     private static class ReplayingBatch

http://git-wip-us.apache.org/repos/asf/cassandra/blob/12103653/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
index ac44923..a1477e6 100644
--- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
@@ -50,7 +50,7 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T>
     {
         wrapped.response(msg);
         if (requiredBeforeFinishUpdater.decrementAndGet(this) == 0)
-            cleanup.run();
+            cleanup.ackMutation();
     }
 
     public boolean isLatencyForSnitch()
@@ -107,7 +107,7 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T>
             this.callback = callback;
         }
 
-        public void run()
+        public void ackMutation()
         {
             if (mutationsWaitingForUpdater.decrementAndGet(this) == 0)
                 callback.invoke();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/12103653/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 6610cf7..1ce1bc5 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -729,15 +729,16 @@ public class StorageProxy implements StorageProxyMBean
             else
             {
                 List<WriteResponseHandlerWrapper> wrappers = new 
ArrayList<>(mutations.size());
-                List<Mutation> nonPairedMutations = new LinkedList<>();
+                //non-local mutations rely on the base mutation commit-log 
entry for eventual consistency
+                Set<Mutation> nonLocalMutations = new HashSet<>(mutations);
                 Token baseToken = 
StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey);
 
                 ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
 
                 //Since the base -> view replication is 1:1 we only need to 
store the BL locally
                 final Collection<InetAddress> batchlogEndpoints = 
Collections.singleton(FBUtilities.getBroadcastAddress());
-                BatchlogResponseHandler.BatchlogCleanup cleanup = new 
BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
-                                                                               
                               () -> asyncRemoveFromBatchlog(batchlogEndpoints, 
batchUUID));
+                BatchlogResponseHandler.BatchlogCleanup cleanup = new 
BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> 
asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
+
                 // add a handler for each mutation - includes checking 
availability, but doesn't initiate any writes, yet
                 for (Mutation mutation : mutations)
                 {
@@ -746,61 +747,50 @@ public class StorageProxy implements StorageProxyMBean
                     Optional<InetAddress> pairedEndpoint = 
ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
                     Collection<InetAddress> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
keyspaceName);
 
-                    if (pairedEndpoint.isPresent())
-                    {
-                        // When local node is the endpoint and there are no 
pending nodes we can
-                        // Just apply the mutation locally.
-                        if 
(pairedEndpoint.get().equals(FBUtilities.getBroadcastAddress())
-                            && pendingEndpoints.isEmpty() && 
StorageService.instance.isJoined())
-                            try
-                            {
-                                mutation.apply(writeCommitLog);
-                            }
-                            catch (Exception exc)
-                            {
-                                logger.error("Error applying local view update 
to keyspace {}: {}", mutation.getKeyspaceName(), mutation);
-                                throw exc;
-                            }
-                        else
-                        {
-                            wrappers.add(wrapViewBatchResponseHandler(mutation,
-                                                                      
consistencyLevel,
-                                                                      
consistencyLevel,
-                                                                      
Collections.singletonList(pairedEndpoint.get()),
-                                                                      
baseComplete,
-                                                                      
WriteType.BATCH,
-                                                                      
cleanup));
-                        }
-                    }
-                    else
+                    // if there are no paired endpoints there are probably 
range movements going on, so we write to the local batchlog to replay later
+                    if (!pairedEndpoint.isPresent())
                     {
-                        //if there are no paired endpoints there are probably 
range movements going on,
-                        //so we write to the local batchlog to replay later
                         if (pendingEndpoints.isEmpty())
                             logger.warn("Received base materialized view 
mutation for key {} that does not belong " +
                                         "to this node. There is probably a 
range movement happening (move or decommission)," +
                                         "but this node hasn't updated its ring 
metadata yet. Adding mutation to " +
                                         "local batchlog to be replayed later.",
                                         mutation.key());
-                        nonPairedMutations.add(mutation);
+                        continue;
+                    }
+
+                    // When local node is the paired endpoint just apply the 
mutation locally.
+                    if 
(pairedEndpoint.get().equals(FBUtilities.getBroadcastAddress()) && 
StorageService.instance.isJoined())
+                        try
+                        {
+                            mutation.apply(writeCommitLog);
+                            nonLocalMutations.remove(mutation);
+                            cleanup.ackMutation();
+                        }
+                        catch (Exception exc)
+                        {
+                            logger.error("Error applying local view update to 
keyspace {}: {}", mutation.getKeyspaceName(), mutation);
+                            throw exc;
+                        }
+                    else
+                    {
+                        wrappers.add(wrapViewBatchResponseHandler(mutation,
+                                                                  
consistencyLevel,
+                                                                  
consistencyLevel,
+                                                                  
Collections.singletonList(pairedEndpoint.get()),
+                                                                  baseComplete,
+                                                                  
WriteType.BATCH,
+                                                                  cleanup));
                     }
                 }
 
-                if (!wrappers.isEmpty())
-                {
-                    // Apply to local batchlog memtable in this thread
-                    BatchlogManager.store(Batch.createLocal(batchUUID, 
FBUtilities.timestampMicros(), Lists.transform(wrappers, w -> w.mutation)),
-                                          writeCommitLog);
+                // Apply to local batchlog memtable in this thread
+                if (!nonLocalMutations.isEmpty())
+                    BatchlogManager.store(Batch.createLocal(batchUUID, 
FBUtilities.timestampMicros(), nonLocalMutations), writeCommitLog);
 
-                    // now actually perform the writes and wait for them to 
complete
+                // Perform remote writes
+                if (!wrappers.isEmpty())
                     asyncWriteBatchedMutations(wrappers, localDataCenter, 
Stage.VIEW_MUTATION);
-                }
-
-                if (!nonPairedMutations.isEmpty())
-                {
-                    BatchlogManager.store(Batch.createLocal(batchUUID, 
FBUtilities.timestampMicros(), nonPairedMutations),
-                                          writeCommitLog);
-                }
             }
         }
         finally


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to