Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/18765886 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/18765886 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/18765886 Branch: refs/heads/trunk Commit: 187658861d15cbf9c3a9c3cdae42ecd191328eaf Parents: 388c961 617c8eb Author: Ariel Weisberg <aweisb...@apple.com> Authored: Wed May 3 16:31:21 2017 -0400 Committer: Ariel Weisberg <aweisb...@apple.com> Committed: Wed May 3 16:34:56 2017 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 9 + .../org/apache/cassandra/config/Config.java | 6 + .../cassandra/config/DatabaseDescriptor.java | 10 ++ .../cassandra/net/OutboundTcpConnection.java | 112 +++++++++--- .../apache/cassandra/service/StorageProxy.java | 8 + .../cassandra/service/StorageProxyMBean.java | 3 + .../net/OutboundTcpConnectionTest.java | 175 +++++++++++++++++++ 8 files changed, 299 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/18765886/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 3844bfa,9ec6c95..64e9e50 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,34 -1,5 +1,35 @@@ -3.0.14 +3.11.0 + * UDA fails without input rows (CASSANDRA-13399) + * Fix compaction-stress by using daemonInitialization (CASSANDRA-13188) + * V5 protocol flags decoding broken (CASSANDRA-13443) + * Use write lock not read lock for removing sstables from compaction strategies. (CASSANDRA-13422) + * Use corePoolSize equal to maxPoolSize in JMXEnabledThreadPoolExecutors (CASSANDRA-13329) + * Avoid rebuilding SASI indexes containing no values (CASSANDRA-12962) + * Add charset to Analyser input stream (CASSANDRA-13151) + * Fix testLimitSSTables flake caused by concurrent flush (CASSANDRA-12820) + * cdc column addition strikes again (CASSANDRA-13382) + * Fix static column indexes (CASSANDRA-13277) + * DataOutputBuffer.asNewBuffer broken (CASSANDRA-13298) + * unittest CipherFactoryTest failed on MacOS (CASSANDRA-13370) + * Forbid SELECT restrictions and CREATE INDEX over non-frozen UDT columns (CASSANDRA-13247) + * Default logging we ship will incorrectly print "?:?" for "%F:%L" pattern (CASSANDRA-13317) + * Possible AssertionError in UnfilteredRowIteratorWithLowerBound (CASSANDRA-13366) + * Support unaligned memory access for AArch64 (CASSANDRA-13326) + * Improve SASI range iterator efficiency on intersection with an empty range (CASSANDRA-12915). + * Fix equality comparisons of columns using the duration type (CASSANDRA-13174) + * Obfuscate password in stress-graphs (CASSANDRA-12233) + * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034) + * nodetool stopdaemon errors out (CASSANDRA-13030) + * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954) + * Fix primary index calculation for SASI (CASSANDRA-12910) + * More fixes to the TokenAllocator (CASSANDRA-12990) + * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983) + * Address message coalescing regression (CASSANDRA-12676) + * Delete illegal character from StandardTokenizerImpl.jflex (CASSANDRA-13417) + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307) + * Tracing payload not passed from QueryMessage to tracing session (CASSANDRA-12835) +Merged from 3.0: + * Expire OutboundTcpConnection messages by a single Thread (CASSANDRA-13265) * Fail repair if insufficient responses received (CASSANDRA-13397) * Fix SSTableLoader fail when the loaded table contains dropped columns (CASSANDRA-13276) * Avoid name clashes in CassandraIndexTest (CASSANDRA-13427) http://git-wip-us.apache.org/repos/asf/cassandra/blob/18765886/conf/cassandra.yaml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/18765886/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/Config.java index f43f816,6a99cd3..8fde816 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@@ -307,19 -298,14 +307,25 @@@ public class Confi public int otc_coalescing_window_us = otc_coalescing_window_us_default; public int otc_coalescing_enough_coalesced_messages = 8; + /** + * Backlog expiration interval in milliseconds for the OutboundTcpConnection. + */ + public static final int otc_backlog_expiration_interval_ms_default = 200; + public volatile int otc_backlog_expiration_interval_ms = otc_backlog_expiration_interval_ms_default; - ++ public int windows_timer_interval = 0; + /** + * Size of the CQL prepared statements cache in MB. + * Defaults to 1/256th of the heap size or 10MB, whichever is greater. + */ + public Long prepared_statements_cache_size_mb = null; + /** + * Size of the Thrift prepared statements cache in MB. + * Defaults to 1/256th of the heap size or 10MB, whichever is greater. + */ + public Long thrift_prepared_statements_cache_size_mb = null; + public boolean enable_user_defined_functions = false; public boolean enable_scripted_user_defined_functions = false; /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/18765886/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/18765886/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java index a0ed270,99ad194..6cd1064 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@@ -63,9 -63,10 +64,10 @@@ import org.xerial.snappy.SnappyOutputSt import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; + import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.Uninterruptibles; -public class OutboundTcpConnection extends Thread +public class OutboundTcpConnection extends FastThreadLocalThread { private static final Logger logger = LoggerFactory.getLogger(OutboundTcpConnection.class); @@@ -116,10 -117,15 +118,14 @@@ if (coalescingWindow < 0) throw new ExceptionInInitializerError( - "Value provided for coalescing window must be greather than 0: " + coalescingWindow); + "Value provided for coalescing window must be greater than 0: " + coalescingWindow); + + int otc_backlog_expiration_interval_in_ms = DatabaseDescriptor.getOtcBacklogExpirationInterval(); + if (otc_backlog_expiration_interval_in_ms != Config.otc_backlog_expiration_interval_ms_default) + logger.info("OutboundTcpConnection backlog expiration interval set to to {}ms", otc_backlog_expiration_interval_in_ms); - } - private static final MessageOut CLOSE_SENTINEL = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE); - private static final MessageOut<?> CLOSE_SENTINEL = new MessageOut<MessagingService.Verb>(MessagingService.Verb.INTERNAL_RESPONSE); ++ private static final MessageOut<?> CLOSE_SENTINEL = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE); private volatile boolean isStopped = false; private static final int OPEN_RETRY_DELAY = 100; // ms between retries @@@ -177,9 -188,20 +188,21 @@@ } } + /** + * This is a helper method for unit testing. Disclaimer: Do not use this method outside unit tests, as + * this method is iterating the queue which can be an expensive operation (CPU time, queue locking). + * + * @return true, if the queue contains at least one expired element + */ + @VisibleForTesting // (otherwise = VisibleForTesting.NONE) + boolean backlogContainsExpiredMessages(long nowNanos) + { + return backlog.stream().anyMatch(entry -> entry.isTimedOut(nowNanos)); + } + void closeSocket(boolean destroyThread) { + logger.debug("Enqueuing socket close for {}", poolReference.endPoint()); isStopped = destroyThread; // Exit loop to stop the thread backlog.clear(); // in the "destroyThread = true" case, enqueuing the sentinel is important mostly to unblock the backlog.take() http://git-wip-us.apache.org/repos/asf/cassandra/blob/18765886/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index e0be68c,ea082d5..0da25a9 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -2811,8 -2682,11 +2811,16 @@@ public class StorageProxy implements St return ReadRepairMetrics.repairedBackground.getCount(); } + public int getNumberOfTables() + { + return Schema.instance.getNumberOfTables(); + } ++ + public int getOtcBacklogExpirationInterval() { + return DatabaseDescriptor.getOtcBacklogExpirationInterval(); + } + + public void setOtcBacklogExpirationInterval(int intervalInMillis) { + DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/18765886/src/java/org/apache/cassandra/service/StorageProxyMBean.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxyMBean.java index 0a4ba19,ee82a5b..8678dde --- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java +++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java @@@ -59,8 -59,9 +59,11 @@@ public interface StorageProxyMBea public long getReadRepairRepairedBlocking(); public long getReadRepairRepairedBackground(); + public int getOtcBacklogExpirationInterval(); + public void setOtcBacklogExpirationInterval(int intervalInMillis); + /** Returns each live node's schema version */ public Map<String, List<String>> getSchemaVersions(); + + public int getNumberOfTables(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/18765886/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java index 0000000,c09ae0f..e3b6817 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java +++ b/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java @@@ -1,0 -1,170 +1,175 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.net; + + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.net.MessagingService.Verb; + import org.junit.BeforeClass; + import org.junit.Test; + import static org.junit.Assert.assertFalse; + import static org.junit.Assert.assertTrue; + + import java.net.InetAddress; + import java.net.UnknownHostException; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicInteger; + + /** + * The tests check whether Queue expiration in the OutboundTcpConnection behaves properly for droppable and + * non-droppable messages. + */ + public class OutboundTcpConnectionTest + { + AtomicInteger messageId = new AtomicInteger(0); + + final static Verb VERB_DROPPABLE = Verb.MUTATION; // Droppable, 2s timeout + final static Verb VERB_NONDROPPABLE = Verb.GOSSIP_DIGEST_ACK; // Not droppable ++ ++ final static long NANOS_FOR_TIMEOUT; + - final static long NANOS_FOR_TIMEOUT = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getTimeout(VERB_DROPPABLE)*2); - ++ static ++ { ++ DatabaseDescriptor.daemonInitialization(); ++ NANOS_FOR_TIMEOUT = TimeUnit.MILLISECONDS.toNanos(VERB_DROPPABLE.getTimeout()*2); ++ } + + /** + * Verifies our assumptions whether a Verb can be dropped or not. The tests make use of droppabilty, and + * may produce wrong test results if their droppabilty is changed. + */ + @BeforeClass + public static void assertDroppability() + { + if (!MessagingService.DROPPABLE_VERBS.contains(VERB_DROPPABLE)) + throw new AssertionError("Expected " + VERB_DROPPABLE + " to be droppable"); + if (MessagingService.DROPPABLE_VERBS.contains(VERB_NONDROPPABLE)) + throw new AssertionError("Expected " + VERB_NONDROPPABLE + " not to be droppable"); + } + + /** + * Tests that non-droppable messages are never expired + */ + @Test + public void testNondroppable() throws UnknownHostException + { + OutboundTcpConnection otc = getOutboundTcpConnectionForLocalhost(); + long nanoTimeBeforeEnqueue = System.nanoTime(); + + assertFalse("Fresh OutboundTcpConnection contains expired messages", + otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue)); + + fillToPurgeSize(otc, VERB_NONDROPPABLE); + fillToPurgeSize(otc, VERB_NONDROPPABLE); + otc.expireMessages(expirationTimeNanos()); + + assertFalse("OutboundTcpConnection with non-droppable verbs should not expire", + otc.backlogContainsExpiredMessages(expirationTimeNanos())); + } + + /** + * Tests that droppable messages will be dropped after they expire, but not before. + * + * @throws UnknownHostException + */ + @Test + public void testDroppable() throws UnknownHostException + { + OutboundTcpConnection otc = getOutboundTcpConnectionForLocalhost(); + long nanoTimeBeforeEnqueue = System.nanoTime(); + + initialFill(otc, VERB_DROPPABLE); + assertFalse("OutboundTcpConnection with droppable verbs should not expire immediately", + otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue)); + + otc.expireMessages(nanoTimeBeforeEnqueue); + assertFalse("OutboundTcpConnection with droppable verbs should not expire with enqueue-time expiration", + otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue)); + + // Lets presume, expiration time have passed => At that time there shall be expired messages in the Queue + long nanoTimeWhenExpired = expirationTimeNanos(); + assertTrue("OutboundTcpConnection with droppable verbs should have expired", + otc.backlogContainsExpiredMessages(nanoTimeWhenExpired)); + + // Using the same timestamp, lets expire them and check whether they have gone + otc.expireMessages(nanoTimeWhenExpired); + assertFalse("OutboundTcpConnection should not have expired entries", + otc.backlogContainsExpiredMessages(nanoTimeWhenExpired)); + + // Actually the previous test can be done in a harder way: As expireMessages() has run, we cannot have + // ANY expired values, thus lets test also against nanoTimeBeforeEnqueue + assertFalse("OutboundTcpConnection should not have any expired entries", + otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue)); + + } + + /** + * Fills the given OutboundTcpConnection with (1 + BACKLOG_PURGE_SIZE), elements. The first + * BACKLOG_PURGE_SIZE elements are non-droppable, the last one is a message with the given Verb and can be + * droppable or non-droppable. + */ + private void initialFill(OutboundTcpConnection otc, Verb verb) + { + assertFalse("Fresh OutboundTcpConnection contains expired messages", + otc.backlogContainsExpiredMessages(System.nanoTime())); + + fillToPurgeSize(otc, VERB_NONDROPPABLE); + MessageOut<?> messageDroppable10s = new MessageOut<>(verb); + otc.enqueue(messageDroppable10s, nextMessageId()); + otc.expireMessages(System.nanoTime()); + } + + /** + * Returns a nano timestamp in the far future, when expiration should have been performed for VERB_DROPPABLE. + * The offset is chosen as 2 times of the expiration time of VERB_DROPPABLE. + * + * @return The future nano timestamp + */ + private long expirationTimeNanos() + { + return System.nanoTime() + NANOS_FOR_TIMEOUT; + } + + private int nextMessageId() + { + return messageId.incrementAndGet(); + } + + /** + * Adds BACKLOG_PURGE_SIZE messages to the queue. Hint: At BACKLOG_PURGE_SIZE expiration starts to work. + * + * @param otc + * The OutboundTcpConnection + * @param verb + * The verb that defines the message type + */ + private void fillToPurgeSize(OutboundTcpConnection otc, Verb verb) + { + for (int i = 0; i < OutboundTcpConnection.BACKLOG_PURGE_SIZE; i++) + { + otc.enqueue(new MessageOut<>(verb), nextMessageId()); + } + } + + private OutboundTcpConnection getOutboundTcpConnectionForLocalhost() throws UnknownHostException + { + InetAddress lo = InetAddress.getByName("127.0.0.1"); - OutboundTcpConnectionPool otcPool = new OutboundTcpConnectionPool(lo); - OutboundTcpConnection otc = new OutboundTcpConnection(otcPool); ++ OutboundTcpConnectionPool otcPool = new OutboundTcpConnectionPool(lo, null); ++ OutboundTcpConnection otc = new OutboundTcpConnection(otcPool, "lo-OutboundTcpConnectionTest"); + return otc; + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org