Coalescing strategies improvements CASSANDRA-13090. With the previous code, the TIMEHORIZON and MOVINGAVERAGE coalescing strategies would wait even when the backlog still contained data, which would make it grow even more.
Also: - cleanups parkLoop() - add otc_coalescing_max_coalesced_messages - add otc_coalescing_enough_coalesced_messages - add other otc_* settings to cassandra.yaml patch by Corentin Chary <c.ch...@criteo.com> reviewed by Ariel Weisberg <aweisb...@apple.com> for CASSANDRA-13090 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5725e2c4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5725e2c4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5725e2c4 Branch: refs/heads/cassandra-3.0 Commit: 5725e2c422d21d8efe5ae3bc4389842939553650 Parents: 7e05f39 Author: Corentin Chary <c.ch...@criteo.com> Authored: Mon Jan 9 12:06:56 2017 -0500 Committer: Ariel Weisberg <aweisb...@apple.com> Committed: Mon Feb 13 12:58:29 2017 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 32 ++++++++++++++++ .../org/apache/cassandra/config/Config.java | 3 +- .../cassandra/config/DatabaseDescriptor.java | 16 ++++++++ .../cassandra/net/OutboundTcpConnection.java | 4 +- .../cassandra/utils/CoalescingStrategies.java | 32 ++++++++++++---- .../utils/CoalescingStrategiesTest.java | 40 ++++++++++++++++++++ 7 files changed, 119 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 214fe97..4052b0f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.9 + * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090) * Fix negative mean latency metric (CASSANDRA-12876) * Use only one file pointer when creating commitlog segments (CASSANDRA-12539) * Fix speculative retry bugs (CASSANDRA-13009) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index ab2aa14..ddb7927 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -884,3 +884,35 @@ enable_user_defined_functions: false # below their system default. The sysinternals 'clockres' tool can confirm your system's default # setting. windows_timer_interval: 1 + +# Maximum size of any value in SSTables. Safety measure to detect SSTable corruption +# early. Any value size larger than this threshold will result in marking an SSTable +# as corrupted. +# max_value_size_in_mb: 256 + +# Coalescing Strategies # +# Coalescing multiple messages turns out to significantly boost message processing throughput (think doubling or more). +# On bare metal, the floor for packet processing throughput is high enough that many applications won't notice, but in +# virtualized environments, the point at which an application can be bound by network packet processing can be +# surprisingly low compared to the throughput of task processing that is possible inside a VM. It's not that bare metal +# doesn't benefit from coalescing messages, it's that the number of packets a bare metal network interface can process +# is sufficient for many applications such that no load starvation is experienced even without coalescing. +# There are other benefits to coalescing network messages that are harder to isolate with a simple metric like messages +# per second. By coalescing multiple tasks together, a network thread can process multiple messages for the cost of one +# trip to read from a socket, and all the task submission work can be done at the same time, reducing context switching +# and increasing cache friendliness of network message processing. +# See CASSANDRA-8692 for details. + +# Strategy to use for coalescing messages in OutboundTcpConnection.
+# Can be fixed, movingaverage, timehorizon (default), disabled. +# You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name. +# otc_coalescing_strategy: TIMEHORIZON + +# How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first +# message is received before it will be sent with any accompanying messages. For moving average this is the +# maximum amount of time that will be waited as well as the interval at which messages must arrive on average +# for coalescing to be enabled. +# otc_coalescing_window_us: 200 + +# Do not try to coalesce messages if we already got that many messages. This should be more than 2 and less than 128. +# otc_coalescing_enough_coalesced_messages: 8 http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 9f5a22f..64b36a0 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -281,12 +281,13 @@ public class Config /* * How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first - * messgae is received before it will be sent with any accompanying messages. For moving average this is the + * message is received before it will be sent with any accompanying messages. For moving average this is the * maximum amount of time that will be waited as well as the interval at which messages must arrive on average * for coalescing to be enabled. 
*/ public static final int otc_coalescing_window_us_default = 200; public int otc_coalescing_window_us = otc_coalescing_window_us_default; + public int otc_coalescing_enough_coalesced_messages = 8; public int windows_timer_interval = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 35debd0..981026d 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -689,6 +689,12 @@ public class DatabaseDescriptor } if (seedProvider.getSeeds().size() == 0) throw new ConfigurationException("The seed provider lists no seeds.", false); + + if (conf.otc_coalescing_enough_coalesced_messages > 128) + throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false); + + if (conf.otc_coalescing_enough_coalesced_messages <= 0) + throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false); } private static FileStore guessFileStore(String dir) throws IOException @@ -1810,6 +1816,16 @@ public class DatabaseDescriptor return conf.otc_coalescing_window_us; } + public static int getOtcCoalescingEnoughCoalescedMessages() + { + return conf.otc_coalescing_enough_coalesced_messages; + } + + public static void setOtcCoalescingEnoughCoalescedMessages(int otc_coalescing_enough_coalesced_messages) + { + conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages; + } + public static boolean enableUserDefinedFunctions() { return conf.enable_user_defined_functions; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/net/OutboundTcpConnection.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 1bf3ea3..6ec78a4 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -81,6 +81,8 @@ public class OutboundTcpConnection extends Thread private static final String BUFFER_SIZE_PROPERTY = PREFIX + "otc_buffer_size"; private static final int BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64); + public static final int MAX_COALESCED_MESSAGES = 128; + private static CoalescingStrategy newCoalescingStrategy(String displayName) { return CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(), @@ -186,7 +188,7 @@ public class OutboundTcpConnection extends Thread public void run() { - final int drainedMessageSize = 128; + final int drainedMessageSize = MAX_COALESCED_MESSAGES; // keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize) final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/utils/CoalescingStrategies.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java index 826bd64..52d4240 100644 --- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java +++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java @@ -18,8 +18,11 @@ package org.apache.cassandra.utils; import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.FileUtils; + import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import 
java.io.File; import java.io.RandomAccessFile; @@ -38,6 +41,7 @@ import com.google.common.base.Preconditions; public class CoalescingStrategies { + static protected final Logger logger = LoggerFactory.getLogger(CoalescingStrategies.class); /* * Log debug information at info level about what the average is and when coalescing is enabled/disabled @@ -85,15 +89,23 @@ public class CoalescingStrategies { long now = System.nanoTime(); final long timer = now + nanos; + // We shouldn't loop if it's within a few % of the target sleep time if on a second iteration. + // See CASSANDRA-8692. + final long limit = timer - nanos / 16; do { LockSupport.parkNanos(timer - now); + now = System.nanoTime(); } - while (timer - (now = System.nanoTime()) > nanos / 16); + while (now < limit); } private static boolean maybeSleep(int messages, long averageGap, long maxCoalesceWindow, Parker parker) { + // Do not sleep if there are still items in the backlog (CASSANDRA-13090). + if (messages >= DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages()) + return false; + // only sleep if we can expect to double the number of messages we're sending in the time interval long sleep = messages * averageGap; if (sleep <= 0 || sleep > maxCoalesceWindow) @@ -329,7 +341,7 @@ public class CoalescingStrategies if (input.drainTo(out, maxItems) == 0) { out.add(input.take()); - input.drainTo(out, maxItems - 1); + input.drainTo(out, maxItems - out.size()); } for (Coalescable qm : out) @@ -411,15 +423,16 @@ public class CoalescingStrategies if (input.drainTo(out, maxItems) == 0) { out.add(input.take()); + input.drainTo(out, maxItems - out.size()); } long average = notifyOfSample(out.get(0).timestampNanos()); - debugGap(average); - maybeSleep(out.size(), average, maxCoalesceWindow, parker); + if (maybeSleep(out.size(), average, maxCoalesceWindow, parker)) { + input.drainTo(out, maxItems - out.size()); + } - input.drainTo(out, maxItems - out.size()); for (int ii = 1; ii < out.size(); ii++) 
notifyOfSample(out.get(ii).timestampNanos()); } @@ -447,11 +460,16 @@ public class CoalescingStrategies @Override protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException { + int enough = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages(); + if (input.drainTo(out, maxItems) == 0) { out.add(input.take()); - parker.park(coalesceWindow); - input.drainTo(out, maxItems - 1); + input.drainTo(out, maxItems - out.size()); + if (out.size() < enough) { + parker.park(coalesceWindow); + input.drainTo(out, maxItems - out.size()); + } } debugTimestamps(out); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java index 97d15fe..26b6b3a 100644 --- a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java +++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java @@ -17,10 +17,12 @@ */ package org.apache.cassandra.utils; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.utils.CoalescingStrategies.Clock; import org.apache.cassandra.utils.CoalescingStrategies.Coalescable; import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy; import org.apache.cassandra.utils.CoalescingStrategies.Parker; +import org.junit.BeforeClass; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -101,6 +103,12 @@ public class CoalescingStrategiesTest Semaphore queueParked = new Semaphore(0); Semaphore queueRelease = new Semaphore(0); + @BeforeClass + public static void initDD() + { + DatabaseDescriptor.forceStaticInitialization(); + } + @SuppressWarnings({ "serial" }) @Before public void setUp() throws Exception @@ -207,6 +215,38 
@@ public class CoalescingStrategiesTest } @Test + public void testFixedCoalescingStrategyEnough() throws Exception + { + int oldValue = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages(); + DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(1); + try { + cs = newStrategy("FIXED", 200); + + //Test that when a stream of messages continues arriving it keeps sending until all are drained + //It does this because it is already awake and sending messages + add(42); + add(42); + cs.coalesce(input, output, 128); + assertEquals(2, output.size()); + assertNull(parker.parks.poll()); + + clear(); + + runBlocker(queueParked); + add(42); + add(42); + add(42); + release(); + assertEquals(3, output.size()); + assertNull(parker.parks.poll()); + } + finally { + DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(oldValue); + } + + } + + @Test public void testDisabledCoalescingStrateg() throws Exception { cs = newStrategy("DISABLED", 200);