Expire OTC messages by a single Thread. Patch by Christian Esken; reviewed by Ariel Weisberg for CASSANDRA-13265.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/617c8eba Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/617c8eba Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/617c8eba Branch: refs/heads/cassandra-3.11 Commit: 617c8ebadb6c4df99c35a913184760e82172b1f5 Parents: b77e754 Author: Christian Esken <christian.es...@trivago.com> Authored: Wed Mar 1 15:56:36 2017 +0100 Committer: Ariel Weisberg <aweisb...@apple.com> Committed: Wed May 3 16:30:43 2017 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 9 + .../org/apache/cassandra/config/Config.java | 6 + .../cassandra/config/DatabaseDescriptor.java | 10 ++ .../cassandra/net/OutboundTcpConnection.java | 113 +++++++++--- .../apache/cassandra/service/StorageProxy.java | 10 +- .../cassandra/service/StorageProxyMBean.java | 3 + .../net/OutboundTcpConnectionTest.java | 170 +++++++++++++++++++ 8 files changed, 295 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/617c8eba/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 19cd39c..9ec6c95 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.14 + * Expire OutboundTcpConnection messages by a single Thread (CASSANDRA-13265) * Fail repair if insufficient responses received (CASSANDRA-13397) * Fix SSTableLoader fail when the loaded table contains dropped columns (CASSANDRA-13276) * Avoid name clashes in CassandraIndexTest (CASSANDRA-13427) http://git-wip-us.apache.org/repos/asf/cassandra/blob/617c8eba/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 61d3844..22491c6 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ 
-989,3 +989,12 @@ windows_timer_interval: 1 # Do not try to coalesce messages if we already got that many messages. This should be more than 2 and less than 128. # otc_coalescing_enough_coalesced_messages: 8 + +# How many milliseconds to wait between two expiration runs on the backlog (queue) of the OutboundTcpConnection. +# Expiration is done if messages are piling up in the backlog. Droppable messages are expired to free the memory +# taken by expired messages. The interval should be between 0 and 1000, and in most installations the default value +# will be appropriate. A smaller value could potentially expire messages slightly sooner at the expense of more CPU +# time and queue contention while iterating the backlog of messages. +# An interval of 0 disables any wait time, which is the behavior of former Cassandra versions. +# +# otc_backlog_expiration_interval_ms: 200 http://git-wip-us.apache.org/repos/asf/cassandra/blob/617c8eba/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 9aaf7ae..6a99cd3 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -298,6 +298,12 @@ public class Config public int otc_coalescing_window_us = otc_coalescing_window_us_default; public int otc_coalescing_enough_coalesced_messages = 8; + /** + * Backlog expiration interval in milliseconds for the OutboundTcpConnection. 
+ */ + public static final int otc_backlog_expiration_interval_ms_default = 200; + public volatile int otc_backlog_expiration_interval_ms = otc_backlog_expiration_interval_ms_default; + public int windows_timer_interval = 0; public boolean enable_user_defined_functions = false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/617c8eba/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 602214f..e9e54c3 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1967,6 +1967,16 @@ public class DatabaseDescriptor conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages; } + public static int getOtcBacklogExpirationInterval() + { + return conf.otc_backlog_expiration_interval_ms; + } + + public static void setOtcBacklogExpirationInterval(int intervalInMillis) + { + conf.otc_backlog_expiration_interval_ms = intervalInMillis; + } + public static int getWindowsTimerInterval() { return conf.windows_timer_interval; http://git-wip-us.apache.org/repos/asf/cassandra/blob/617c8eba/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 4608399..99ad194 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -31,6 +31,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.zip.Checksum; @@ -62,6 +63,7 @@ import org.xerial.snappy.SnappyOutputStream; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.Uninterruptibles; public class OutboundTcpConnection extends Thread @@ -116,9 +118,14 @@ public class OutboundTcpConnection extends Thread if (coalescingWindow < 0) throw new ExceptionInInitializerError( "Value provided for coalescing window must be greather than 0: " + coalescingWindow); + + int otc_backlog_expiration_interval_in_ms = DatabaseDescriptor.getOtcBacklogExpirationInterval(); + if (otc_backlog_expiration_interval_in_ms != Config.otc_backlog_expiration_interval_ms_default) + logger.info("OutboundTcpConnection backlog expiration interval set to to {}ms", otc_backlog_expiration_interval_in_ms); + } - private static final MessageOut CLOSE_SENTINEL = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE); + private static final MessageOut<?> CLOSE_SENTINEL = new MessageOut<MessagingService.Verb>(MessagingService.Verb.INTERNAL_RESPONSE); private volatile boolean isStopped = false; private static final int OPEN_RETRY_DELAY = 100; // ms between retries @@ -128,6 +135,11 @@ public class OutboundTcpConnection extends Thread static final int LZ4_HASH_SEED = 0x9747b28c; private final BlockingQueue<QueuedMessage> backlog = new LinkedBlockingQueue<>(); + private static final String BACKLOG_PURGE_SIZE_PROPERTY = PREFIX + "otc_backlog_purge_size"; + @VisibleForTesting + static final int BACKLOG_PURGE_SIZE = Integer.getInteger(BACKLOG_PURGE_SIZE_PROPERTY, 1024); + private final AtomicBoolean backlogExpirationActive = new AtomicBoolean(false); + private volatile long backlogNextExpirationTime; private final OutboundTcpConnectionPool poolReference; 
@@ -164,11 +176,11 @@ public class OutboundTcpConnection extends Thread public void enqueue(MessageOut<?> message, int id) { - if (backlog.size() > 1024) - expireMessages(); + long nanoTime = System.nanoTime(); + expireMessages(nanoTime); try { - backlog.put(new QueuedMessage(message, id)); + backlog.put(new QueuedMessage(message, id, nanoTime)); } catch (InterruptedException e) { @@ -176,6 +188,18 @@ public class OutboundTcpConnection extends Thread } } + /** + * This is a helper method for unit testing. Disclaimer: Do not use this method outside unit tests, as + * this method is iterating the queue which can be an expensive operation (CPU time, queue locking). + * + * @return true, if the queue contains at least one expired element + */ + @VisibleForTesting // (otherwise = VisibleForTesting.NONE) + boolean backlogContainsExpiredMessages(long nowNanos) + { + return backlog.stream().anyMatch(entry -> entry.isTimedOut(nowNanos)); + } + void closeSocket(boolean destroyThread) { isStopped = destroyThread; // Exit loop to stop the thread @@ -214,9 +238,8 @@ public class OutboundTcpConnection extends Thread throw new AssertionError(e); } - currentMsgBufferCount = drainedMessages.size(); + int count = currentMsgBufferCount = drainedMessages.size(); - int count = drainedMessages.size(); //The timestamp of the first message has already been provided to the coalescing strategy //so skip logging it. inner: @@ -233,14 +256,16 @@ public class OutboundTcpConnection extends Thread continue; } - if (qm.isTimedOut()) + if (qm.isTimedOut(System.nanoTime())) dropped.incrementAndGet(); else if (socket != null || connect()) writeConnected(qm, count == 1 && backlog.isEmpty()); else { - // clear out the queue, else gossip messages back up. - drainedMessages.clear(); + // Not connected! Clear out the queue, else gossip messages back up. Update dropped + // statistics accordingly. 
Hint: The statistics may be slightly too low, if messages + // are added between the calls of backlog.size() and backlog.clear() + dropped.addAndGet(backlog.size()); backlog.clear(); break inner; } @@ -254,6 +279,8 @@ public class OutboundTcpConnection extends Thread } currentMsgBufferCount = --count; } + // Update dropped statistics by the number of unprocessed drainedMessages + dropped.addAndGet(currentMsgBufferCount); drainedMessages.clear(); } } @@ -343,7 +370,7 @@ public class OutboundTcpConnection extends Thread } } - private void writeInternal(MessageOut message, int id, long timestamp) throws IOException + private void writeInternal(MessageOut<?> message, int id, long timestamp) throws IOException { out.writeInt(MessagingService.PROTOCOL_MAGIC); @@ -563,18 +590,53 @@ public class OutboundTcpConnection extends Thread return version.get(); } - private void expireMessages() + /** + * Expire elements from the queue if the queue is pretty full and expiration is not already in progress. + * This method will only remove droppable expired entries. If no such element exists, nothing is removed from the queue. + * + * @param timestampNanos The current time as from System.nanoTime() + */ + @VisibleForTesting + void expireMessages(long timestampNanos) { - Iterator<QueuedMessage> iter = backlog.iterator(); - while (iter.hasNext()) + if (backlog.size() <= BACKLOG_PURGE_SIZE) + return; // Plenty of space + + if (backlogNextExpirationTime - timestampNanos > 0) + return; // Expiration is not due. + + /** + * Expiration is an expensive process. Iterating the queue locks the queue for both writes and + * reads during iter.next() and iter.remove(). Thus letting only a single Thread do expiration. 
+ */ + if (backlogExpirationActive.compareAndSet(false, true)) { - QueuedMessage qm = iter.next(); - if (!qm.droppable) - continue; - if (!qm.isTimedOut()) - return; - iter.remove(); - dropped.incrementAndGet(); + try + { + Iterator<QueuedMessage> iter = backlog.iterator(); + while (iter.hasNext()) + { + QueuedMessage qm = iter.next(); + if (!qm.droppable) + continue; + if (!qm.isTimedOut(timestampNanos)) + continue; + iter.remove(); + dropped.incrementAndGet(); + } + + if (logger.isTraceEnabled()) + { + long duration = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - timestampNanos); + logger.trace("Expiration of {} took {}μs", getName(), duration); + } + } + finally + { + long backlogExpirationIntervalNanos = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getOtcBacklogExpirationInterval()); + backlogNextExpirationTime = timestampNanos + backlogExpirationIntervalNanos; + backlogExpirationActive.set(false); + } } } @@ -586,18 +648,19 @@ public class OutboundTcpConnection extends Thread final long timestampNanos; final boolean droppable; - QueuedMessage(MessageOut<?> message, int id) + QueuedMessage(MessageOut<?> message, int id, long timestampNanos) { this.message = message; this.id = id; - this.timestampNanos = System.nanoTime(); + this.timestampNanos = timestampNanos; this.droppable = MessagingService.DROPPABLE_VERBS.contains(message.verb); } /** don't drop a non-droppable message just because it's timestamp is expired */ - boolean isTimedOut() + boolean isTimedOut(long nowNanos) { - return droppable && timestampNanos < System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(message.getTimeout()); + long messageTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(message.getTimeout()); + return droppable && nowNanos - timestampNanos > messageTimeoutNanos; } boolean shouldRetry() @@ -615,7 +678,7 @@ public class OutboundTcpConnection extends Thread { RetriedQueuedMessage(QueuedMessage msg) { - super(msg.message, msg.id); + super(msg.message, msg.id, msg.timestampNanos); } 
boolean shouldRetry() http://git-wip-us.apache.org/repos/asf/cassandra/blob/617c8eba/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index cffd63c..ea082d5 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -72,8 +72,6 @@ import org.apache.cassandra.triggers.TriggerExecutor; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.AbstractIterator; -import static com.google.common.collect.Iterables.contains; - public class StorageProxy implements StorageProxyMBean { public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy"; @@ -2683,4 +2681,12 @@ public class StorageProxy implements StorageProxyMBean public long getReadRepairRepairedBackground() { return ReadRepairMetrics.repairedBackground.getCount(); } + + public int getOtcBacklogExpirationInterval() { + return DatabaseDescriptor.getOtcBacklogExpirationInterval(); + } + + public void setOtcBacklogExpirationInterval(int intervalInMillis) { + DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/617c8eba/src/java/org/apache/cassandra/service/StorageProxyMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java index 0db0ca6..ee82a5b 100644 --- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java +++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java @@ -59,6 +59,9 @@ public interface StorageProxyMBean public long getReadRepairRepairedBlocking(); public long getReadRepairRepairedBackground(); + public int getOtcBacklogExpirationInterval(); 
+ public void setOtcBacklogExpirationInterval(int intervalInMillis); + /** Returns each live node's schema version */ public Map<String, List<String>> getSchemaVersions(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/617c8eba/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java b/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java new file mode 100644 index 0000000..c09ae0f --- /dev/null +++ b/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.net; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.net.MessagingService.Verb; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * The tests check whether Queue expiration in the OutboundTcpConnection behaves properly for droppable and + * non-droppable messages. + */ +public class OutboundTcpConnectionTest +{ + AtomicInteger messageId = new AtomicInteger(0); + + final static Verb VERB_DROPPABLE = Verb.MUTATION; // Droppable, 2s timeout + final static Verb VERB_NONDROPPABLE = Verb.GOSSIP_DIGEST_ACK; // Not droppable + + final static long NANOS_FOR_TIMEOUT = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getTimeout(VERB_DROPPABLE)*2); + + + /** + * Verifies our assumptions whether a Verb can be dropped or not. The tests make use of droppabilty, and + * may produce wrong test results if their droppabilty is changed. 
+ */ + @BeforeClass + public static void assertDroppability() + { + if (!MessagingService.DROPPABLE_VERBS.contains(VERB_DROPPABLE)) + throw new AssertionError("Expected " + VERB_DROPPABLE + " to be droppable"); + if (MessagingService.DROPPABLE_VERBS.contains(VERB_NONDROPPABLE)) + throw new AssertionError("Expected " + VERB_NONDROPPABLE + " not to be droppable"); + } + + /** + * Tests that non-droppable messages are never expired + */ + @Test + public void testNondroppable() throws UnknownHostException + { + OutboundTcpConnection otc = getOutboundTcpConnectionForLocalhost(); + long nanoTimeBeforeEnqueue = System.nanoTime(); + + assertFalse("Fresh OutboundTcpConnection contains expired messages", + otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue)); + + fillToPurgeSize(otc, VERB_NONDROPPABLE); + fillToPurgeSize(otc, VERB_NONDROPPABLE); + otc.expireMessages(expirationTimeNanos()); + + assertFalse("OutboundTcpConnection with non-droppable verbs should not expire", + otc.backlogContainsExpiredMessages(expirationTimeNanos())); + } + + /** + * Tests that droppable messages will be dropped after they expire, but not before. 
+ * + * @throws UnknownHostException + */ + @Test + public void testDroppable() throws UnknownHostException + { + OutboundTcpConnection otc = getOutboundTcpConnectionForLocalhost(); + long nanoTimeBeforeEnqueue = System.nanoTime(); + + initialFill(otc, VERB_DROPPABLE); + assertFalse("OutboundTcpConnection with droppable verbs should not expire immediately", + otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue)); + + otc.expireMessages(nanoTimeBeforeEnqueue); + assertFalse("OutboundTcpConnection with droppable verbs should not expire with enqueue-time expiration", + otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue)); + + // Lets presume, expiration time have passed => At that time there shall be expired messages in the Queue + long nanoTimeWhenExpired = expirationTimeNanos(); + assertTrue("OutboundTcpConnection with droppable verbs should have expired", + otc.backlogContainsExpiredMessages(nanoTimeWhenExpired)); + + // Using the same timestamp, lets expire them and check whether they have gone + otc.expireMessages(nanoTimeWhenExpired); + assertFalse("OutboundTcpConnection should not have expired entries", + otc.backlogContainsExpiredMessages(nanoTimeWhenExpired)); + + // Actually the previous test can be done in a harder way: As expireMessages() has run, we cannot have + // ANY expired values, thus lets test also against nanoTimeBeforeEnqueue + assertFalse("OutboundTcpConnection should not have any expired entries", + otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue)); + + } + + /** + * Fills the given OutboundTcpConnection with (1 + BACKLOG_PURGE_SIZE), elements. The first + * BACKLOG_PURGE_SIZE elements are non-droppable, the last one is a message with the given Verb and can be + * droppable or non-droppable. 
+ */ + private void initialFill(OutboundTcpConnection otc, Verb verb) + { + assertFalse("Fresh OutboundTcpConnection contains expired messages", + otc.backlogContainsExpiredMessages(System.nanoTime())); + + fillToPurgeSize(otc, VERB_NONDROPPABLE); + MessageOut<?> messageDroppable10s = new MessageOut<>(verb); + otc.enqueue(messageDroppable10s, nextMessageId()); + otc.expireMessages(System.nanoTime()); + } + + /** + * Returns a nano timestamp in the far future, when expiration should have been performed for VERB_DROPPABLE. + * The offset is chosen as 2 times of the expiration time of VERB_DROPPABLE. + * + * @return The future nano timestamp + */ + private long expirationTimeNanos() + { + return System.nanoTime() + NANOS_FOR_TIMEOUT; + } + + private int nextMessageId() + { + return messageId.incrementAndGet(); + } + + /** + * Adds BACKLOG_PURGE_SIZE messages to the queue. Hint: At BACKLOG_PURGE_SIZE expiration starts to work. + * + * @param otc + * The OutboundTcpConnection + * @param verb + * The verb that defines the message type + */ + private void fillToPurgeSize(OutboundTcpConnection otc, Verb verb) + { + for (int i = 0; i < OutboundTcpConnection.BACKLOG_PURGE_SIZE; i++) + { + otc.enqueue(new MessageOut<>(verb), nextMessageId()); + } + } + + private OutboundTcpConnection getOutboundTcpConnectionForLocalhost() throws UnknownHostException + { + InetAddress lo = InetAddress.getByName("127.0.0.1"); + OutboundTcpConnectionPool otcPool = new OutboundTcpConnectionPool(lo); + OutboundTcpConnection otc = new OutboundTcpConnection(otcPool); + return otc; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org