Coalescing strategies improvements CASSANDRA-13090

With the previous code TIMEHORIZON and MOVINGAVERAGE
coalesing strategy would wait even when the backlog
still contains data which would make it grow even more.

Also:
- cleanups parkLoop()
- add otc_coalescing_max_coalesced_messages
- add otc_coalescing_enough_coalesced_messages
- add other otc_* settings to cassandra.yaml

patch by Corentin Chary <c.ch...@criteo.com> reviewed by Ariel Weisberg 
<aweisb...@apple.com> for CASSANDRA-13090


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5725e2c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5725e2c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5725e2c4

Branch: refs/heads/cassandra-3.0
Commit: 5725e2c422d21d8efe5ae3bc4389842939553650
Parents: 7e05f39
Author: Corentin Chary <c.ch...@criteo.com>
Authored: Mon Jan 9 12:06:56 2017 -0500
Committer: Ariel Weisberg <aweisb...@apple.com>
Committed: Mon Feb 13 12:58:29 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             | 32 ++++++++++++++++
 .../org/apache/cassandra/config/Config.java     |  3 +-
 .../cassandra/config/DatabaseDescriptor.java    | 16 ++++++++
 .../cassandra/net/OutboundTcpConnection.java    |  4 +-
 .../cassandra/utils/CoalescingStrategies.java   | 32 ++++++++++++----
 .../utils/CoalescingStrategiesTest.java         | 40 ++++++++++++++++++++
 7 files changed, 119 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 214fe97..4052b0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Coalescing strategy sleeps too much and shouldn't be enabled by default 
(CASSANDRA-13090)
  * Fix negative mean latency metric (CASSANDRA-12876)
  * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
  * Fix speculative retry bugs (CASSANDRA-13009)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ab2aa14..ddb7927 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -884,3 +884,35 @@ enable_user_defined_functions: false
 # below their system default. The sysinternals 'clockres' tool can confirm 
your system's default
 # setting.
 windows_timer_interval: 1
+
+# Maximum size of any value in SSTables. Safety measure to detect SSTable 
corruption
+# early. Any value size larger than this threshold will result into marking an 
SSTable
+# as corrupted.
+# max_value_size_in_mb: 256
+
+# Coalescing Strategies #
+# Coalescing multiples messages turns out to significantly boost message 
processing throughput (think doubling or more).
+# On bare metal, the floor for packet processing throughput is high enough 
that many applications won’t notice, but in
+# virtualized environments, the point at which an application can be bound by 
network packet processing can be
+# surprisingly low compared to the throughput of task processing that is 
possible inside a VM. It’s not that bare metal
+# doesn’t benefit from coalescing messages, it’s that the number of 
packets a bare metal network interface can process
+# is sufficient for many applications such that no load starvation is 
experienced even without coalescing.
+# There are other benefits to coalescing network messages that are harder to 
isolate with a simple metric like messages
+# per second. By coalescing multiple tasks together, a network thread can 
process multiple messages for the cost of one
+# trip to read from a socket, and all the task submission work can be done at 
the same time reducing context switching
+# and increasing cache friendliness of network message processing.
+# See CASSANDRA-8692 for details.
+
+# Strategy to use for coalescing messages in OutboundTcpConnection.
+# Can be fixed, movingaverage, timehorizon (default), disabled.
+# You can also specify a subclass of CoalescingStrategies.CoalescingStrategy 
by name.
+# otc_coalescing_strategy: TIMEHORIZON
+
+# How many microseconds to wait for coalescing. For fixed strategy this is the 
amount of time after the first
+# message is received before it will be sent with any accompanying messages. 
For moving average this is the
+# maximum amount of time that will be waited as well as the interval at which 
messages must arrive on average
+# for coalescing to be enabled.
+# otc_coalescing_window_us: 200
+
+# Do not try to coalesce messages if we already got that many messages. This 
should be more than 2 and less than 128.
+# otc_coalescing_enough_coalesced_messages: 8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 9f5a22f..64b36a0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -281,12 +281,13 @@ public class Config
 
     /*
      * How many microseconds to wait for coalescing. For fixed strategy this 
is the amount of time after the first
-     * messgae is received before it will be sent with any accompanying 
messages. For moving average this is the
+     * message is received before it will be sent with any accompanying 
messages. For moving average this is the
      * maximum amount of time that will be waited as well as the interval at 
which messages must arrive on average
      * for coalescing to be enabled.
      */
     public static final int otc_coalescing_window_us_default = 200;
     public int otc_coalescing_window_us = otc_coalescing_window_us_default;
+    public int otc_coalescing_enough_coalesced_messages = 8;
 
     public int windows_timer_interval = 0;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 35debd0..981026d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -689,6 +689,12 @@ public class DatabaseDescriptor
         }
         if (seedProvider.getSeeds().size() == 0)
             throw new ConfigurationException("The seed provider lists no 
seeds.", false);
+
+        if (conf.otc_coalescing_enough_coalesced_messages > 128)
+            throw new 
ConfigurationException("otc_coalescing_enough_coalesced_messages must be 
smaller than 128", false);
+
+        if (conf.otc_coalescing_enough_coalesced_messages <= 0)
+            throw new 
ConfigurationException("otc_coalescing_enough_coalesced_messages must be 
positive", false);
     }
 
     private static FileStore guessFileStore(String dir) throws IOException
@@ -1810,6 +1816,16 @@ public class DatabaseDescriptor
         return conf.otc_coalescing_window_us;
     }
 
+    public static int getOtcCoalescingEnoughCoalescedMessages()
+    {
+        return conf.otc_coalescing_enough_coalesced_messages;
+    }
+
+    public static void setOtcCoalescingEnoughCoalescedMessages(int 
otc_coalescing_enough_coalesced_messages)
+    {
+        conf.otc_coalescing_enough_coalesced_messages = 
otc_coalescing_enough_coalesced_messages;
+    }
+
     public static boolean enableUserDefinedFunctions()
     {
         return conf.enable_user_defined_functions;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1bf3ea3..6ec78a4 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -81,6 +81,8 @@ public class OutboundTcpConnection extends Thread
     private static final String BUFFER_SIZE_PROPERTY = PREFIX + 
"otc_buffer_size";
     private static final int BUFFER_SIZE = 
Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64);
 
+    public static final int MAX_COALESCED_MESSAGES = 128;
+
     private static CoalescingStrategy newCoalescingStrategy(String displayName)
     {
         return 
CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(),
@@ -186,7 +188,7 @@ public class OutboundTcpConnection extends Thread
 
     public void run()
     {
-        final int drainedMessageSize = 128;
+        final int drainedMessageSize = MAX_COALESCED_MESSAGES;
         // keeping list (batch) size small for now; that way we don't have an 
unbounded array (that we never resize)
         final List<QueuedMessage> drainedMessages = new 
ArrayList<>(drainedMessageSize);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java 
b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 826bd64..52d4240 100644
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@ -18,8 +18,11 @@
 package org.apache.cassandra.utils;
 
 import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.FileUtils;
+
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.RandomAccessFile;
@@ -38,6 +41,7 @@ import com.google.common.base.Preconditions;
 
 public class CoalescingStrategies
 {
+    static protected final Logger logger = 
LoggerFactory.getLogger(CoalescingStrategies.class);
 
     /*
      * Log debug information at info level about what the average is and when 
coalescing is enabled/disabled
@@ -85,15 +89,23 @@ public class CoalescingStrategies
     {
         long now = System.nanoTime();
         final long timer = now + nanos;
+        // We shouldn't loop if it's within a few % of the target sleep time 
if on a second iteration.
+        // See CASSANDRA-8692.
+        final long limit = timer - nanos / 16;
         do
         {
             LockSupport.parkNanos(timer - now);
+            now = System.nanoTime();
         }
-        while (timer - (now = System.nanoTime()) > nanos / 16);
+        while (now < limit);
     }
 
     private static boolean maybeSleep(int messages, long averageGap, long 
maxCoalesceWindow, Parker parker)
     {
+        // Do not sleep if there are still items in the backlog 
(CASSANDRA-13090).
+        if (messages >= 
DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages())
+            return false;
+
         // only sleep if we can expect to double the number of messages we're 
sending in the time interval
         long sleep = messages * averageGap;
         if (sleep <= 0 || sleep > maxCoalesceWindow)
@@ -329,7 +341,7 @@ public class CoalescingStrategies
             if (input.drainTo(out, maxItems) == 0)
             {
                 out.add(input.take());
-                input.drainTo(out, maxItems - 1);
+                input.drainTo(out, maxItems - out.size());
             }
 
             for (Coalescable qm : out)
@@ -411,15 +423,16 @@ public class CoalescingStrategies
             if (input.drainTo(out, maxItems) == 0)
             {
                 out.add(input.take());
+                input.drainTo(out, maxItems - out.size());
             }
 
             long average = notifyOfSample(out.get(0).timestampNanos());
-
             debugGap(average);
 
-            maybeSleep(out.size(), average, maxCoalesceWindow, parker);
+            if (maybeSleep(out.size(), average, maxCoalesceWindow, parker)) {
+                input.drainTo(out, maxItems - out.size());
+            }
 
-            input.drainTo(out, maxItems - out.size());
             for (int ii = 1; ii < out.size(); ii++)
                 notifyOfSample(out.get(ii).timestampNanos());
         }
@@ -447,11 +460,16 @@ public class CoalescingStrategies
         @Override
         protected <C extends Coalescable> void 
coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws 
InterruptedException
         {
+            int enough = 
DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+
             if (input.drainTo(out, maxItems) == 0)
             {
                 out.add(input.take());
-                parker.park(coalesceWindow);
-                input.drainTo(out, maxItems - 1);
+                input.drainTo(out, maxItems - out.size());
+                if (out.size() < enough) {
+                    parker.park(coalesceWindow);
+                    input.drainTo(out, maxItems - out.size());
+                }
             }
             debugTimestamps(out);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java 
b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
index 97d15fe..26b6b3a 100644
--- a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
@@ -17,10 +17,12 @@
  */
 package org.apache.cassandra.utils;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.CoalescingStrategies.Clock;
 import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
 import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
 import org.apache.cassandra.utils.CoalescingStrategies.Parker;
+import org.junit.BeforeClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -101,6 +103,12 @@ public class CoalescingStrategiesTest
     Semaphore queueParked = new Semaphore(0);
     Semaphore queueRelease = new Semaphore(0);
 
+    @BeforeClass
+    public static void initDD()
+    {
+        DatabaseDescriptor.forceStaticInitialization();
+    }
+
     @SuppressWarnings({ "serial" })
     @Before
     public void setUp() throws Exception
@@ -207,6 +215,38 @@ public class CoalescingStrategiesTest
     }
 
     @Test
+    public void testFixedCoalescingStrategyEnough() throws Exception
+    {
+        int oldValue = 
DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+        DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(1);
+        try {
+            cs = newStrategy("FIXED", 200);
+
+            //Test that when a stream of messages continues arriving it keeps 
sending until all are drained
+            //It does this because it is already awake and sending messages
+            add(42);
+            add(42);
+            cs.coalesce(input, output, 128);
+            assertEquals(2, output.size());
+            assertNull(parker.parks.poll());
+
+            clear();
+
+            runBlocker(queueParked);
+            add(42);
+            add(42);
+            add(42);
+            release();
+            assertEquals(3, output.size());
+            assertNull(parker.parks.poll());
+        }
+        finally {
+            
DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(oldValue);
+        }
+
+    }
+
+    @Test
     public void testDisabledCoalescingStrateg() throws Exception
     {
         cs = newStrategy("DISABLED", 200);

Reply via email to