Repository: cassandra
Updated Branches:
  refs/heads/trunk 127f7c584 -> dc6f5bdb0


In mutateMV, if not yet gossiping, write all mutations to batchlog

Patch by Joel Knighton; reviewed by tjake for CASSANDRA-10413


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c3fa8e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c3fa8e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c3fa8e3

Branch: refs/heads/trunk
Commit: 6c3fa8e30de21aecce35032762470bfa0fb3cb5e
Parents: e777301
Author: Joel Knighton <joel.knigh...@datastax.com>
Authored: Wed Sep 30 04:50:19 2015 +0000
Committer: T Jake Luciani <j...@apache.org>
Committed: Wed Oct 7 15:46:56 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/view/ViewUtils.java |  2 +-
 .../apache/cassandra/service/StorageProxy.java  | 91 +++++++++++---------
 3 files changed, 52 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c3fa8e3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ba0012e..0bac64e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * If node is not yet gossiping write all MV updates to batchlog only 
(CASSANDRA-10413)
  * Re-populate token metadata after commit log recovery (CASSANDRA-10293)
  * Provide additional metrics for materialized views (CASSANDRA-10323)
  * Flush system schema tables after local schema changes (CASSANDRA-10429)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c3fa8e3/src/java/org/apache/cassandra/db/view/ViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java 
b/src/java/org/apache/cassandra/db/view/ViewUtils.java
index 628142d..ebbae65 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUtils.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java
@@ -94,7 +94,7 @@ public final class ViewUtils
 
             if 
(StorageService.instance.getTokenMetadata().pendingEndpointsFor(viewToken, 
keyspaceName).size() > 0)
             {
-                //Since there are pending endpoints we are going to store 
hints this in the batchlog regardless.
+                //Since there are pending endpoints we are going to write to 
the batchlog regardless.
                 //So we can pretend we are the views endpoint.
 
                 return FBUtilities.getBroadcastAddress();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c3fa8e3/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index d1142fc..f210951 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -660,55 +660,64 @@ public class StorageProxy implements StorageProxyMBean
         final String localDataCenter = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
 
         long startTime = System.nanoTime();
-        List<WriteResponseHandlerWrapper> wrappers = new 
ArrayList<>(mutations.size());
+
 
         try
         {
-            Token baseToken = 
StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey);
-
-            ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
-
-            //Since the base -> view replication is 1:1 we only need to store 
the BL locally
-            final Collection<InetAddress> batchlogEndpoints = 
Collections.singleton(FBUtilities.getBroadcastAddress());
+            // if we haven't joined the ring, write everything to batchlog 
because paired replicas may be stale
             final UUID batchUUID = UUIDGen.getTimeUUID();
-            BatchlogResponseHandler.BatchlogCleanup cleanup = new 
BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
-                                                                               
                           () -> asyncRemoveFromBatchlog(batchlogEndpoints, 
batchUUID));
 
-            // add a handler for each mutation - includes checking 
availability, but doesn't initiate any writes, yet
-            for (Mutation mutation : mutations)
+            if (!Gossiper.instance.isEnabled())
+            {
+                BatchlogManager.store(Batch.createLocal(batchUUID, 
FBUtilities.timestampMicros(),
+                                                        mutations),
+                                      writeCommitLog);
+            }
+            else
             {
-                String keyspaceName = mutation.getKeyspaceName();
-                Token tk = mutation.key().getToken();
-                InetAddress pairedEndpoint = 
ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
-                List<InetAddress> naturalEndpoints = 
Lists.newArrayList(pairedEndpoint);
-
-                WriteResponseHandlerWrapper wrapper = 
wrapViewBatchResponseHandler(mutation,
-                                                                               
    consistencyLevel,
-                                                                               
    consistencyLevel,
-                                                                               
    naturalEndpoints,
-                                                                               
    baseComplete,
-                                                                               
    WriteType.BATCH,
-                                                                               
    cleanup);
-
-                // When local node is the endpoint and there are no pending 
nodes we can
-                // Just apply the mutation locally.
-                if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) 
&& wrapper.handler.pendingEndpoints.isEmpty())
+                List<WriteResponseHandlerWrapper> wrappers = new 
ArrayList<>(mutations.size());
+                Token baseToken = 
StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey);
+
+                ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
+
+                //Since the base -> view replication is 1:1 we only need to 
store the BL locally
+                final Collection<InetAddress> batchlogEndpoints = 
Collections.singleton(FBUtilities.getBroadcastAddress());
+                BatchlogResponseHandler.BatchlogCleanup cleanup = new 
BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
+                                                                               
                               () -> asyncRemoveFromBatchlog(batchlogEndpoints, 
batchUUID));
+
+                // add a handler for each mutation - includes checking 
availability, but doesn't initiate any writes, yet
+                for (Mutation mutation : mutations)
                 {
-                    mutation.apply(writeCommitLog);
-                    viewWriteMetrics.viewReplicasSuccess.inc();
+                    String keyspaceName = mutation.getKeyspaceName();
+                    Token tk = mutation.key().getToken();
+                    InetAddress pairedEndpoint = 
ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
+                    List<InetAddress> naturalEndpoints = 
Lists.newArrayList(pairedEndpoint);
+
+                    WriteResponseHandlerWrapper wrapper = 
wrapViewBatchResponseHandler(mutation,
+                                                                               
        consistencyLevel,
+                                                                               
        consistencyLevel,
+                                                                               
        naturalEndpoints,
+                                                                               
        baseComplete,
+                                                                               
        WriteType.BATCH,
+                                                                               
        cleanup);
+
+                    // When local node is the endpoint and there are no 
pending nodes we can
+                    // Just apply the mutation locally.
+                    if 
(pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) && 
wrapper.handler.pendingEndpoints.isEmpty() && 
StorageService.instance.isJoined())
+                        mutation.apply(writeCommitLog);
+                    else
+                        wrappers.add(wrapper);
                 }
-                else
-                    wrappers.add(wrapper);
-            }
 
-            if (!wrappers.isEmpty())
-            {
-                // Apply to local batchlog memtable in this thread
-                BatchlogManager.store(Batch.createLocal(batchUUID, 
FBUtilities.timestampMicros(), Lists.transform(wrappers, w -> w.mutation)),
-                                      writeCommitLog);
+                if (!wrappers.isEmpty())
+                {
+                    // Apply to local batchlog memtable in this thread
+                    BatchlogManager.store(Batch.createLocal(batchUUID, 
FBUtilities.timestampMicros(), Lists.transform(wrappers, w -> w.mutation)),
+                                          writeCommitLog);
 
-                // now actually perform the writes and wait for them to 
complete
-                asyncWriteBatchedMutations(wrappers, localDataCenter, 
Stage.VIEW_MUTATION);
+                    // now actually perform the writes and wait for them to 
complete
+                    asyncWriteBatchedMutations(wrappers, localDataCenter, 
Stage.VIEW_MUTATION);
+                }
             }
         }
         finally
@@ -1081,7 +1090,7 @@ public class StorageProxy implements StorageProxyMBean
      * | off            |       ANY      | --> DO NOT fire hints. And DO NOT 
wait for them to complete.
      * }
      * </pre>
-     * 
+     *
      * @throws OverloadedException if the hints cannot be written/enqueued
      */
     public static void sendToHintedEndpoints(final Mutation mutation,
@@ -2250,7 +2259,7 @@ public class StorageProxy implements StorageProxyMBean
         }
 
         Set<InetAddress> allEndpoints = Gossiper.instance.getLiveTokenOwners();
-        
+
         int blockFor = allEndpoints.size();
         final TruncateResponseHandler responseHandler = new 
TruncateResponseHandler(blockFor);
 

Reply via email to