Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 fe37e0644 -> 037d24efd
  refs/heads/trunk 4b27287cd -> fc9c6faa2
Introduce backpressure for hints patch by Ariel Weisberg; reviewed by Benedict Elliott Smith for CASSANDRA-10972 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/037d24ef Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/037d24ef Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/037d24ef Branch: refs/heads/cassandra-3.0 Commit: 037d24efdf83bd2736556f9880c5e1f6be48fa77 Parents: fe37e06 Author: Ariel Weisberg <ariel.weisb...@datastax.com> Authored: Mon Dec 28 16:32:05 2015 -0500 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Tue Feb 23 15:28:41 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + build.xml | 14 +++- .../apache/cassandra/hints/HintsBufferPool.java | 34 ++++++--- .../cassandra/hints/HintsWriteExecutor.java | 3 +- .../cassandra/hints/HintsBufferPoolTest.java | 75 ++++++++++++++++++++ .../apache/cassandra/hints/HintsBufferTest.java | 2 +- 6 files changed, 114 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/037d24ef/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index da91594..a675016 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.4 + * Introduce backpressure for hints (CASSANDRA-10972) * Fix ClusteringPrefix not being able to read tombstone range boundaries (CASSANDRA-11158) * Prevent logging in sandboxed state (CASSANDRA-11033) * Disallow drop/alter operations of UDTs used by UDAs (CASSANDRA-10721) http://git-wip-us.apache.org/repos/asf/cassandra/blob/037d24ef/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index d27b77a..6ef99fd 100644 --- a/build.xml +++ b/build.xml @@ -111,6 +111,8 @@ <property name="jacoco.finalexecfile" 
value="${jacoco.export.dir}/jacoco.exec" /> <property name="jacoco.version" value="0.7.5.201505241946"/> + <property name="byteman.version" value="3.0.3"/> + <property name="ecj.version" value="4.4.2"/> <condition property="maven-ant-tasks.jar.exists"> @@ -382,6 +384,11 @@ <dependency groupId="org.jacoco" artifactId="org.jacoco.agent" version="${jacoco.version}"/> <dependency groupId="org.jacoco" artifactId="org.jacoco.ant" version="${jacoco.version}"/> + <dependency groupId="org.jboss.byteman" artifactId="byteman" version="${byteman.version}"/> + <dependency groupId="org.jboss.byteman" artifactId="byteman-submit" version="${byteman.version}"/> + <dependency groupId="org.jboss.byteman" artifactId="byteman-bmunit" version="${byteman.version}"/> + + <dependency groupId="org.openjdk.jmh" artifactId="jmh-core" version="1.1.1"/> <dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess" version="1.1.1"/> @@ -479,7 +486,10 @@ artifactId="cassandra-parent" version="${version}"/> <dependency groupId="org.jacoco" artifactId="org.jacoco.agent"/> - <dependency groupId="org.jacoco" artifactId="org.jacoco.ant"/> + <dependency groupId="org.jacoco" artifactId="org.jacoco.ant" /> + <dependency groupId="org.jboss.byteman" artifactId="byteman"/> + <dependency groupId="org.jboss.byteman" artifactId="byteman-submit"/> + <dependency groupId="org.jboss.byteman" artifactId="byteman-bmunit"/> </artifact:pom> <artifact:pom id="test-deps-pom" @@ -1165,6 +1175,7 @@ <jvmarg value="-Dcassandra.keepBriefBrief=${cassandra.keepBriefBrief}" /> <optjvmargs/> <classpath> + <pathelement path="${java.class.path}"/> <path refid="cassandra.classpath" /> <pathelement location="${test.classes}"/> <pathelement location="${test.conf}"/> @@ -1701,6 +1712,7 @@ <classpathentry kind="output" path="build/classes/main"/> <classpathentry kind="lib" path="build/classes/thrift" sourcepath="interface/thrift/gen-java/"/> <classpathentry kind="lib" path="test/conf"/> + <classpathentry kind="lib" 
path="${java.home}/../lib/tools.jar"/> ]]> </echo> <path id="eclipse-project-libs-path"> http://git-wip-us.apache.org/repos/asf/cassandra/blob/037d24ef/src/java/org/apache/cassandra/hints/HintsBufferPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsBufferPool.java b/src/java/org/apache/cassandra/hints/HintsBufferPool.java index 83b155a..25f9bc1 100644 --- a/src/java/org/apache/cassandra/hints/HintsBufferPool.java +++ b/src/java/org/apache/cassandra/hints/HintsBufferPool.java @@ -17,10 +17,11 @@ */ package org.apache.cassandra.hints; -import java.util.Queue; import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.cassandra.config.Config; import org.apache.cassandra.net.MessagingService; /** @@ -34,15 +35,16 @@ final class HintsBufferPool void flush(HintsBuffer buffer, HintsBufferPool pool); } + static final int MAX_ALLOCATED_BUFFERS = Integer.getInteger(Config.PROPERTY_PREFIX + "MAX_HINT_BUFFERS", 3); private volatile HintsBuffer currentBuffer; - private final Queue<HintsBuffer> reserveBuffers; + private final BlockingQueue<HintsBuffer> reserveBuffers; private final int bufferSize; private final FlushCallback flushCallback; + private int allocatedBuffers = 0; HintsBufferPool(int bufferSize, FlushCallback flushCallback) { - reserveBuffers = new ConcurrentLinkedQueue<>(); - + reserveBuffers = new LinkedBlockingQueue<>(); this.bufferSize = bufferSize; this.flushCallback = flushCallback; } @@ -78,13 +80,10 @@ final class HintsBufferPool } } - boolean offer(HintsBuffer buffer) + void offer(HintsBuffer buffer) { - if (!reserveBuffers.isEmpty()) - return false; - - reserveBuffers.offer(buffer); - return true; + if (!reserveBuffers.offer(buffer)) + throw new RuntimeException("Failed to store buffer"); } // A wrapper to ensure a non-null currentBuffer value on 
the first call. @@ -108,6 +107,18 @@ final class HintsBufferPool return false; HintsBuffer buffer = reserveBuffers.poll(); + if (buffer == null && allocatedBuffers >= MAX_ALLOCATED_BUFFERS) + { + try + { + //This BlockingQueue.take is a target for byteman in HintsBufferPoolTest + buffer = reserveBuffers.take(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } currentBuffer = buffer == null ? createBuffer() : buffer; return true; @@ -115,6 +126,7 @@ final class HintsBufferPool private HintsBuffer createBuffer() { + allocatedBuffers++; return HintsBuffer.create(bufferSize); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/037d24ef/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java index 1a69a4f..eb1bffe 100644 --- a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java +++ b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java @@ -133,8 +133,7 @@ final class HintsWriteExecutor finally { HintsBuffer recycledBuffer = buffer.recycle(); - if (!bufferPool.offer(recycledBuffer)) - recycledBuffer.free(); + bufferPool.offer(recycledBuffer); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/037d24ef/test/unit/org/apache/cassandra/hints/HintsBufferPoolTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsBufferPoolTest.java b/test/unit/org/apache/cassandra/hints/HintsBufferPoolTest.java new file mode 100644 index 0000000..3fb912e --- /dev/null +++ b/test/unit/org/apache/cassandra/hints/HintsBufferPoolTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hints; + +import org.apache.cassandra.Util; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import com.google.common.collect.ImmutableList; + +import static junit.framework.Assert.*; + +import java.lang.Thread.State; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; + +@RunWith(BMUnitRunner.class) +public class HintsBufferPoolTest +{ + + @BeforeClass + public static void defineSchema() + { + HintsBufferTest.defineSchema(); + } + + /* + * Check that the hints buffer pool will only drain a limited number of buffers + */ + static volatile boolean blockedOnBackpressure = false; + @Test + @BMRule(name = "Greatest name in the world", + targetClass="HintsBufferPool", + targetMethod="switchCurrentBuffer", + targetLocation="AT INVOKE java.util.concurrent.BlockingQueue.take", + action="org.apache.cassandra.hints.HintsBufferPoolTest.blockedOnBackpressure = true;") + public void testBackpressure() throws Exception + { + Queue<HintsBuffer> returnedBuffers = new ConcurrentLinkedQueue<>(); + HintsBufferPool pool = new HintsBufferPool(256, (buffer, p) -> 
returnedBuffers.offer(buffer)); + + Thread blocked = new Thread(() -> { + for (int ii = 0; ii < 512; ii++) + pool.write(ImmutableList.of(UUID.randomUUID()), HintsBufferTest.createHint(ii, ii)); + }); + blocked.start(); + + Util.spinAssertEquals(State.WAITING, () -> blocked.getState(), 1); + + while (blocked.isAlive()) + if (!returnedBuffers.isEmpty()) + pool.offer(returnedBuffers.poll().recycle()); + + assertTrue(blockedOnBackpressure); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/037d24ef/test/unit/org/apache/cassandra/hints/HintsBufferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java index ebc333a..78ea4f4 100644 --- a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java +++ b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java @@ -188,7 +188,7 @@ public class HintsBufferTest return idx; } - private static Hint createHint(int idx, long baseTimestamp) + static Hint createHint(int idx, long baseTimestamp) { long timestamp = baseTimestamp + idx; return Hint.create(createMutation(idx, TimeUnit.MILLISECONDS.toMicros(timestamp)), timestamp);