Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 7d2fdfeb4 -> 9cd7d540d
  refs/heads/cassandra-3.0 eb41380cc -> 59b40b317
  refs/heads/cassandra-3.X f33cd55a5 -> 96d67b109
  refs/heads/trunk 29cb59106 -> f1c3aac76


Avoid blocking gossip during pending range calculation

patch by Stefan Podkowinski; reviewed by Joel Knighton for CASSANDRA-12281


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9cd7d540
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9cd7d540
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9cd7d540

Branch: refs/heads/cassandra-2.2
Commit: 9cd7d540de2ea525982d139a4c8a11233c4e98c9
Parents: 7d2fdfe
Author: Stefan Podkowinski <s.podkowin...@gmail.com>
Authored: Fri Oct 21 11:34:53 2016 +0200
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Fri Nov 18 17:29:14 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/locator/TokenMetadata.java | 211 +++++++++++--------
 .../service/PendingRangeCalculatorService.java  |  10 +-
 .../gms/PendingRangeCalculatorServiceTest.java  | 133 ++++++++++++
 4 files changed, 267 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 54dc4b5..5a2e0ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
  * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
  * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
  * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index b06c9c8..aafd7f9 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -649,12 +649,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            TokenMetadata allLeftMetadata = cloneOnlyTokenMap();
-
-            for (InetAddress endpoint : leavingEndpoints)
-                allLeftMetadata.removeEndpoint(endpoint);
-
-            return allLeftMetadata;
+            return removeEndpoints(cloneOnlyTokenMap(), leavingEndpoints);
         }
         finally
         {
@@ -662,6 +657,14 @@ public class TokenMetadata
         }
     }
 
+    private static TokenMetadata removeEndpoints(TokenMetadata 
allLeftMetadata, Set<InetAddress> leavingEndpoints)
+    {
+        for (InetAddress endpoint : leavingEndpoints)
+            allLeftMetadata.removeEndpoint(endpoint);
+
+        return allLeftMetadata;
+    }
+
     /**
      * Create a copy of TokenMetadata with tokenToEndpointMap reflecting 
situation after all
      * current leave, and move operations have finished.
@@ -787,118 +790,154 @@ public class TokenMetadata
      */
     public void calculatePendingRanges(AbstractReplicationStrategy strategy, 
String keyspaceName)
     {
-        lock.readLock().lock();
-        try
+        // avoid race between both branches - do not use a lock here as this 
will block any other unrelated operations!
+        synchronized (pendingRanges)
         {
-            PendingRangeMaps newPendingRanges = new PendingRangeMaps();
-
             if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && 
movingEndpoints.isEmpty())
             {
                 if (logger.isTraceEnabled())
                     logger.trace("No bootstrapping, leaving or moving nodes -> 
empty pending ranges for {}", keyspaceName);
 
-                pendingRanges.put(keyspaceName, newPendingRanges);
-                return;
+                pendingRanges.put(keyspaceName, new PendingRangeMaps());
             }
+            else
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Starting pending range calculation for {}", 
keyspaceName);
 
-            Multimap<InetAddress, Range<Token>> addressRanges = 
strategy.getAddressRanges();
-
-            // Copy of metadata reflecting the situation after all leave 
operations are finished.
-            TokenMetadata allLeftMetadata = cloneAfterAllLeft();
+                long startedAt = System.currentTimeMillis();
 
-            // get all ranges that will be affected by leaving nodes
-            Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
-            for (InetAddress endpoint : leavingEndpoints)
-                affectedRanges.addAll(addressRanges.get(endpoint));
+                // create clone of current state
+                BiMultiValMap<Token, InetAddress> bootstrapTokens = new 
BiMultiValMap<>();
+                Set<InetAddress> leavingEndpoints = new HashSet<>();
+                Set<Pair<Token, InetAddress>> movingEndpoints = new 
HashSet<>();
+                TokenMetadata metadata;
 
-            // for each of those ranges, find what new nodes will be 
responsible for the range when
-            // all leaving nodes are gone.
-            TokenMetadata metadata = cloneOnlyTokenMap(); // don't do this in 
the loop! #7758
-            for (Range<Token> range : affectedRanges)
-            {
-                Set<InetAddress> currentEndpoints = 
ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
-                Set<InetAddress> newEndpoints = 
ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, 
allLeftMetadata));
-                for (InetAddress address : Sets.difference(newEndpoints, 
currentEndpoints))
+                lock.readLock().lock();
+                try
+                {
+                    bootstrapTokens.putAll(this.bootstrapTokens);
+                    leavingEndpoints.addAll(this.leavingEndpoints);
+                    movingEndpoints.addAll(this.movingEndpoints);
+                    metadata = this.cloneOnlyTokenMap();
+                }
+                finally
                 {
-                    newPendingRanges.addPendingRange(range, address);
+                    lock.readLock().unlock();
                 }
+
+                pendingRanges.put(keyspaceName, 
calculatePendingRanges(strategy, metadata, bootstrapTokens,
+                                                                       
leavingEndpoints, movingEndpoints));
+                long took = System.currentTimeMillis() - startedAt;
+
+                if (logger.isDebugEnabled())
+                    logger.debug("Pending range calculation for {} completed 
(took: {}ms)", keyspaceName, took);
+                if (logger.isTraceEnabled())
+                    logger.trace("Calculated pending ranges for {}:\n{}", 
keyspaceName, (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
             }
+        }
+    }
+
+    /**
+     * @see TokenMetadata#calculatePendingRanges(AbstractReplicationStrategy, 
String)
+     */
+    private static PendingRangeMaps 
calculatePendingRanges(AbstractReplicationStrategy strategy,
+                                                           TokenMetadata 
metadata,
+                                                           
BiMultiValMap<Token, InetAddress> bootstrapTokens,
+                                                           Set<InetAddress> 
leavingEndpoints,
+                                                           Set<Pair<Token, 
InetAddress>> movingEndpoints)
+    {
+        PendingRangeMaps newPendingRanges = new PendingRangeMaps();
+
+        Multimap<InetAddress, Range<Token>> addressRanges = 
strategy.getAddressRanges(metadata);
 
-            // At this stage newPendingRanges has been updated according to 
leave operations. We can
-            // now continue the calculation by checking bootstrapping nodes.
+        // Copy of metadata reflecting the situation after all leave 
operations are finished.
+        TokenMetadata allLeftMetadata = 
removeEndpoints(metadata.cloneOnlyTokenMap(), leavingEndpoints);
 
-            // For each of the bootstrapping nodes, simply add and remove them 
one by one to
-            // allLeftMetadata and check in between what their ranges would be.
-            Multimap<InetAddress, Token> bootstrapAddresses = 
bootstrapTokens.inverse();
-            for (InetAddress endpoint : bootstrapAddresses.keySet())
+        // get all ranges that will be affected by leaving nodes
+        Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
+        for (InetAddress endpoint : leavingEndpoints)
+            affectedRanges.addAll(addressRanges.get(endpoint));
+
+        // for each of those ranges, find what new nodes will be responsible 
for the range when
+        // all leaving nodes are gone.
+        for (Range<Token> range : affectedRanges)
+        {
+            Set<InetAddress> currentEndpoints = 
ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+            Set<InetAddress> newEndpoints = 
ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, 
allLeftMetadata));
+            for (InetAddress address : Sets.difference(newEndpoints, 
currentEndpoints))
             {
-                Collection<Token> tokens = bootstrapAddresses.get(endpoint);
+                newPendingRanges.addPendingRange(range, address);
+            }
+        }
 
-                allLeftMetadata.updateNormalTokens(tokens, endpoint);
-                for (Range<Token> range : 
strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    newPendingRanges.addPendingRange(range, endpoint);
-                }
-                allLeftMetadata.removeEndpoint(endpoint);
+        // At this stage newPendingRanges has been updated according to leave 
operations. We can
+        // now continue the calculation by checking bootstrapping nodes.
+
+        // For each of the bootstrapping nodes, simply add and remove them one 
by one to
+        // allLeftMetadata and check in between what their ranges would be.
+        Multimap<InetAddress, Token> bootstrapAddresses = 
bootstrapTokens.inverse();
+        for (InetAddress endpoint : bootstrapAddresses.keySet())
+        {
+            Collection<Token> tokens = bootstrapAddresses.get(endpoint);
+
+            allLeftMetadata.updateNormalTokens(tokens, endpoint);
+            for (Range<Token> range : 
strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            {
+                newPendingRanges.addPendingRange(range, endpoint);
             }
+            allLeftMetadata.removeEndpoint(endpoint);
+        }
 
-            // At this stage newPendingRanges has been updated according to 
leaving and bootstrapping nodes.
-            // We can now finish the calculation by checking moving nodes.
+        // At this stage newPendingRanges has been updated according to 
leaving and bootstrapping nodes.
+        // We can now finish the calculation by checking moving nodes.
 
-            // For each of the moving nodes, we do the same thing we did for 
bootstrapping:
-            // simply add and remove them one by one to allLeftMetadata and 
check in between what their ranges would be.
-            for (Pair<Token, InetAddress> moving : movingEndpoints)
+        // For each of the moving nodes, we do the same thing we did for 
bootstrapping:
+        // simply add and remove them one by one to allLeftMetadata and check 
in between what their ranges would be.
+        for (Pair<Token, InetAddress> moving : movingEndpoints)
+        {
+            //Calculate all the ranges which will could be affected. This will 
include the ranges before and after the move.
+            Set<Range<Token>> moveAffectedRanges = new HashSet<>();
+            InetAddress endpoint = moving.right; // address of the moving node
+            //Add ranges before the move
+            for (Range<Token> range : 
strategy.getAddressRanges(allLeftMetadata).get(endpoint))
             {
-                //Calculate all the ranges which will could be affected. This 
will include the ranges before and after the move.
-                Set<Range<Token>> moveAffectedRanges = new HashSet<>();
-                InetAddress endpoint = moving.right; // address of the moving 
node
-                //Add ranges before the move
-                for (Range<Token> range : 
strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    moveAffectedRanges.add(range);
-                }
+                moveAffectedRanges.add(range);
+            }
 
-                allLeftMetadata.updateNormalToken(moving.left, endpoint);
-                //Add ranges after the move
-                for (Range<Token> range : 
strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    moveAffectedRanges.add(range);
-                }
+            allLeftMetadata.updateNormalToken(moving.left, endpoint);
+            //Add ranges after the move
+            for (Range<Token> range : 
strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            {
+                moveAffectedRanges.add(range);
+            }
 
-                for(Range<Token> range : moveAffectedRanges)
+            for(Range<Token> range : moveAffectedRanges)
+            {
+                Set<InetAddress> currentEndpoints = 
ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+                Set<InetAddress> newEndpoints = 
ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, 
allLeftMetadata));
+                Set<InetAddress> difference = Sets.difference(newEndpoints, 
currentEndpoints);
+                for(final InetAddress address : difference)
                 {
-                    Set<InetAddress> currentEndpoints = 
ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
-                    Set<InetAddress> newEndpoints = 
ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, 
allLeftMetadata));
-                    Set<InetAddress> difference = 
Sets.difference(newEndpoints, currentEndpoints);
-                    for(final InetAddress address : difference)
-                    {
-                        Collection<Range<Token>> newRanges = 
strategy.getAddressRanges(allLeftMetadata).get(address);
-                        Collection<Range<Token>> oldRanges = 
strategy.getAddressRanges(metadata).get(address);
-                        //We want to get rid of any ranges which the node is 
currently getting.
-                        newRanges.removeAll(oldRanges);
+                    Collection<Range<Token>> newRanges = 
strategy.getAddressRanges(allLeftMetadata).get(address);
+                    Collection<Range<Token>> oldRanges = 
strategy.getAddressRanges(metadata).get(address);
+                    //We want to get rid of any ranges which the node is 
currently getting.
+                    newRanges.removeAll(oldRanges);
 
-                        for(Range<Token> newRange : newRanges)
+                    for(Range<Token> newRange : newRanges)
+                    {
+                        for(Range<Token> pendingRange : 
newRange.subtractAll(oldRanges))
                         {
-                            for(Range<Token> pendingRange : 
newRange.subtractAll(oldRanges))
-                            {
-                                newPendingRanges.addPendingRange(pendingRange, 
address);
-                            }
+                            newPendingRanges.addPendingRange(pendingRange, 
address);
                         }
                     }
                 }
-
-                allLeftMetadata.removeEndpoint(endpoint);
             }
 
-            pendingRanges.put(keyspaceName, newPendingRanges);
-
-            if (logger.isTraceEnabled())
-                logger.trace("Pending ranges:\n{}", (pendingRanges.isEmpty() ? 
"<empty>" : printPendingRanges()));
-        }
-        finally
-        {
-            lock.readLock().unlock();
+            allLeftMetadata.removeEndpoint(endpoint);
         }
+
+        return newPendingRanges;
     }
 
     public Token getPredecessor(Token token)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 806f6a5..116cede 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -34,6 +35,9 @@ public class PendingRangeCalculatorService
     public static final PendingRangeCalculatorService instance = new 
PendingRangeCalculatorService();
 
     private static Logger logger = 
LoggerFactory.getLogger(PendingRangeCalculatorService.class);
+
+    // the executor will only run a single range calculation at a time while 
keeping at most one task queued in order
+    // to trigger an update only after the most recent state change and not 
for each update individually
     private final JMXEnabledThreadPoolExecutor executor = new 
JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>(1), new 
NamedThreadFactory("PendingRangeCalculator"), "internal");
 
@@ -58,9 +62,11 @@ public class PendingRangeCalculatorService
             try
             {
                 long start = System.currentTimeMillis();
-                for (String keyspaceName : 
Schema.instance.getNonSystemKeyspaces())
+                List<String> keyspaces = 
Schema.instance.getNonSystemKeyspaces();
+                for (String keyspaceName : keyspaces)
                     
calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), 
keyspaceName);
-                logger.debug("finished calculation for {} keyspaces in {}ms", 
Schema.instance.getNonSystemKeyspaces().size(), System.currentTimeMillis() - 
start);
+                if (logger.isTraceEnabled())
+                    logger.trace("Finished PendingRangeTask for {} keyspaces 
in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
             }
             finally
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
new file mode 100644
index 0000000..507948c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.StorageService;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+/**
+ * Test for "Gossip blocks on startup when another node is bootstrapping" 
(CASSANDRA-12281).
+ */
+@RunWith(BMUnitRunner.class)
+public class PendingRangeCalculatorServiceTest
+{
+    static ReentrantLock calculationLock = new ReentrantLock();
+
+    @BeforeClass
+    public static void setUp() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        StorageService.instance.initServer();
+    }
+
+    @Test
+    @BMRule(name = "Block pending range calculation",
+            targetClass = "TokenMetadata",
+            targetMethod = "calculatePendingRanges",
+            targetLocation = "AT INVOKE 
org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressRanges",
+            action = 
"org.apache.cassandra.gms.PendingRangeCalculatorServiceTest.calculationLock.lock()")
+    public void testDelayedResponse() throws UnknownHostException, 
InterruptedException
+    {
+        final InetAddress otherNodeAddr = InetAddress.getByName("127.0.0.2");
+        final UUID otherHostId = UUID.randomUUID();
+
+        // introduce node for first major state change
+        Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, 
otherHostId, 1, false));
+
+        // acquire lock to block pending range calculation via byteman
+        calculationLock.lock();
+        try
+        {
+            // spawn thread that will trigger handling of a bootstrap state 
change which in turn will trigger
+            // the pending range calculation that will be blocked by our lock
+            Thread t1 = new Thread()
+            {
+                public void run()
+                {
+                    
Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 2, 
true));
+                }
+            };
+            t1.start();
+
+            // busy-spin until t1 is blocked by lock
+            while (!calculationLock.hasQueuedThreads()) ;
+
+            // trigger further state changes in case we don't want the blocked 
thread from the
+            // expensive range calculation to block us here as well
+            Thread t2 = new Thread()
+            {
+                public void run()
+                {
+                    
Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 3, 
false));
+                    
Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 4, 
false));
+                    
Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 5, 
false));
+                }
+            };
+            t2.start();
+            t2.join(2000);
+            assertFalse("Thread still blocked by pending range calculation", 
t2.isAlive());
+            assertEquals(5, 
Gossiper.instance.getEndpointStateForEndpoint(otherNodeAddr).getHeartBeatState().getHeartBeatVersion());
+        }
+        finally
+        {
+            calculationLock.unlock();
+        }
+    }
+
+    private Map<InetAddress, EndpointState> getStates(InetAddress 
otherNodeAddr, UUID hostId, int ver, boolean bootstrapping)
+    {
+        HeartBeatState hb = new HeartBeatState(1, ver);
+        EndpointState state = new EndpointState(hb);
+        Collection<Token> tokens = new ArrayList<>();
+
+        tokens.add(new ByteOrderedPartitioner.BytesToken(new byte[]{1,2,3}));
+        state.addApplicationState(ApplicationState.TOKENS, 
StorageService.instance.valueFactory.tokens(tokens));
+        state.addApplicationState(ApplicationState.STATUS, bootstrapping ?
+                StorageService.instance.valueFactory.bootstrapping(tokens) : 
StorageService.instance.valueFactory.normal(tokens));
+        state.addApplicationState(ApplicationState.HOST_ID, 
StorageService.instance.valueFactory.hostId(hostId));
+        state.addApplicationState(ApplicationState.NET_VERSION, 
StorageService.instance.valueFactory.networkVersion());
+
+        Map<InetAddress, EndpointState> states = new HashMap<>();
+        states.put(otherNodeAddr, state);
+        return states;
+    }
+}

Reply via email to