Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/59b40b31
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/59b40b31
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/59b40b31

Branch: refs/heads/trunk
Commit: 59b40b3173933620bc5f30e26366cd09b3a4ca10
Parents: eb41380 9cd7d54
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Fri Nov 18 17:29:41 2016 +0000
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Fri Nov 18 17:30:46 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../apache/cassandra/locator/TokenMetadata.java | 213 +++++++++++--------
 .../service/PendingRangeCalculatorService.java  |   6 +-
 .../gms/PendingRangeCalculatorServiceTest.java  | 133 ++++++++++++
 4 files changed, 267 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b40b31/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8a3ac65,5a2e0ab..bcd0b5c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,5 +1,45 @@@
 -2.2.9
 +3.0.11
 + * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
++Merged from 2.2:
+  * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
++
 +
 +3.0.10
 + * Disallow offheap_buffers memtable allocation (CASSANDRA-11039)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889)
 + * Fix partition count log during compaction (CASSANDRA-12184)
 + * Batch with multiple conditional updates for the same partition causes AssertionError (CASSANDRA-12867)
 + * Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788)
 + * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854)
 + * Don't tell users to turn off consistent rangemovements during rebuild. (CASSANDRA-12296)
 + * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689)
 + * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801)
 + * Include SSTable filename in compacting large row message (CASSANDRA-12384)
 + * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330)
 + * Fix ViewTest.testCompaction (CASSANDRA-12789)
 + * Improve avg aggregate functions (CASSANDRA-12417)
 + * Preserve quoted reserved keyword column names in MV creation (CASSANDRA-11803)
 + * nodetool stopdaemon errors out (CASSANDRA-12646)
 + * Split materialized view mutations on build to prevent OOM (CASSANDRA-12268)
 + * mx4j does not work in 3.0.8 (CASSANDRA-12274)
 + * Abort cqlsh copy-from in case of no answer after prolonged period of time (CASSANDRA-12740)
 + * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582)
 + * Make stress use client mode to avoid checking commit log size on startup (CASSANDRA-12478)
 + * Fix exceptions with new vnode allocation (CASSANDRA-12715)
 + * Unify drain and shutdown processes (CASSANDRA-12509)
 + * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706)
 + * Fix failure in LogTransactionTest (CASSANDRA-12632)
 + * Fix potentially incomplete non-frozen UDT values when querying with the
 +   full primary key specified (CASSANDRA-12605)
 + * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
 + * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
 + * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
 + * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
 + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
 + * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
 + * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
 +Merged from 2.2:
   * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
   * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
   * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b40b31/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/TokenMetadata.java
index b50db00,aafd7f9..b44a1a1
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@@ -128,15 -123,6 +128,15 @@@ public class TokenMetadat
          sortedTokens = sortTokens();
      }
  
 +    /**
-      * To be used by tests only (via {@link StorageService.setPartitionerUnsafe}).
++     * To be used by tests only (via {@link StorageService#setPartitionerUnsafe}).
 +     */
 +    @VisibleForTesting
 +    public TokenMetadata cloneWithNewPartitioner(IPartitioner newPartitioner)
 +    {
 +        return new TokenMetadata(tokenToEndpointMap, endpointToHostIdMap, 
topology, newPartitioner);
 +    }
 +
      private ArrayList<Token> sortTokens()
      {
          return new ArrayList<>(tokenToEndpointMap.keySet());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b40b31/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 352a763,116cede..5b1aa0d
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@@ -35,8 -35,11 +35,11 @@@ public class PendingRangeCalculatorServ
      public static final PendingRangeCalculatorService instance = new 
PendingRangeCalculatorService();
  
      private static Logger logger = 
LoggerFactory.getLogger(PendingRangeCalculatorService.class);
+ 
+     // the executor will only run a single range calculation at a time while keeping at most one task queued in order
+     // to trigger an update only after the most recent state change and not for each update individually
      private final JMXEnabledThreadPoolExecutor executor = new 
JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS,
 -            new LinkedBlockingQueue<Runnable>(1), new 
NamedThreadFactory("PendingRangeCalculator"), "internal");
 +            new LinkedBlockingQueue<>(1), new 
NamedThreadFactory("PendingRangeCalculator"), "internal");
  
      private AtomicInteger updateJobs = new AtomicInteger(0);
  
@@@ -59,10 -62,11 +62,11 @@@
              try
              {
                  long start = System.currentTimeMillis();
 -                List<String> keyspaces = 
Schema.instance.getNonSystemKeyspaces();
 +                List<String> keyspaces = 
Schema.instance.getNonLocalStrategyKeyspaces();
                  for (String keyspaceName : keyspaces)
                      
calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), 
keyspaceName);
-                 logger.debug("finished calculation for {} keyspaces in {}ms", 
keyspaces.size(), System.currentTimeMillis() - start);
+                 if (logger.isTraceEnabled())
+                     logger.trace("Finished PendingRangeTask for {} keyspaces 
in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
              }
              finally
              {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b40b31/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
----------------------------------------------------------------------
diff --cc 
test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
index 0000000,507948c..90bbf1d
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
+++ b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
@@@ -1,0 -1,133 +1,133 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.gms;
+ 
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.HashMap;
+ import java.util.Map;
+ import java.util.UUID;
+ import java.util.concurrent.locks.ReentrantLock;
+ 
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.dht.ByteOrderedPartitioner;
+ import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.service.StorageService;
+ import org.jboss.byteman.contrib.bmunit.BMRule;
+ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ 
+ 
+ /**
+  * Test for "Gossip blocks on startup when another node is bootstrapping" (CASSANDRA-12281).
+  */
+ @RunWith(BMUnitRunner.class)
+ public class PendingRangeCalculatorServiceTest
+ {
+     static ReentrantLock calculationLock = new ReentrantLock();
+ 
+     @BeforeClass
+     public static void setUp() throws ConfigurationException
+     {
+         SchemaLoader.prepareServer();
+         StorageService.instance.initServer();
+     }
+ 
+     @Test
+     @BMRule(name = "Block pending range calculation",
+             targetClass = "TokenMetadata",
+             targetMethod = "calculatePendingRanges",
+             targetLocation = "AT INVOKE 
org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressRanges",
+             action = 
"org.apache.cassandra.gms.PendingRangeCalculatorServiceTest.calculationLock.lock()")
+     public void testDelayedResponse() throws UnknownHostException, 
InterruptedException
+     {
 -        final InetAddress otherNodeAddr = InetAddress.getByName("127.0.0.2");
 -        final UUID otherHostId = UUID.randomUUID();
++        InetAddress otherNodeAddr = InetAddress.getByName("127.0.0.2");
++        UUID otherHostId = UUID.randomUUID();
+ 
+         // introduce node for first major state change
+         Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, 
otherHostId, 1, false));
+ 
+         // acquire lock to block pending range calculation via byteman
+         calculationLock.lock();
+         try
+         {
+             // spawn thread that will trigger handling of a bootstrap state change which in turn will trigger
+             // the pending range calculation that will be blocked by our lock
+             Thread t1 = new Thread()
+             {
+                 public void run()
+                 {
+                     
Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 2, 
true));
+                 }
+             };
+             t1.start();
+ 
+             // busy-spin until t1 is blocked by lock
+             while (!calculationLock.hasQueuedThreads()) ;
+ 
+             // trigger further state changes in case we don't want the blocked thread from the
+             // expensive range calculation to block us here as well
+             Thread t2 = new Thread()
+             {
+                 public void run()
+                 {
+                     
Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 3, 
false));
+                     
Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 4, 
false));
+                     
Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 5, 
false));
+                 }
+             };
+             t2.start();
+             t2.join(2000);
+             assertFalse("Thread still blocked by pending range calculation", 
t2.isAlive());
+             assertEquals(5, 
Gossiper.instance.getEndpointStateForEndpoint(otherNodeAddr).getHeartBeatState().getHeartBeatVersion());
+         }
+         finally
+         {
+             calculationLock.unlock();
+         }
+     }
+ 
+     private Map<InetAddress, EndpointState> getStates(InetAddress 
otherNodeAddr, UUID hostId, int ver, boolean bootstrapping)
+     {
+         HeartBeatState hb = new HeartBeatState(1, ver);
+         EndpointState state = new EndpointState(hb);
+         Collection<Token> tokens = new ArrayList<>();
+ 
+         tokens.add(new ByteOrderedPartitioner.BytesToken(new byte[]{1,2,3}));
+         state.addApplicationState(ApplicationState.TOKENS, 
StorageService.instance.valueFactory.tokens(tokens));
+         state.addApplicationState(ApplicationState.STATUS, bootstrapping ?
+                 StorageService.instance.valueFactory.bootstrapping(tokens) : 
StorageService.instance.valueFactory.normal(tokens));
+         state.addApplicationState(ApplicationState.HOST_ID, 
StorageService.instance.valueFactory.hostId(hostId));
+         state.addApplicationState(ApplicationState.NET_VERSION, 
StorageService.instance.valueFactory.networkVersion());
+ 
+         Map<InetAddress, EndpointState> states = new HashMap<>();
+         states.put(otherNodeAddr, state);
+         return states;
+     }
+ }

Reply via email to