Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 fe37e0644 -> 037d24efd
  refs/heads/trunk 4b27287cd -> fc9c6faa2


Introduce backpressure for hints

patch by Ariel Weisberg; reviewed by Benedict Elliott Smith for
CASSANDRA-10972


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/037d24ef
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/037d24ef
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/037d24ef

Branch: refs/heads/cassandra-3.0
Commit: 037d24efdf83bd2736556f9880c5e1f6be48fa77
Parents: fe37e06
Author: Ariel Weisberg <ariel.weisb...@datastax.com>
Authored: Mon Dec 28 16:32:05 2015 -0500
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Tue Feb 23 15:28:41 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 build.xml                                       | 14 +++-
 .../apache/cassandra/hints/HintsBufferPool.java | 34 ++++++---
 .../cassandra/hints/HintsWriteExecutor.java     |  3 +-
 .../cassandra/hints/HintsBufferPoolTest.java    | 75 ++++++++++++++++++++
 .../apache/cassandra/hints/HintsBufferTest.java |  2 +-
 6 files changed, 114 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/037d24ef/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index da91594..a675016 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.4
+ * Introduce backpressure for hints (CASSANDRA-10972)
  * Fix ClusteringPrefix not being able to read tombstone range boundaries (CASSANDRA-11158)
  * Prevent logging in sandboxed state (CASSANDRA-11033)
  * Disallow drop/alter operations of UDTs used by UDAs (CASSANDRA-10721)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/037d24ef/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index d27b77a..6ef99fd 100644
--- a/build.xml
+++ b/build.xml
@@ -111,6 +111,8 @@
     <property name="jacoco.finalexecfile" value="${jacoco.export.dir}/jacoco.exec" />
     <property name="jacoco.version" value="0.7.5.201505241946"/>
 
+    <property name="byteman.version" value="3.0.3"/>
+
     <property name="ecj.version" value="4.4.2"/>
 
     <condition property="maven-ant-tasks.jar.exists">
@@ -382,6 +384,11 @@
           <dependency groupId="org.jacoco" artifactId="org.jacoco.agent" version="${jacoco.version}"/>
           <dependency groupId="org.jacoco" artifactId="org.jacoco.ant" version="${jacoco.version}"/>
 
+          <dependency groupId="org.jboss.byteman" artifactId="byteman" version="${byteman.version}"/>
+          <dependency groupId="org.jboss.byteman" artifactId="byteman-submit" version="${byteman.version}"/>
+          <dependency groupId="org.jboss.byteman" artifactId="byteman-bmunit" version="${byteman.version}"/>
+
+
           <dependency groupId="org.openjdk.jmh" artifactId="jmh-core" version="1.1.1"/>
           <dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess" version="1.1.1"/>
 
@@ -479,7 +486,10 @@
                 artifactId="cassandra-parent"
                 version="${version}"/>
         <dependency groupId="org.jacoco" artifactId="org.jacoco.agent"/>
-        <dependency groupId="org.jacoco" artifactId="org.jacoco.ant"/>
+        <dependency groupId="org.jacoco" artifactId="org.jacoco.ant" />
+        <dependency groupId="org.jboss.byteman" artifactId="byteman"/>
+        <dependency groupId="org.jboss.byteman" artifactId="byteman-submit"/>
+        <dependency groupId="org.jboss.byteman" artifactId="byteman-bmunit"/>
       </artifact:pom>
 
       <artifact:pom id="test-deps-pom"
@@ -1165,6 +1175,7 @@
         <jvmarg value="-Dcassandra.keepBriefBrief=${cassandra.keepBriefBrief}" />
        <optjvmargs/>
         <classpath>
+          <pathelement path="${java.class.path}"/>
           <path refid="cassandra.classpath" />
           <pathelement location="${test.classes}"/>
           <pathelement location="${test.conf}"/>
@@ -1701,6 +1712,7 @@
   <classpathentry kind="output" path="build/classes/main"/>
   <classpathentry kind="lib" path="build/classes/thrift" sourcepath="interface/thrift/gen-java/"/>
   <classpathentry kind="lib" path="test/conf"/>
+  <classpathentry kind="lib" path="${java.home}/../lib/tools.jar"/>
 ]]>
        </echo>   
        <path id="eclipse-project-libs-path">

http://git-wip-us.apache.org/repos/asf/cassandra/blob/037d24ef/src/java/org/apache/cassandra/hints/HintsBufferPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsBufferPool.java b/src/java/org/apache/cassandra/hints/HintsBufferPool.java
index 83b155a..25f9bc1 100644
--- a/src/java/org/apache/cassandra/hints/HintsBufferPool.java
+++ b/src/java/org/apache/cassandra/hints/HintsBufferPool.java
@@ -17,10 +17,11 @@
  */
 package org.apache.cassandra.hints;
 
-import java.util.Queue;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.net.MessagingService;
 
 /**
@@ -34,15 +35,16 @@ final class HintsBufferPool
         void flush(HintsBuffer buffer, HintsBufferPool pool);
     }
 
+    static final int MAX_ALLOCATED_BUFFERS = Integer.getInteger(Config.PROPERTY_PREFIX + "MAX_HINT_BUFFERS", 3);
     private volatile HintsBuffer currentBuffer;
-    private final Queue<HintsBuffer> reserveBuffers;
+    private final BlockingQueue<HintsBuffer> reserveBuffers;
     private final int bufferSize;
     private final FlushCallback flushCallback;
+    private int allocatedBuffers = 0;
 
     HintsBufferPool(int bufferSize, FlushCallback flushCallback)
     {
-        reserveBuffers = new ConcurrentLinkedQueue<>();
-
+        reserveBuffers = new LinkedBlockingQueue<>();
         this.bufferSize = bufferSize;
         this.flushCallback = flushCallback;
     }
@@ -78,13 +80,10 @@ final class HintsBufferPool
         }
     }
 
-    boolean offer(HintsBuffer buffer)
+    void offer(HintsBuffer buffer)
     {
-        if (!reserveBuffers.isEmpty())
-            return false;
-
-        reserveBuffers.offer(buffer);
-        return true;
+        if (!reserveBuffers.offer(buffer))
+            throw new RuntimeException("Failed to store buffer");
     }
 
     // A wrapper to ensure a non-null currentBuffer value on the first call.
@@ -108,6 +107,18 @@ final class HintsBufferPool
             return false;
 
         HintsBuffer buffer = reserveBuffers.poll();
+        if (buffer == null && allocatedBuffers >= MAX_ALLOCATED_BUFFERS)
+        {
+            try
+            {
+                //This BlockingQueue.take is a target for byteman in HintsBufferPoolTest
+                buffer = reserveBuffers.take();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
         currentBuffer = buffer == null ? createBuffer() : buffer;
 
         return true;
@@ -115,6 +126,7 @@ final class HintsBufferPool
 
     private HintsBuffer createBuffer()
     {
+        allocatedBuffers++;
         return HintsBuffer.create(bufferSize);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/037d24ef/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
index 1a69a4f..eb1bffe 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
@@ -133,8 +133,7 @@ final class HintsWriteExecutor
             finally
             {
                 HintsBuffer recycledBuffer = buffer.recycle();
-                if (!bufferPool.offer(recycledBuffer))
-                    recycledBuffer.free();
+                bufferPool.offer(recycledBuffer);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/037d24ef/test/unit/org/apache/cassandra/hints/HintsBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsBufferPoolTest.java b/test/unit/org/apache/cassandra/hints/HintsBufferPoolTest.java
new file mode 100644
index 0000000..3fb912e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintsBufferPoolTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import org.apache.cassandra.Util;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.google.common.collect.ImmutableList;
+
+import static junit.framework.Assert.*;
+
+import java.lang.Thread.State;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+@RunWith(BMUnitRunner.class)
+public class HintsBufferPoolTest
+{
+
+    @BeforeClass
+    public static void defineSchema()
+    {
+        HintsBufferTest.defineSchema();
+    }
+
+    /*
+     * Check that the hints buffer pool will only drain a limited number of buffers
+     */
+    static volatile boolean blockedOnBackpressure = false;
+    @Test
+    @BMRule(name = "Greatest name in the world",
+            targetClass="HintsBufferPool",
+            targetMethod="switchCurrentBuffer",
+            targetLocation="AT INVOKE java.util.concurrent.BlockingQueue.take",
+            action="org.apache.cassandra.hints.HintsBufferPoolTest.blockedOnBackpressure = true;")
+    public void testBackpressure() throws Exception
+    {
+        Queue<HintsBuffer> returnedBuffers = new ConcurrentLinkedQueue<>();
+        HintsBufferPool pool = new HintsBufferPool(256, (buffer, p) -> returnedBuffers.offer(buffer));
+
+        Thread blocked = new Thread(() -> {
+            for (int ii = 0; ii < 512; ii++)
+                pool.write(ImmutableList.of(UUID.randomUUID()), HintsBufferTest.createHint(ii, ii));
+        });
+        blocked.start();
+
+        Util.spinAssertEquals(State.WAITING, () -> blocked.getState(), 1);
+
+        while (blocked.isAlive())
+            if (!returnedBuffers.isEmpty())
+                pool.offer(returnedBuffers.poll().recycle());
+
+        assertTrue(blockedOnBackpressure);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/037d24ef/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
index ebc333a..78ea4f4 100644
--- a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
@@ -188,7 +188,7 @@ public class HintsBufferTest
         return idx;
     }
 
-    private static Hint createHint(int idx, long baseTimestamp)
+    static Hint createHint(int idx, long baseTimestamp)
     {
         long timestamp = baseTimestamp + idx;
         return Hint.create(createMutation(idx, TimeUnit.MILLISECONDS.toMicros(timestamp)), timestamp);

Reply via email to