Merge branch 'cassandra-3.0' into cassandra-3.11

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/18765886
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/18765886
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/18765886

Branch: refs/heads/trunk
Commit: 187658861d15cbf9c3a9c3cdae42ecd191328eaf
Parents: 388c961 617c8eb
Author: Ariel Weisberg <aweisb...@apple.com>
Authored: Wed May 3 16:31:21 2017 -0400
Committer: Ariel Weisberg <aweisb...@apple.com>
Committed: Wed May 3 16:34:56 2017 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   9 +
 .../org/apache/cassandra/config/Config.java     |   6 +
 .../cassandra/config/DatabaseDescriptor.java    |  10 ++
 .../cassandra/net/OutboundTcpConnection.java    | 112 +++++++++---
 .../apache/cassandra/service/StorageProxy.java  |   8 +
 .../cassandra/service/StorageProxyMBean.java    |   3 +
 .../net/OutboundTcpConnectionTest.java          | 175 +++++++++++++++++++
 8 files changed, 299 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/18765886/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 3844bfa,9ec6c95..64e9e50
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,34 -1,5 +1,35 @@@
 -3.0.14
 +3.11.0
 + * UDA fails without input rows (CASSANDRA-13399)
 + * Fix compaction-stress by using daemonInitialization (CASSANDRA-13188)
 + * V5 protocol flags decoding broken (CASSANDRA-13443)
 + * Use write lock not read lock for removing sstables from compaction strategies. (CASSANDRA-13422)
 + * Use corePoolSize equal to maxPoolSize in JMXEnabledThreadPoolExecutors (CASSANDRA-13329)
 + * Avoid rebuilding SASI indexes containing no values (CASSANDRA-12962)
 + * Add charset to Analyser input stream (CASSANDRA-13151)
 + * Fix testLimitSSTables flake caused by concurrent flush (CASSANDRA-12820)
 + * cdc column addition strikes again (CASSANDRA-13382)
 + * Fix static column indexes (CASSANDRA-13277)
 + * DataOutputBuffer.asNewBuffer broken (CASSANDRA-13298)
 + * unittest CipherFactoryTest failed on MacOS (CASSANDRA-13370)
 + * Forbid SELECT restrictions and CREATE INDEX over non-frozen UDT columns (CASSANDRA-13247)
 + * Default logging we ship will incorrectly print "?:?" for "%F:%L" pattern (CASSANDRA-13317)
 + * Possible AssertionError in UnfilteredRowIteratorWithLowerBound (CASSANDRA-13366)
 + * Support unaligned memory access for AArch64 (CASSANDRA-13326)
 + * Improve SASI range iterator efficiency on intersection with an empty range (CASSANDRA-12915).
 + * Fix equality comparisons of columns using the duration type (CASSANDRA-13174)
 + * Obfuscate password in stress-graphs (CASSANDRA-12233)
 + * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
 + * nodetool stopdaemon errors out (CASSANDRA-13030)
 + * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
 + * Fix primary index calculation for SASI (CASSANDRA-12910)
 + * More fixes to the TokenAllocator (CASSANDRA-12990)
 + * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
 + * Address message coalescing regression (CASSANDRA-12676)
 + * Delete illegal character from StandardTokenizerImpl.jflex (CASSANDRA-13417)
 + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
 + * Tracing payload not passed from QueryMessage to tracing session (CASSANDRA-12835)
 +Merged from 3.0:
+  * Expire OutboundTcpConnection messages by a single Thread (CASSANDRA-13265)
   * Fail repair if insufficient responses received (CASSANDRA-13397)
   * Fix SSTableLoader fail when the loaded table contains dropped columns (CASSANDRA-13276)
   * Avoid name clashes in CassandraIndexTest (CASSANDRA-13427)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/18765886/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/18765886/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index f43f816,6a99cd3..8fde816
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -307,19 -298,14 +307,25 @@@ public class Confi
      public int otc_coalescing_window_us = otc_coalescing_window_us_default;
      public int otc_coalescing_enough_coalesced_messages = 8;
  
+     /**
+      * Backlog expiration interval in milliseconds for the OutboundTcpConnection.
+      */
+     public static final int otc_backlog_expiration_interval_ms_default = 200;
+     public volatile int otc_backlog_expiration_interval_ms = otc_backlog_expiration_interval_ms_default;
 - 
++
      public int windows_timer_interval = 0;
  
 +    /**
 +     * Size of the CQL prepared statements cache in MB.
 +     * Defaults to 1/256th of the heap size or 10MB, whichever is greater.
 +     */
 +    public Long prepared_statements_cache_size_mb = null;
 +    /**
 +     * Size of the Thrift prepared statements cache in MB.
 +     * Defaults to 1/256th of the heap size or 10MB, whichever is greater.
 +     */
 +    public Long thrift_prepared_statements_cache_size_mb = null;
 +
      public boolean enable_user_defined_functions = false;
      public boolean enable_scripted_user_defined_functions = false;
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/18765886/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/18765886/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index a0ed270,99ad194..6cd1064
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@@ -63,9 -63,10 +64,10 @@@ import org.xerial.snappy.SnappyOutputSt
  import org.apache.cassandra.config.Config;
  import org.apache.cassandra.config.DatabaseDescriptor;
  
+ import com.google.common.annotations.VisibleForTesting;
  import com.google.common.util.concurrent.Uninterruptibles;
  
 -public class OutboundTcpConnection extends Thread
 +public class OutboundTcpConnection extends FastThreadLocalThread
  {
      private static final Logger logger = LoggerFactory.getLogger(OutboundTcpConnection.class);
  
@@@ -116,10 -117,15 +118,14 @@@
  
          if (coalescingWindow < 0)
              throw new ExceptionInInitializerError(
 -                    "Value provided for coalescing window must be greather than 0: " + coalescingWindow);
 +                    "Value provided for coalescing window must be greater than 0: " + coalescingWindow);
+ 
+         int otc_backlog_expiration_interval_in_ms = DatabaseDescriptor.getOtcBacklogExpirationInterval();
+         if (otc_backlog_expiration_interval_in_ms != Config.otc_backlog_expiration_interval_ms_default)
+             logger.info("OutboundTcpConnection backlog expiration interval set to to {}ms", otc_backlog_expiration_interval_in_ms);
 -
      }
  
-     private static final MessageOut CLOSE_SENTINEL = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE);
 -    private static final MessageOut<?> CLOSE_SENTINEL = new MessageOut<MessagingService.Verb>(MessagingService.Verb.INTERNAL_RESPONSE);
++    private static final MessageOut<?> CLOSE_SENTINEL = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE);
      private volatile boolean isStopped = false;
  
      private static final int OPEN_RETRY_DELAY = 100; // ms between retries
@@@ -177,9 -188,20 +188,21 @@@
          }
      }
  
+     /**
+      * This is a helper method for unit testing. Disclaimer: Do not use this method outside unit tests, as
+      * this method is iterating the queue which can be an expensive operation (CPU time, queue locking).
+      * 
+      * @return true, if the queue contains at least one expired element
+      */
+     @VisibleForTesting // (otherwise = VisibleForTesting.NONE)
+     boolean backlogContainsExpiredMessages(long nowNanos)
+     {
+         return backlog.stream().anyMatch(entry -> entry.isTimedOut(nowNanos));
+     }
+ 
      void closeSocket(boolean destroyThread)
      {
 +        logger.debug("Enqueuing socket close for {}", poolReference.endPoint());
          isStopped = destroyThread; // Exit loop to stop the thread
          backlog.clear();
          // in the "destroyThread = true" case, enqueuing the sentinel is important mostly to unblock the backlog.take()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/18765886/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index e0be68c,ea082d5..0da25a9
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -2811,8 -2682,11 +2811,16 @@@ public class StorageProxy implements St
          return ReadRepairMetrics.repairedBackground.getCount();
      }
  
 +    public int getNumberOfTables()
 +    {
 +        return Schema.instance.getNumberOfTables();
 +    }
++
+     public int getOtcBacklogExpirationInterval() {
+         return DatabaseDescriptor.getOtcBacklogExpirationInterval();
+     }
+ 
+     public void setOtcBacklogExpirationInterval(int intervalInMillis) {
+         DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis);
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/18765886/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 0a4ba19,ee82a5b..8678dde
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@@ -59,8 -59,9 +59,11 @@@ public interface StorageProxyMBea
      public long getReadRepairRepairedBlocking();
      public long getReadRepairRepairedBackground();
  
+     public int getOtcBacklogExpirationInterval();
+     public void setOtcBacklogExpirationInterval(int intervalInMillis);
+ 
      /** Returns each live node's schema version */
      public Map<String, List<String>> getSchemaVersions();
 +
 +    public int getNumberOfTables();
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/18765886/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java
index 0000000,c09ae0f..e3b6817
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java
+++ b/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java
@@@ -1,0 -1,170 +1,175 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.net;
+ 
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.net.MessagingService.Verb;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ 
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicInteger;
+ 
+ /**
+  * The tests check whether Queue expiration in the OutboundTcpConnection behaves properly for droppable and
+  * non-droppable messages.
+  */
+ public class OutboundTcpConnectionTest
+ {
+     AtomicInteger messageId = new AtomicInteger(0);
+ 
+     final static Verb VERB_DROPPABLE = Verb.MUTATION; // Droppable, 2s timeout
+     final static Verb VERB_NONDROPPABLE = Verb.GOSSIP_DIGEST_ACK; // Not droppable
++    
++    final static long NANOS_FOR_TIMEOUT;
+ 
 -    final static long NANOS_FOR_TIMEOUT = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getTimeout(VERB_DROPPABLE)*2);
 -
++    static
++    {
++        DatabaseDescriptor.daemonInitialization();
++        NANOS_FOR_TIMEOUT = TimeUnit.MILLISECONDS.toNanos(VERB_DROPPABLE.getTimeout()*2);
++    }
+     
+     /**
+      * Verifies our assumptions whether a Verb can be dropped or not. The tests make use of droppabilty, and
+      * may produce wrong test results if their droppabilty is changed. 
+      */
+     @BeforeClass
+     public static void assertDroppability()
+     {
+         if (!MessagingService.DROPPABLE_VERBS.contains(VERB_DROPPABLE))
+             throw new AssertionError("Expected " + VERB_DROPPABLE + " to be droppable");
+         if (MessagingService.DROPPABLE_VERBS.contains(VERB_NONDROPPABLE))
+             throw new AssertionError("Expected " + VERB_NONDROPPABLE + " not to be droppable");
+     }
+ 
+     /**
+      * Tests that non-droppable messages are never expired
+      */
+     @Test
+     public void testNondroppable() throws UnknownHostException
+     {
+         OutboundTcpConnection otc = getOutboundTcpConnectionForLocalhost();
+         long nanoTimeBeforeEnqueue = System.nanoTime();
+ 
+         assertFalse("Fresh OutboundTcpConnection contains expired messages",
+                 otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
+ 
+         fillToPurgeSize(otc, VERB_NONDROPPABLE);
+         fillToPurgeSize(otc, VERB_NONDROPPABLE);
+         otc.expireMessages(expirationTimeNanos());
+ 
+         assertFalse("OutboundTcpConnection with non-droppable verbs should not expire",
+                 otc.backlogContainsExpiredMessages(expirationTimeNanos()));
+     }
+ 
+     /**
+      * Tests that droppable messages will be dropped after they expire, but not before.
+      * 
+      * @throws UnknownHostException
+      */
+     @Test
+     public void testDroppable() throws UnknownHostException
+     {
+         OutboundTcpConnection otc = getOutboundTcpConnectionForLocalhost();
+         long nanoTimeBeforeEnqueue = System.nanoTime();
+ 
+         initialFill(otc, VERB_DROPPABLE);
+         assertFalse("OutboundTcpConnection with droppable verbs should not expire immediately",
+                 otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
+ 
+         otc.expireMessages(nanoTimeBeforeEnqueue);
+         assertFalse("OutboundTcpConnection with droppable verbs should not expire with enqueue-time expiration",
+                 otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
+ 
+         // Lets presume, expiration time have passed => At that time there shall be expired messages in the Queue
+         long nanoTimeWhenExpired = expirationTimeNanos();
+         assertTrue("OutboundTcpConnection with droppable verbs should have expired",
+                 otc.backlogContainsExpiredMessages(nanoTimeWhenExpired));
+ 
+         // Using the same timestamp, lets expire them and check whether they have gone
+         otc.expireMessages(nanoTimeWhenExpired);
+         assertFalse("OutboundTcpConnection should not have expired entries",
+                 otc.backlogContainsExpiredMessages(nanoTimeWhenExpired));
+ 
+         // Actually the previous test can be done in a harder way: As expireMessages() has run, we cannot have
+         // ANY expired values, thus lets test also against nanoTimeBeforeEnqueue
+         assertFalse("OutboundTcpConnection should not have any expired entries",
+                 otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
+ 
+     }
+ 
+     /**
+      * Fills the given OutboundTcpConnection with (1 + BACKLOG_PURGE_SIZE), elements. The first
+      * BACKLOG_PURGE_SIZE elements are non-droppable, the last one is a message with the given Verb and can be
+      * droppable or non-droppable.
+      */
+     private void initialFill(OutboundTcpConnection otc, Verb verb)
+     {
+         assertFalse("Fresh OutboundTcpConnection contains expired messages",
+                 otc.backlogContainsExpiredMessages(System.nanoTime()));
+ 
+         fillToPurgeSize(otc, VERB_NONDROPPABLE);
+         MessageOut<?> messageDroppable10s = new MessageOut<>(verb);
+         otc.enqueue(messageDroppable10s, nextMessageId());
+         otc.expireMessages(System.nanoTime());
+     }
+ 
+     /**
+      * Returns a nano timestamp in the far future, when expiration should have been performed for VERB_DROPPABLE.
+      * The offset is chosen as 2 times of the expiration time of VERB_DROPPABLE.
+      * 
+      * @return The future nano timestamp
+      */
+     private long expirationTimeNanos()
+     {
+         return System.nanoTime() + NANOS_FOR_TIMEOUT;
+     }
+ 
+     private int nextMessageId()
+     {
+         return messageId.incrementAndGet();
+     }
+ 
+     /**
+      * Adds BACKLOG_PURGE_SIZE messages to the queue. Hint: At BACKLOG_PURGE_SIZE expiration starts to work.
+      * 
+      * @param otc
+      *            The OutboundTcpConnection
+      * @param verb
+      *            The verb that defines the message type
+      */
+     private void fillToPurgeSize(OutboundTcpConnection otc, Verb verb)
+     {
+         for (int i = 0; i < OutboundTcpConnection.BACKLOG_PURGE_SIZE; i++)
+         {
+             otc.enqueue(new MessageOut<>(verb), nextMessageId());
+         }
+     }
+ 
+     private OutboundTcpConnection getOutboundTcpConnectionForLocalhost() throws UnknownHostException
+     {
+         InetAddress lo = InetAddress.getByName("127.0.0.1");
 -        OutboundTcpConnectionPool otcPool = new OutboundTcpConnectionPool(lo);
 -        OutboundTcpConnection otc = new OutboundTcpConnection(otcPool);
++        OutboundTcpConnectionPool otcPool = new OutboundTcpConnectionPool(lo, null);
++        OutboundTcpConnection otc = new OutboundTcpConnection(otcPool, "lo-OutboundTcpConnectionTest");
+         return otc;
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to