Repository: cassandra Updated Branches: refs/heads/trunk 4ecd8542d -> e5394f192
Add backpressure to compressed commit log patch by Ariel Weisberg; reviewed by Benjamin Lerer for CASSANDRA-10971 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9995521f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9995521f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9995521f Branch: refs/heads/trunk Commit: 9995521fb9b3f510ee9c7012d75e6970ec7d5fb7 Parents: 0a5e220 Author: Ariel Weisberg <ariel.weisb...@datastax.com> Authored: Wed Mar 16 18:14:52 2016 +0100 Committer: Benjamin Lerer <b.le...@gmail.com> Committed: Wed Mar 16 18:14:52 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/commitlog/AbstractCommitLogService.java | 1 + .../db/commitlog/CommitLogSegment.java | 19 +++- .../db/commitlog/CommitLogSegmentManager.java | 11 +- .../db/commitlog/CompressedSegment.java | 39 ++++++-- .../commitlog/CommitLogSegmentManagerTest.java | 100 +++++++++++++++++++ 6 files changed, 159 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 28de247..b264609 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.5 + * Add backpressure to compressed commit log (CASSANDRA-10971) * SSTableExport supports secondary index tables (CASSANDRA-11330) * Fix sstabledump to include missing info in debug output (CASSANDRA-11321) * Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index 557bf50..113d1ba 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@ -89,6 +89,7 @@ public abstract class AbstractCommitLogService // sync and signal long syncStarted = System.currentTimeMillis(); + //This is a target for Byteman in CommitLogSegmentManagerTest commitLog.sync(shutdown); lastSyncedAt = syncStarted; syncComplete.signalAll(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 5dd7c9f..0e9f502 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -117,9 +117,20 @@ public abstract class CommitLogSegment final CommitLog commitLog; public final CommitLogDescriptor descriptor; - static CommitLogSegment createSegment(CommitLog commitLog) + static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose) { - return commitLog.compressor != null ? new CompressedSegment(commitLog) : new MemoryMappedSegment(commitLog); + return commitLog.compressor != null ? new CompressedSegment(commitLog, onClose) : new MemoryMappedSegment(commitLog); + } + + /** + * Checks if the segments use a buffer pool. + * + * @param commitLog the commit log + * @return <code>true</code> if the segments use a buffer pool, <code>false</code> otherwise. 
+ */ + static boolean usesBufferPool(CommitLog commitLog) + { + return commitLog.compressor != null; } static long getNextId() @@ -148,7 +159,7 @@ public abstract class CommitLogSegment { throw new FSWriteError(e, logFile); } - + buffer = createBuffer(commitLog); // write the header CommitLogDescriptor.writeHeader(buffer, descriptor); @@ -255,7 +266,7 @@ public abstract class CommitLogSegment // Note: Even if the very first allocation of this sync section failed, we still want to enter this // to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer, // this will always be entered when a mutation allocation has been attempted after the marker allocation - // succeeded in the previous sync. + // succeeded in the previous sync. assert buffer != null; // Only close once. int startMarker = lastSyncedOffset; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java index 564652f..8a8d0e7 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java @@ -114,11 +114,11 @@ public class CommitLogSegmentManager if (task == null) { // if we have no more work to do, check if we should create a new segment - if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments)) + if (!atSegmentLimit() && availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments)) { logger.trace("No segments in reserve; creating a fresh one"); // TODO : some error handling in case we fail to create a new segment - availableSegments.add(CommitLogSegment.createSegment(commitLog)); + 
availableSegments.add(CommitLogSegment.createSegment(commitLog, () -> wakeManager())); hasAvailableSegments.signalAll(); } @@ -163,6 +163,12 @@ public class CommitLogSegmentManager } } } + + private boolean atSegmentLimit() + { + return CommitLogSegment.usesBufferPool(commitLog) && CompressedSegment.hasReachedPoolLimit(); + } + }; run = true; @@ -553,5 +559,6 @@ public class CommitLogSegmentManager { return Collections.unmodifiableCollection(activeSegments); } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java index aa12e1d..0ec0bca 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSWriteError; @@ -44,6 +45,12 @@ public class CompressedSegment extends CommitLogSegment static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>(); /** + * The number of buffers in use + */ + private static AtomicInteger usedBuffers = new AtomicInteger(0); + + + /** * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use * more, depending on how soon the sync policy stops all writing threads. 
@@ -52,16 +59,18 @@ public class CompressedSegment extends CommitLogSegment static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4; final ICompressor compressor; + final Runnable onClose; volatile long lastWrittenPos = 0; /** * Constructs a new segment file. */ - CompressedSegment(CommitLog commitLog) + CompressedSegment(CommitLog commitLog, Runnable onClose) { super(commitLog); this.compressor = commitLog.compressor; + this.onClose = onClose; try { channel.write((ByteBuffer) buffer.duplicate().flip()); @@ -80,6 +89,7 @@ public class CompressedSegment extends CommitLogSegment ByteBuffer createBuffer(CommitLog commitLog) { + usedBuffers.incrementAndGet(); ByteBuffer buf = bufferPool.poll(); if (buf == null) { @@ -138,12 +148,29 @@ public class CompressedSegment extends CommitLogSegment @Override protected void internalClose() { - if (bufferPool.size() < MAX_BUFFERPOOL_SIZE) - bufferPool.add(buffer); - else - FileUtils.clean(buffer); + usedBuffers.decrementAndGet(); + try { + if (bufferPool.size() < MAX_BUFFERPOOL_SIZE) + bufferPool.add(buffer); + else + FileUtils.clean(buffer); + super.internalClose(); + } + finally + { + onClose.run(); + } + } - super.internalClose(); + /** + * Checks if the number of buffers in use is greater than or equal to the maximum number of buffers allowed in the pool. + * + * @return <code>true</code> if the number of buffers in use is greater than or equal to the maximum number of buffers + * allowed in the pool, <code>false</code> otherwise.
+ */ + static boolean hasReachedPoolLimit() + { + return usedBuffers.get() >= MAX_BUFFERPOOL_SIZE; } static void shutdown() http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java new file mode 100644 index 0000000..b5c2f41 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java @@ -0,0 +1,100 @@ +package org.apache.cassandra.db.commitlog; + +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.Semaphore; + +import javax.naming.ConfigurationException; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.Config.CommitLogSync; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.marshal.AsciiType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.schema.KeyspaceParams; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import com.google.common.collect.ImmutableMap; + +@RunWith(BMUnitRunner.class) +public class CommitLogSegmentManagerTest +{ + //Block commit log service from syncing + private static final Semaphore allowSync = new Semaphore(0); + + private static final String KEYSPACE1 = 
"CommitLogTest"; + private static final String STANDARD1 = "Standard1"; + private static final String STANDARD2 = "Standard2"; + + private final static byte[] entropy = new byte[1024 * 256]; + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + new Random().nextBytes(entropy); + DatabaseDescriptor.setCommitLogCompression(new ParameterizedClass("LZ4Compressor", ImmutableMap.of())); + DatabaseDescriptor.setCommitLogSegmentSize(1); + DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic); + DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000); + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance), + SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance)); + + CompactionManager.instance.disableAutoCompaction(); + } + + @Test + @BMRule(name = "Block AbstractCommitLogSegment segment flushing", + targetClass = "AbstractCommitLogService$1", + targetMethod = "run", + targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync", + action = "org.apache.cassandra.db.commitlog.CommitLogSegmentManagerTest.allowSync.acquire()") + public void testCompressedCommitLogBackpressure() throws Throwable + { + CommitLog.instance.resetUnsafe(true); + ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + + final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k") + .clustering("bytes") + .add("val", ByteBuffer.wrap(entropy)) + .build(); + + Thread dummyThread = new Thread( () -> + { + for (int i = 0; i < 20; i++) + CommitLog.instance.add(m); + }); + dummyThread.start(); + + CommitLogSegmentManager clsm = CommitLog.instance.allocator; + + //Protect against delay, but still break out as fast as possible + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < 5000) + { + if 
(clsm.getActiveSegments().size() >= 3) + break; + } + Thread.sleep(1000); + + //Should only be able to create 3 segments not 7 because it blocks waiting for truncation that never comes + Assert.assertEquals(3, clsm.getActiveSegments().size()); + + clsm.getActiveSegments().forEach( segment -> clsm.recycleSegment(segment)); + + Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5); + } +} \ No newline at end of file