This is an automated email from the ASF dual-hosted git repository. jasonstack pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 699a1f7 CASSANDRA-15229: Segregate Network and Chunk Cache BufferPools and Recirculate Partially Freed Chunks 699a1f7 is described below commit 699a1f74fcc1da1952da6b2b0309c9e2474c67f4 Author: Zhao Yang <zhaoyangsingap...@gmail.com> AuthorDate: Thu Oct 15 22:53:44 2020 +0800 CASSANDRA-15229: Segregate Network and Chunk Cache BufferPools and Recirculate Partially Freed Chunks * initiate multiple buffer pools for different lifespans and usages - Chunk Cache Buffer Pool - conf.file_cache_size_in_mb=512mb - Networking Buffer Pool - conf.networking_cache_size_in_mb=128mb * Add overflowSize and usedSize to buffer pool metrics * re-circulate buffer pool Chunk for ChunkCache whenever it has free space, even though it may not be able to allocate due to fragmentation patch by Zhao Yang; reviewed by Caleb Rackliffe and Aleksey Yeschenko for CASSANDRA-15229 --- CHANGES.txt | 1 + conf/cassandra.yaml | 13 +- .../org/apache/cassandra/cache/ChunkCache.java | 14 +- src/java/org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 14 + .../db/streaming/CassandraStreamWriter.java | 6 +- .../cassandra/hints/ChecksummedDataInput.java | 6 +- .../hints/CompressedChecksummedDataInput.java | 13 +- .../io/util/BufferManagingRebufferer.java | 6 +- .../cassandra/metrics/BufferPoolMetrics.java | 45 +- .../cassandra/net/AsyncStreamingOutputPlus.java | 13 +- .../apache/cassandra/net/BufferPoolAllocator.java | 13 +- .../cassandra/net/FrameDecoderLegacyLZ4.java | 11 +- .../org/apache/cassandra/net/FrameEncoder.java | 9 +- .../org/apache/cassandra/net/FrameEncoderCrc.java | 2 +- .../org/apache/cassandra/net/FrameEncoderLZ4.java | 9 +- .../cassandra/net/FrameEncoderLegacyLZ4.java | 8 +- .../cassandra/net/FrameEncoderUnprotected.java | 2 +- .../apache/cassandra/net/HandshakeProtocol.java | 6 +- .../cassandra/net/InboundConnectionInitiator.java | 6 +- 
.../cassandra/net/LocalBufferPoolAllocator.java | 3 +- .../cassandra/net/OutboundConnectionInitiator.java | 4 +- .../org/apache/cassandra/net/ShareableBytes.java | 6 +- .../apache/cassandra/utils/memory/BufferPool.java | 466 ++++++++++++++++----- .../apache/cassandra/utils/memory/BufferPools.java | 79 ++++ .../apache/cassandra/net/ConnectionBurnTest.java | 4 +- .../cassandra/utils/memory/LongBufferPoolTest.java | 111 ++--- test/data/jmxdump/cassandra-4.0-jmx.yaml | 75 +++- .../cassandra/distributed/impl/Instance.java | 4 +- .../cassandra/metrics/BufferPoolMetricsTest.java | 125 ++++-- .../unit/org/apache/cassandra/net/FramingTest.java | 6 +- .../cassandra/utils/memory/BufferPoolTest.java | 361 +++++++++++----- 32 files changed, 1067 insertions(+), 376 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index fe3fef8..543a1cf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-beta3 + * Segregate Network and Chunk Cache BufferPools and Recirculate Partially Freed Chunks (CASSANDRA-15229) * Fail truncation requests when they fail on a replica (CASSANDRA-16208) * Move compact storage validation earlier in startup process (CASSANDRA-16063) * Fix ByteBufferAccessor cast exceptions are thrown when trying to query a virtual table (CASSANDRA-16155) diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index ff414ed..37b18f9 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -469,13 +469,22 @@ concurrent_counter_writes: 32 # be limited by the less of concurrent reads or concurrent writes. concurrent_materialized_view_writes: 32 +# Maximum memory to use for inter-node and client-server networking buffers. +# +# Defaults to the smaller of 1/16 of heap or 128MB. This pool is allocated off-heap, +# so is in addition to the memory allocated for heap. The cache also has on-heap +# overhead which is roughly 128 bytes per chunk (i.e. 0.2% of the reserved size +# if the default 64k chunk size is used). +# Memory is only allocated when needed. 
+# networking_cache_size_in_mb: 128 + # Enable the sstable chunk cache. The chunk cache will store recently accessed # sections of the sstable in-memory as uncompressed buffers. # file_cache_enabled: false # Maximum memory to use for sstable chunk cache and buffer pooling. -# 32MB of this are reserved for pooling buffers, the rest is used as an -# cache that holds uncompressed sstable chunks. +# 32MB of this are reserved for pooling buffers, the rest is used for chunk cache +# that holds uncompressed sstable chunks. # Defaults to the smaller of 1/4 of heap or 512MB. This pool is allocated off-heap, # so is in addition to the memory allocated for heap. The cache also has on-heap # overhead which is roughly 128 bytes per chunk (i.e. 0.2% of the reserved size diff --git a/src/java/org/apache/cassandra/cache/ChunkCache.java b/src/java/org/apache/cassandra/cache/ChunkCache.java index ae38015..c53810a 100644 --- a/src/java/org/apache/cassandra/cache/ChunkCache.java +++ b/src/java/org/apache/cassandra/cache/ChunkCache.java @@ -34,6 +34,7 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.util.*; import org.apache.cassandra.metrics.ChunkCacheMetrics; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPools; public class ChunkCache implements CacheLoader<ChunkCache.Key, ChunkCache.Buffer>, RemovalListener<ChunkCache.Key, ChunkCache.Buffer>, CacheSize @@ -43,7 +44,9 @@ public class ChunkCache public static final boolean roundUp = DatabaseDescriptor.getFileCacheRoundUp(); private static boolean enabled = DatabaseDescriptor.getFileCacheEnabled() && cacheSize > 0; - public static final ChunkCache instance = enabled ? new ChunkCache() : null; + public static final ChunkCache instance = enabled ? 
new ChunkCache(BufferPools.forChunkCache()) : null; + + private final BufferPool bufferPool; private final LoadingCache<Key, Buffer> cache; public final ChunkCacheMetrics metrics; @@ -86,7 +89,7 @@ public class ChunkCache } } - static class Buffer implements Rebufferer.BufferHolder + class Buffer implements Rebufferer.BufferHolder { private final ByteBuffer buffer; private final long offset; @@ -130,12 +133,13 @@ public class ChunkCache public void release() { if (references.decrementAndGet() == 0) - BufferPool.put(buffer); + bufferPool.put(buffer); } } - private ChunkCache() + private ChunkCache(BufferPool pool) { + bufferPool = pool; metrics = new ChunkCacheMetrics(this); cache = Caffeine.newBuilder() .maximumWeight(cacheSize) @@ -149,7 +153,7 @@ public class ChunkCache @Override public Buffer load(Key key) { - ByteBuffer buffer = BufferPool.get(key.file.chunkSize(), key.file.preferredBufferType()); + ByteBuffer buffer = bufferPool.get(key.file.chunkSize(), key.file.preferredBufferType()); assert buffer != null; key.file.readChunk(key.position, buffer); return new Buffer(buffer, key.position); diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index da410155..26854da 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -302,6 +302,8 @@ public class Config private static boolean isClientMode = false; private static Supplier<Config> overrideLoadConfig = null; + public Integer networking_cache_size_in_mb; + public Integer file_cache_size_in_mb; public boolean file_cache_enabled = Boolean.getBoolean("cassandra.file_cache_enabled"); diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index e8e66fa..0387105 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -453,6 
+453,9 @@ public class DatabaseDescriptor if (conf.concurrent_replicates != null) logger.warn("concurrent_replicates has been deprecated and should be removed from cassandra.yaml"); + if (conf.networking_cache_size_in_mb == null) + conf.networking_cache_size_in_mb = Math.min(128, (int) (Runtime.getRuntime().maxMemory() / (16 * 1048576))); + if (conf.file_cache_size_in_mb == null) conf.file_cache_size_in_mb = Math.min(512, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576))); @@ -2449,6 +2452,17 @@ public class DatabaseDescriptor return conf.file_cache_size_in_mb; } + public static int getNetworkingCacheSizeInMB() + { + if (conf.networking_cache_size_in_mb == null) + { + // In client mode the value is not set. + assert DatabaseDescriptor.isClientInitialized(); + return 0; + } + return conf.networking_cache_size_in_mb; + } + public static boolean getFileCacheRoundUp() { if (conf.file_cache_round_up == null) diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java index 10296fb..6481f4b 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java @@ -41,7 +41,7 @@ import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.async.StreamCompressionSerializer; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPools; import static org.apache.cassandra.net.MessagingService.current_version; @@ -152,7 +152,7 @@ public class CassandraStreamWriter // this buffer will hold the data from disk. as it will be compressed on the fly by // AsyncChannelCompressedStreamWriter.write(ByteBuffer), we can release this buffer as soon as we can. 
- ByteBuffer buffer = BufferPool.get(minReadable, BufferType.OFF_HEAP); + ByteBuffer buffer = BufferPools.forNetworking().get(minReadable, BufferType.OFF_HEAP); try { int readCount = proxy.read(buffer, start); @@ -171,7 +171,7 @@ public class CassandraStreamWriter } finally { - BufferPool.put(buffer); + BufferPools.forNetworking().put(buffer); } return toTransfer; diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java index 30d18fa..e6e8b38 100644 --- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java @@ -26,9 +26,9 @@ import com.google.common.base.Preconditions; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.util.*; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.NativeLibrary; -import org.apache.cassandra.utils.memory.BufferPool; /** * A {@link RandomAccessReader} wrapper that calculates the CRC in place. 
@@ -55,7 +55,7 @@ public class ChecksummedDataInput extends RebufferingInputStream ChecksummedDataInput(ChannelProxy channel, BufferType bufferType) { - super(BufferPool.get(RandomAccessReader.DEFAULT_BUFFER_SIZE, bufferType)); + super(bufferType.allocate(RandomAccessReader.DEFAULT_BUFFER_SIZE)); crc = new CRC32(); crcPosition = 0; @@ -244,7 +244,7 @@ public class ChecksummedDataInput extends RebufferingInputStream @Override public void close() { - BufferPool.put(buffer); + FileUtils.clean(buffer); channel.close(); } diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java index 0381b00..2f442be 100644 --- a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java @@ -28,9 +28,12 @@ import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.ChannelProxy; import org.apache.cassandra.utils.memory.BufferPool; import org.apache.cassandra.utils.Throwables; +import org.apache.cassandra.utils.memory.BufferPools; public final class CompressedChecksummedDataInput extends ChecksummedDataInput { + private static final BufferPool bufferPool = BufferPools.forChunkCache(); + private final ICompressor compressor; private volatile long filePosition = 0; // Current position in file, advanced when reading chunk. private volatile long sourcePosition = 0; // Current position in file to report, advanced after consuming chunk. 
@@ -117,9 +120,9 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput int bufferSize = compressedSize + (compressedSize / 20); // allocate +5% to cover variability in compressed size if (compressedBuffer != null) { - BufferPool.put(compressedBuffer); + bufferPool.put(compressedBuffer); } - compressedBuffer = BufferPool.get(bufferSize, compressor.preferredBufferType()); + compressedBuffer = bufferPool.get(bufferSize, compressor.preferredBufferType()); } compressedBuffer.clear(); @@ -131,8 +134,8 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput if (buffer.capacity() < uncompressedSize) { int bufferSize = uncompressedSize + (uncompressedSize / 20); - BufferPool.put(buffer); - buffer = BufferPool.get(bufferSize, compressor.preferredBufferType()); + bufferPool.put(buffer); + buffer = bufferPool.get(bufferSize, compressor.preferredBufferType()); } buffer.clear(); @@ -151,7 +154,7 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput @Override public void close() { - BufferPool.put(compressedBuffer); + bufferPool.put(compressedBuffer); super.close(); } diff --git a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java index f3b9a88..3a297ee 100644 --- a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java +++ b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java @@ -23,7 +23,7 @@ package org.apache.cassandra.io.util; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPools; /** * Buffer manager used for reading from a ChunkReader when cache is not in use. 
Instances of this class are @@ -42,14 +42,14 @@ public abstract class BufferManagingRebufferer implements Rebufferer, Rebufferer protected BufferManagingRebufferer(ChunkReader wrapped) { this.source = wrapped; - buffer = BufferPool.get(wrapped.chunkSize(), wrapped.preferredBufferType()).order(ByteOrder.BIG_ENDIAN); + buffer = BufferPools.forChunkCache().get(wrapped.chunkSize(), wrapped.preferredBufferType()).order(ByteOrder.BIG_ENDIAN); buffer.limit(0); } @Override public void closeReader() { - BufferPool.put(buffer); + BufferPools.forChunkCache().put(buffer); offset = -1; } diff --git a/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java b/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java index c9c859a..78e7265 100644 --- a/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java +++ b/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -25,24 +25,47 @@ import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; public class BufferPoolMetrics { - private static final MetricNameFactory factory = new DefaultNameFactory("BufferPool"); + /** Total number of hits */ + public final Meter hits; /** Total number of misses */ public final Meter misses; - /** Total size of buffer pools, in bytes */ + /** Total size of buffer pools, in bytes, including overflow allocation */ public final Gauge<Long> size; - public BufferPoolMetrics() + /** Total size, in bytes, of active buffered being used from the pool currently + overflow */ + public final Gauge<Long> usedSize; + + /** + * Total size, in bytes, of direct or heap buffers allocated by the pool but not part of the pool + * either because they are too large to fit or because the pool has exceeded its maximum limit or because it's + * on-heap allocation. 
+ */ + public final Gauge<Long> overflowSize; + + public BufferPoolMetrics(String scope, BufferPool bufferPool) { + MetricNameFactory factory = new DefaultNameFactory("BufferPool", scope); + + hits = Metrics.meter(factory.createMetricName("Hits")); + misses = Metrics.meter(factory.createMetricName("Misses")); - size = Metrics.register(factory.createMetricName("Size"), new Gauge<Long>() - { - public Long getValue() - { - return BufferPool.sizeInBytes(); - } - }); + overflowSize = Metrics.register(factory.createMetricName("OverflowSize"), bufferPool::overflowMemoryInBytes); + + usedSize = Metrics.register(factory.createMetricName("UsedSize"), bufferPool::usedSizeInBytes); + + size = Metrics.register(factory.createMetricName("Size"), bufferPool::sizeInBytes); + } + + /** + * used to register alias for 3.0/3.11 compatibility + */ + public void register3xAlias() + { + MetricNameFactory legacyFactory = new DefaultNameFactory("BufferPool"); + Metrics.registerMBean(misses, legacyFactory.createMetricName("Misses").getMBeanName()); + Metrics.registerMBean(size, legacyFactory.createMetricName("Size").getMBeanName()); } } diff --git a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java index 680a9d3..3a9c075 100644 --- a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java +++ b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java @@ -37,6 +37,7 @@ import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.net.SharedDefaultFileRegion.SharedFileChannel; import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPools; import static java.lang.Math.min; @@ -53,6 +54,8 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus { private static final Logger logger = LoggerFactory.getLogger(AsyncStreamingOutputPlus.class); + private 
final BufferPool bufferPool = BufferPools.forNetworking(); + final int defaultLowWaterMark; final int defaultHighWaterMark; @@ -68,7 +71,7 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus private void allocateBuffer() { // this buffer is only used for small quantities of data - buffer = BufferPool.getAtLeast(8 << 10, BufferType.OFF_HEAP); + buffer = bufferPool.getAtLeast(8 << 10, BufferType.OFF_HEAP); } @Override @@ -140,7 +143,7 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus throw new IllegalStateException("Can only allocate one ByteBuffer"); limiter.acquire(size); holder.promise = beginFlush(size, defaultLowWaterMark, defaultHighWaterMark); - holder.buffer = BufferPool.get(size, BufferType.OFF_HEAP); + holder.buffer = bufferPool.get(size, BufferType.OFF_HEAP); return holder.buffer; }); } @@ -148,14 +151,14 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus { // we don't currently support cancelling the flush, but at this point we are recoverable if we want if (holder.buffer != null) - BufferPool.put(holder.buffer); + bufferPool.put(holder.buffer); if (holder.promise != null) holder.promise.tryFailure(t); throw t; } ByteBuffer buffer = holder.buffer; - BufferPool.putUnusedPortion(buffer); + bufferPool.putUnusedPortion(buffer); int length = buffer.limit(); channel.writeAndFlush(GlobalBufferPoolAllocator.wrap(buffer), holder.promise); @@ -260,7 +263,7 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus { if (buffer != null) { - BufferPool.put(buffer); + bufferPool.put(buffer); buffer = null; } } diff --git a/src/java/org/apache/cassandra/net/BufferPoolAllocator.java b/src/java/org/apache/cassandra/net/BufferPoolAllocator.java index 224f690..11c0641 100644 --- a/src/java/org/apache/cassandra/net/BufferPoolAllocator.java +++ b/src/java/org/apache/cassandra/net/BufferPoolAllocator.java @@ -25,6 +25,7 @@ import io.netty.buffer.Unpooled; import 
io.netty.buffer.UnpooledUnsafeDirectByteBuf; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPools; /** * A trivial wrapper around BufferPool for integrating with Netty, but retaining ownership of pooling behaviour @@ -32,6 +33,8 @@ import org.apache.cassandra.utils.memory.BufferPool; */ public abstract class BufferPoolAllocator extends AbstractByteBufAllocator { + private static final BufferPool bufferPool = BufferPools.forNetworking(); + BufferPoolAllocator() { super(true); @@ -60,22 +63,22 @@ public abstract class BufferPoolAllocator extends AbstractByteBufAllocator ByteBuffer get(int size) { - return BufferPool.get(size, BufferType.OFF_HEAP); + return bufferPool.get(size, BufferType.OFF_HEAP); } ByteBuffer getAtLeast(int size) { - return BufferPool.getAtLeast(size, BufferType.OFF_HEAP); + return bufferPool.getAtLeast(size, BufferType.OFF_HEAP); } void put(ByteBuffer buffer) { - BufferPool.put(buffer); + bufferPool.put(buffer); } void putUnusedPortion(ByteBuffer buffer) { - BufferPool.putUnusedPortion(buffer); + bufferPool.putUnusedPortion(buffer); } void release() @@ -100,7 +103,7 @@ public abstract class BufferPoolAllocator extends AbstractByteBufAllocator public void deallocate() { if (wrapped != null) - BufferPool.put(wrapped); + bufferPool.put(wrapped); } public ByteBuffer adopt() diff --git a/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java b/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java index bf6bc17..4c620c7 100644 --- a/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java +++ b/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java @@ -32,6 +32,7 @@ import net.jpountz.lz4.LZ4SafeDecompressor; import net.jpountz.xxhash.XXHash32; import net.jpountz.xxhash.XXHashFactory; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPools; import static 
java.lang.Integer.reverseBytes; import static java.lang.String.format; @@ -46,6 +47,8 @@ import static org.apache.cassandra.utils.ByteBufferUtil.copyBytes; */ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy { + private static final BufferPool bufferPool = BufferPools.forNetworking(); + FrameDecoderLegacyLZ4(BufferPoolAllocator allocator, int messagingVersion) { super(allocator, messagingVersion); @@ -122,7 +125,7 @@ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy assert msg instanceof BufferPoolAllocator.Wrapped; ByteBuffer buf = ((BufferPoolAllocator.Wrapped) msg).adopt(); // netty will probably have mis-predicted the space needed - BufferPool.putUnusedPortion(buf); + bufferPool.putUnusedPortion(buf); CorruptLZ4Frame error = null; try @@ -252,7 +255,7 @@ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy } catch (Throwable t) { - BufferPool.put(out); + bufferPool.put(out); throw t; } } @@ -269,7 +272,7 @@ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy { if (null != stash) { - BufferPool.put(stash); + bufferPool.put(stash); stash = null; } @@ -348,7 +351,7 @@ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy ByteBuffer out = allocator.getAtLeast(capacity); in.flip(); out.put(in); - BufferPool.put(in); + bufferPool.put(in); return out; } diff --git a/src/java/org/apache/cassandra/net/FrameEncoder.java b/src/java/org/apache/cassandra/net/FrameEncoder.java index d9df166..5f2dc37 100644 --- a/src/java/org/apache/cassandra/net/FrameEncoder.java +++ b/src/java/org/apache/cassandra/net/FrameEncoder.java @@ -25,9 +25,12 @@ import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPools; abstract class FrameEncoder extends ChannelOutboundHandlerAdapter { + protected static final BufferPool bufferPool = BufferPools.forNetworking(); + /** * An abstraction 
useful for transparently allocating buffers that can be written to upstream * of the {@code FrameEncoder} without knowledge of the encoder's frame layout, while ensuring @@ -57,7 +60,7 @@ abstract class FrameEncoder extends ChannelOutboundHandlerAdapter this.headerLength = headerLength; this.trailerLength = trailerLength; - buffer = BufferPool.getAtLeast(payloadCapacity + headerLength + trailerLength, BufferType.OFF_HEAP); + buffer = bufferPool.getAtLeast(payloadCapacity + headerLength + trailerLength, BufferType.OFF_HEAP); assert buffer.capacity() >= payloadCapacity + headerLength + trailerLength; buffer.position(headerLength); buffer.limit(buffer.capacity() - trailerLength); @@ -103,12 +106,12 @@ abstract class FrameEncoder extends ChannelOutboundHandlerAdapter isFinished = true; buffer.limit(buffer.position() + trailerLength); buffer.position(0); - BufferPool.putUnusedPortion(buffer); + bufferPool.putUnusedPortion(buffer); } void release() { - BufferPool.put(buffer); + bufferPool.put(buffer); } } diff --git a/src/java/org/apache/cassandra/net/FrameEncoderCrc.java b/src/java/org/apache/cassandra/net/FrameEncoderCrc.java index 2d07d6d..5049f29 100644 --- a/src/java/org/apache/cassandra/net/FrameEncoderCrc.java +++ b/src/java/org/apache/cassandra/net/FrameEncoderCrc.java @@ -91,7 +91,7 @@ class FrameEncoderCrc extends FrameEncoder } catch (Throwable t) { - BufferPool.put(frame); + bufferPool.put(frame); throw t; } } diff --git a/src/java/org/apache/cassandra/net/FrameEncoderLZ4.java b/src/java/org/apache/cassandra/net/FrameEncoderLZ4.java index 12351ce..2d76170 100644 --- a/src/java/org/apache/cassandra/net/FrameEncoderLZ4.java +++ b/src/java/org/apache/cassandra/net/FrameEncoderLZ4.java @@ -27,7 +27,6 @@ import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Factory; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.memory.BufferPool; import static 
org.apache.cassandra.net.Crc.*; @@ -74,7 +73,7 @@ class FrameEncoderLZ4 extends FrameEncoder throw new IllegalArgumentException("Maximum uncompressed payload size is 128KiB"); int maxOutputLength = compressor.maxCompressedLength(uncompressedLength); - frame = BufferPool.getAtLeast(HEADER_AND_TRAILER_LENGTH + maxOutputLength, BufferType.OFF_HEAP); + frame = bufferPool.getAtLeast(HEADER_AND_TRAILER_LENGTH + maxOutputLength, BufferType.OFF_HEAP); int compressedLength = compressor.compress(in, in.position(), uncompressedLength, frame, HEADER_LENGTH, maxOutputLength); @@ -101,18 +100,18 @@ class FrameEncoderLZ4 extends FrameEncoder frame.putInt(frameCrc); frame.position(0); - BufferPool.putUnusedPortion(frame); + bufferPool.putUnusedPortion(frame); return GlobalBufferPoolAllocator.wrap(frame); } catch (Throwable t) { if (frame != null) - BufferPool.put(frame); + bufferPool.put(frame); throw t; } finally { - BufferPool.put(in); + bufferPool.put(in); } } } diff --git a/src/java/org/apache/cassandra/net/FrameEncoderLegacyLZ4.java b/src/java/org/apache/cassandra/net/FrameEncoderLegacyLZ4.java index 3b29ecb..000fab7 100644 --- a/src/java/org/apache/cassandra/net/FrameEncoderLegacyLZ4.java +++ b/src/java/org/apache/cassandra/net/FrameEncoderLegacyLZ4.java @@ -68,7 +68,7 @@ class FrameEncoderLegacyLZ4 extends FrameEncoder ByteBuffer frame = null; try { - frame = BufferPool.getAtLeast(calculateMaxFrameLength(payload), BufferType.OFF_HEAP); + frame = bufferPool.getAtLeast(calculateMaxFrameLength(payload), BufferType.OFF_HEAP); int frameOffset = 0; int payloadOffset = 0; @@ -82,19 +82,19 @@ class FrameEncoderLegacyLZ4 extends FrameEncoder } frame.limit(frameOffset); - BufferPool.putUnusedPortion(frame); + bufferPool.putUnusedPortion(frame); return GlobalBufferPoolAllocator.wrap(frame); } catch (Throwable t) { if (null != frame) - BufferPool.put(frame); + bufferPool.put(frame); throw t; } finally { - BufferPool.put(payload); + bufferPool.put(payload); } } diff --git 
a/src/java/org/apache/cassandra/net/FrameEncoderUnprotected.java b/src/java/org/apache/cassandra/net/FrameEncoderUnprotected.java index 3bca41c..6158713 100644 --- a/src/java/org/apache/cassandra/net/FrameEncoderUnprotected.java +++ b/src/java/org/apache/cassandra/net/FrameEncoderUnprotected.java @@ -59,7 +59,7 @@ class FrameEncoderUnprotected extends FrameEncoder } catch (Throwable t) { - BufferPool.put(frame); + bufferPool.put(frame); throw t; } } diff --git a/src/java/org/apache/cassandra/net/HandshakeProtocol.java b/src/java/org/apache/cassandra/net/HandshakeProtocol.java index 47d0ec6..bfdcc2c 100644 --- a/src/java/org/apache/cassandra/net/HandshakeProtocol.java +++ b/src/java/org/apache/cassandra/net/HandshakeProtocol.java @@ -32,7 +32,7 @@ import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBufferFixed; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPools; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer; @@ -135,7 +135,7 @@ class HandshakeProtocol ByteBuf encode() { - ByteBuffer buffer = BufferPool.get(MAX_LENGTH, BufferType.OFF_HEAP); + ByteBuffer buffer = BufferPools.forNetworking().get(MAX_LENGTH, BufferType.OFF_HEAP); try (DataOutputBufferFixed out = new DataOutputBufferFixed(buffer)) { out.writeInt(Message.PROTOCOL_MAGIC); @@ -347,7 +347,7 @@ class HandshakeProtocol ByteBuf encode() { - ByteBuffer buffer = BufferPool.get(MAX_LENGTH, BufferType.OFF_HEAP); + ByteBuffer buffer = BufferPools.forNetworking().get(MAX_LENGTH, BufferType.OFF_HEAP); try (DataOutputBufferFixed out = new DataOutputBufferFixed(buffer)) { out.writeInt(maxMessagingVersion); diff --git a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java 
b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java index e02512b..f2339eb 100644 --- a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java +++ b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java @@ -52,7 +52,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.OutboundConnectionSettings.Framing; import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.streaming.async.StreamingInboundHandler; -import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPools; import static java.lang.Math.*; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -389,7 +389,7 @@ public class InboundConnectionInitiator from = InetAddressAndPort.getByAddressOverrideDefaults(address.getAddress(), address.getPort()); } - BufferPool.setRecycleWhenFreeForCurrentThread(false); + BufferPools.forNetworking().setRecycleWhenFreeForCurrentThread(false); pipeline.replace(this, "streamInbound", new StreamingInboundHandler(from, current_version, null)); logger.info("{} streaming connection established, version = {}, framing = {}, encryption = {}", @@ -411,7 +411,7 @@ public class InboundConnectionInitiator // record the "true" endpoint, i.e. 
the one the peer is identified with, as opposed to the socket it connected over instance().versions.set(from, maxMessagingVersion); - BufferPool.setRecycleWhenFreeForCurrentThread(false); + BufferPools.forNetworking().setRecycleWhenFreeForCurrentThread(false); BufferPoolAllocator allocator = GlobalBufferPoolAllocator.instance; if (initiate.type == ConnectionType.LARGE_MESSAGES) { diff --git a/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java b/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java index b2d487f..8017854 100644 --- a/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java +++ b/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import io.netty.channel.EventLoop; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPools; /** * Equivalent to {@link GlobalBufferPoolAllocator}, except explicitly using a specified @@ -36,7 +37,7 @@ class LocalBufferPoolAllocator extends BufferPoolAllocator LocalBufferPoolAllocator(EventLoop eventLoop) { - this.pool = new BufferPool.LocalPool().recycleWhenFree(false); + this.pool = BufferPools.forNetworking().create().recycleWhenFree(false); this.eventLoop = eventLoop; } diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java index 5f3eced..4a5585a 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java +++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java @@ -54,7 +54,7 @@ import org.apache.cassandra.net.OutboundConnectionInitiator.Result.MessagingSucc import org.apache.cassandra.net.OutboundConnectionInitiator.Result.StreamingSuccess; import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPools; 
import static java.util.concurrent.TimeUnit.*; import static org.apache.cassandra.net.MessagingService.VERSION_40; @@ -338,7 +338,7 @@ public class OutboundConnectionInitiator<SuccessType extends OutboundConnectionI ChannelPipeline pipeline = ctx.pipeline(); if (result.isSuccess()) { - BufferPool.setRecycleWhenFreeForCurrentThread(false); + BufferPools.forNetworking().setRecycleWhenFreeForCurrentThread(false); if (type.isMessaging()) { assert frameEncoder != null; diff --git a/src/java/org/apache/cassandra/net/ShareableBytes.java b/src/java/org/apache/cassandra/net/ShareableBytes.java index e4f2460..71c272a 100644 --- a/src/java/org/apache/cassandra/net/ShareableBytes.java +++ b/src/java/org/apache/cassandra/net/ShareableBytes.java @@ -20,10 +20,10 @@ package org.apache.cassandra.net; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPools; /** - * A wrapper for possibly sharing portions of a single, {@link BufferPool} managed, {@link ByteBuffer}; + * A wrapper for possibly sharing portions of a single, {@link BufferPools#forNetworking()} managed, {@link ByteBuffer}; * optimised for the case where no sharing is necessary. 
* * When sharing is necessary, {@link #share()} method must be invoked by the owning thread @@ -136,7 +136,7 @@ class ShareableBytes throw new IllegalStateException("Already released"); if (count == RELEASED) - BufferPool.put(bytes); + BufferPools.forNetworking().put(bytes); } boolean isReleased() diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java index b18f689..531b492 100644 --- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java +++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java @@ -29,6 +29,8 @@ import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Supplier; @@ -41,7 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.util.concurrent.FastThreadLocal; -import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.BufferPoolMetrics; @@ -54,10 +56,52 @@ import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; import static org.apache.cassandra.utils.memory.MemoryUtil.isExactlyDirect; /** - * A pool of ByteBuffers that can be recycled. + * A pool of ByteBuffers that can be recycled to reduce system direct memory fragmentation and improve buffer allocation + * performance. 
+ * <p/> + * + * Each {@link BufferPool} instance has one {@link GlobalPool} which allocates two kinds of chunks: + * <ul> + * <li>Macro Chunk + * <ul> + * <li>A memory slab that has size of MACRO_CHUNK_SIZE which is 64 * NORMAL_CHUNK_SIZE</li> + * <li>Used to allocate normal chunk with size of NORMAL_CHUNK_SIZE</li> + * </ul> + * </li> + * <li>Normal Chunk + * <ul> + * <li>Used by {@link LocalPool} to serve buffer allocation</li> + * <li>Minimum allocation unit is NORMAL_CHUNK_SIZE / 64</li> + * </ul> + * </li> + * </ul> + * + * {@link GlobalPool} maintains two kinds of freed chunks, fully freed chunks where all buffers are released, and + * partially freed chunks where some buffers are not released, e.g. held by {@link org.apache.cassandra.cache.ChunkCache}. + * Partially freed chunks are used to improve cache utilization and have lower priority compared to fully freed chunks. + * + * <p/> + * + * {@link LocalPool} is a thread local pool to serve buffer allocation requests. There are two kinds of local pool: + * <ul> + * <li>Normal Pool: + * <ul> + * <li>used to serve allocation size that is larger than half of NORMAL_ALLOCATION_UNIT but less than NORMAL_CHUNK_SIZE</li> + * <li>when there is insufficient space in the local queue, it will request global pool for more normal chunks</li> + * <li>when normal chunk is recycled either fully or partially, it will be passed to global pool to be used by other pools</li> + * </ul> + * </li> + * <li>Tiny Pool: + * <ul> + * <li>used to serve allocation size that is less than NORMAL_ALLOCATION_UNIT</li> + * <li>when there is insufficient space in the local queue, it will request parent normal pool for more tiny chunks</li> + * <li>when tiny chunk is fully freed, it will be passed to parent normal pool and corresponding buffer in the parent normal chunk is freed</li> + * </ul> + * </li> + * </ul> * - * TODO: document the semantics of this class carefully - * Notably: we do not automatically release from the local pool any chunk 
that has been incompletely allocated from + * Note: even though partially freed chunks improves cache utilization when chunk cache holds outstanding buffer for + * arbitrary period, there is still fragmentation in the partially freed chunk because of non-uniform allocation size. */ public class BufferPool { @@ -68,23 +112,40 @@ public class BufferPool public static final int TINY_ALLOCATION_UNIT = TINY_CHUNK_SIZE / 64; public static final int TINY_ALLOCATION_LIMIT = TINY_CHUNK_SIZE / 2; - private final static BufferPoolMetrics metrics = new BufferPoolMetrics(); - - // TODO: this should not be using FileCacheSizeInMB - private static long MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L; - private static String READABLE_MEMORY_USAGE_THRESHOLD = prettyPrintMemory(MEMORY_USAGE_THRESHOLD); - - private static Debug debug; - private static final Logger logger = LoggerFactory.getLogger(BufferPool.class); private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 15L, TimeUnit.MINUTES); private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0); + private volatile Debug debug = Debug.NO_OP; + + protected final String name; + protected final BufferPoolMetrics metrics; + private final long memoryUsageThreshold; + private final String readableMemoryUsageThreshold; + + /** + * Size of unpooled buffer being allocated outside of buffer pool in bytes. + */ + private final LongAdder overflowMemoryUsage = new LongAdder(); + + /** + * Size of buffer being used in bytes, including pooled buffer and unpooled buffer. 
+ */ + private final LongAdder memoryInUse = new LongAdder(); + + /** + * Size of allocated buffer pool slabs in bytes + */ + private final AtomicLong memoryAllocated = new AtomicLong(); + /** A global pool of chunks (page aligned buffers) */ - private static final GlobalPool globalPool = new GlobalPool(); + private final GlobalPool globalPool; + + /** Allow partially freed chunk to be recycled for allocation*/ + private final boolean recyclePartially; /** A thread local pool of chunks, where chunks come from the global pool */ - private static final FastThreadLocal<LocalPool> localPool = new FastThreadLocal<LocalPool>() + private final FastThreadLocal<LocalPool> localPool = new FastThreadLocal<LocalPool>() { @Override protected LocalPool initialValue() @@ -98,7 +159,31 @@ public class BufferPool } }; - public static ByteBuffer get(int size, BufferType bufferType) + private final Set<LocalPoolRef> localPoolReferences = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + private final ReferenceQueue<Object> localPoolRefQueue = new ReferenceQueue<>(); + private final InfiniteLoopExecutor localPoolCleaner; + + public BufferPool(String name, long memoryUsageThreshold, boolean recyclePartially) + { + this.name = name; + this.memoryUsageThreshold = memoryUsageThreshold; + this.readableMemoryUsageThreshold = prettyPrintMemory(memoryUsageThreshold); + this.globalPool = new GlobalPool(); + this.metrics = new BufferPoolMetrics(name, this); + this.recyclePartially = recyclePartially; + this.localPoolCleaner = new InfiniteLoopExecutor("LocalPool-Cleaner-" + name, this::cleanupOneReference).start(); + } + + /** + * @return a local pool instance and caller is responsible to release the pool + */ + public LocalPool create() + { + return new LocalPool(); + } + + public ByteBuffer get(int size, BufferType bufferType) { if (bufferType == BufferType.ON_HEAP) return allocate(size, bufferType); @@ -106,7 +191,7 @@ public class BufferPool return localPool.get().get(size); } - 
public static ByteBuffer getAtLeast(int size, BufferType bufferType) + public ByteBuffer getAtLeast(int size, BufferType bufferType) { if (bufferType == BufferType.ON_HEAP) return allocate(size, bufferType); @@ -115,30 +200,33 @@ public class BufferPool } /** Unlike the get methods, this will return null if the pool is exhausted */ - public static ByteBuffer tryGet(int size) + public ByteBuffer tryGet(int size) { return localPool.get().tryGet(size, false); } - public static ByteBuffer tryGetAtLeast(int size) + public ByteBuffer tryGetAtLeast(int size) { return localPool.get().tryGet(size, true); } - private static ByteBuffer allocate(int size, BufferType bufferType) + private ByteBuffer allocate(int size, BufferType bufferType) { + updateOverflowMemoryUsage(size); return bufferType == BufferType.ON_HEAP ? ByteBuffer.allocate(size) : ByteBuffer.allocateDirect(size); } - public static void put(ByteBuffer buffer) + public void put(ByteBuffer buffer) { if (isExactlyDirect(buffer)) localPool.get().put(buffer); + else + updateOverflowMemoryUsage(-buffer.capacity()); } - public static void putUnusedPortion(ByteBuffer buffer) + public void putUnusedPortion(ByteBuffer buffer) { if (isExactlyDirect(buffer)) { @@ -150,43 +238,93 @@ public class BufferPool } } - public static void setRecycleWhenFreeForCurrentThread(boolean recycleWhenFree) + private void updateOverflowMemoryUsage(int size) + { + overflowMemoryUsage.add(size); + } + + public void setRecycleWhenFreeForCurrentThread(boolean recycleWhenFree) { localPool.get().recycleWhenFree(recycleWhenFree); } - public static long sizeInBytes() + /** + * @return buffer size being allocated, including pooled buffers and unpooled buffers + */ + public long sizeInBytes() { - return globalPool.sizeInBytes(); + return memoryAllocated.get() + overflowMemoryUsage.longValue(); } - @VisibleForTesting - public static void setMemoryUsageThreshold(long threshold) + /** + * @return buffer size being used, including used pooled buffers and 
unpooled buffers + */ + public long usedSizeInBytes() { - MEMORY_USAGE_THRESHOLD = threshold; - READABLE_MEMORY_USAGE_THRESHOLD = prettyPrintMemory(MEMORY_USAGE_THRESHOLD); + return memoryInUse.longValue() + overflowMemoryUsage.longValue(); + } + + /** + * @return unpooled buffer size being allocated outside of buffer pool. + */ + public long overflowMemoryInBytes() + { + return overflowMemoryUsage.longValue(); + } + + /** + * @return maximum pooled buffer size in bytes + */ + public long memoryUsageThreshold() + { + return memoryUsageThreshold; } @VisibleForTesting - public static long getMemoryUsageThreshold() + public GlobalPool globalPool() { - return MEMORY_USAGE_THRESHOLD; + return globalPool; } interface Debug { + public static Debug NO_OP = new Debug() + { + @Override + public void registerNormal(Chunk chunk) {} + @Override + public void recycleNormal(Chunk oldVersion, Chunk newVersion) {} + @Override + public void recyclePartial(Chunk chunk) { } + }; + void registerNormal(Chunk chunk); void recycleNormal(Chunk oldVersion, Chunk newVersion); + void recyclePartial(Chunk chunk); } - public static void debug(Debug setDebug) + public void debug(Debug setDebug) { - debug = setDebug; + assert setDebug != null; + this.debug = setDebug; } interface Recycler { + /** + * Recycle a fully freed chunk + */ void recycle(Chunk chunk); + + /** + * @return true if chunk can be reused before fully freed. + */ + boolean canRecyclePartially(); + + /** + * Recycle a partially freed chunk + */ + void recyclePartially(Chunk chunk); } /** @@ -196,30 +334,32 @@ public class BufferPool * * This class is shared by multiple thread local pools and must be thread-safe. 
*/ - static final class GlobalPool implements Supplier<Chunk>, Recycler + final class GlobalPool implements Supplier<Chunk>, Recycler { /** The size of a bigger chunk, 1 MiB, must be a multiple of NORMAL_CHUNK_SIZE */ static final int MACRO_CHUNK_SIZE = 64 * NORMAL_CHUNK_SIZE; - private static final String READABLE_MACRO_CHUNK_SIZE = prettyPrintMemory(MACRO_CHUNK_SIZE); - - static - { - assert Integer.bitCount(NORMAL_CHUNK_SIZE) == 1; // must be a power of 2 - assert Integer.bitCount(MACRO_CHUNK_SIZE) == 1; // must be a power of 2 - assert MACRO_CHUNK_SIZE % NORMAL_CHUNK_SIZE == 0; // must be a multiple - - logger.info("Global buffer pool limit is {}", prettyPrintMemory(MEMORY_USAGE_THRESHOLD)); - } + private final String READABLE_MACRO_CHUNK_SIZE = prettyPrintMemory(MACRO_CHUNK_SIZE); private final Queue<Chunk> macroChunks = new ConcurrentLinkedQueue<>(); // TODO (future): it would be preferable to use a CLStack to improve cache occupancy; it would also be preferable to use "CoreLocal" storage + // It contains fully free chunks and when it runs out, partially freed chunks will be used. private final Queue<Chunk> chunks = new ConcurrentLinkedQueue<>(); - private final AtomicLong memoryUsage = new AtomicLong(); + // Partially freed chunk which is recirculated whenever chunk has free spaces to + // improve buffer utilization when chunk cache is holding a piece of buffer for a long period. + // Note: fragmentation still exists, as holes are with different sizes. + private final Queue<Chunk> partiallyFreedChunks = new ConcurrentLinkedQueue<>(); /** Used in logging statements to lazily build a human-readable current memory usage. 
*/ - private final Object readableMemoryUsage = + private final Object readableMemoryUsage = new Object() { @Override public String toString() { return prettyPrintMemory(sizeInBytes()); } }; + public GlobalPool() + { + assert Integer.bitCount(NORMAL_CHUNK_SIZE) == 1; // must be a power of 2 + assert Integer.bitCount(MACRO_CHUNK_SIZE) == 1; // must be a power of 2 + assert MACRO_CHUNK_SIZE % NORMAL_CHUNK_SIZE == 0; // must be a multiple + } + /** Return a chunk, the caller will take owership of the parent chunk. */ public Chunk get() { @@ -232,7 +372,10 @@ public class BufferPool return chunk; // another thread may have just allocated last macro chunk, so make one final attempt before returning null - return chunks.poll(); + chunk = chunks.poll(); + + // try to use partially freed chunk if there is no more fully freed chunk. + return chunk == null ? partiallyFreedChunks.poll() : chunk; } /** @@ -243,17 +386,17 @@ public class BufferPool { while (true) { - long cur = memoryUsage.get(); - if (cur + MACRO_CHUNK_SIZE > MEMORY_USAGE_THRESHOLD) + long cur = memoryAllocated.get(); + if (cur + MACRO_CHUNK_SIZE > memoryUsageThreshold) { - if (MEMORY_USAGE_THRESHOLD > 0) + if (memoryUsageThreshold > 0) { noSpamLogger.info("Maximum memory usage reached ({}), cannot allocate chunk of {}", - READABLE_MEMORY_USAGE_THRESHOLD, READABLE_MACRO_CHUNK_SIZE); + readableMemoryUsageThreshold, READABLE_MACRO_CHUNK_SIZE); } return null; } - if (memoryUsage.compareAndSet(cur, cur + MACRO_CHUNK_SIZE)) + if (memoryAllocated.compareAndSet(cur, cur + MACRO_CHUNK_SIZE)) break; } @@ -265,10 +408,10 @@ public class BufferPool } catch (OutOfMemoryError oom) { - noSpamLogger.error("Buffer pool failed to allocate chunk of {}, current size {} ({}). " + + noSpamLogger.error("{} buffer pool failed to allocate chunk of {}, current size {} ({}). " + "Attempting to continue; buffers will be allocated in on-heap memory which can degrade performance. 
" + "Make sure direct memory size (-XX:MaxDirectMemorySize) is large enough to accommodate off-heap memtables and caches.", - READABLE_MACRO_CHUNK_SIZE, readableMemoryUsage, oom.getClass().getName()); + name, READABLE_MACRO_CHUNK_SIZE, readableMemoryUsage, oom.getClass().getName()); return null; } @@ -276,29 +419,35 @@ public class BufferPool macroChunks.add(chunk); final Chunk callerChunk = new Chunk(this, chunk.get(NORMAL_CHUNK_SIZE)); - if (debug != null) - debug.registerNormal(callerChunk); + debug.registerNormal(callerChunk); for (int i = NORMAL_CHUNK_SIZE; i < MACRO_CHUNK_SIZE; i += NORMAL_CHUNK_SIZE) { Chunk add = new Chunk(this, chunk.get(NORMAL_CHUNK_SIZE)); chunks.add(add); - if (debug != null) - debug.registerNormal(add); + debug.registerNormal(add); } return callerChunk; } + @Override public void recycle(Chunk chunk) { Chunk recycleAs = new Chunk(chunk); - if (debug != null) - debug.recycleNormal(chunk, recycleAs); + debug.recycleNormal(chunk, recycleAs); chunks.add(recycleAs); } - public long sizeInBytes() + @Override + public void recyclePartially(Chunk chunk) { - return memoryUsage.get(); + debug.recyclePartial(chunk); + partiallyFreedChunks.add(chunk); + } + + @Override + public boolean canRecyclePartially() + { + return recyclePartially; } /** This is not thread safe and should only be used for unit testing. */ @@ -308,10 +457,23 @@ public class BufferPool while (!chunks.isEmpty()) chunks.poll().unsafeFree(); + while (!partiallyFreedChunks.isEmpty()) + partiallyFreedChunks.poll().unsafeFree(); + while (!macroChunks.isEmpty()) macroChunks.poll().unsafeFree(); + } + + @VisibleForTesting + boolean isPartiallyFreed(Chunk chunk) + { + return partiallyFreedChunks.contains(chunk); + } - memoryUsage.set(0); + @VisibleForTesting + boolean isFullyFreed(Chunk chunk) + { + return chunks.contains(chunk); } } @@ -545,7 +707,7 @@ public class BufferPool * A thread local class that grabs chunks from the global pool for this thread allocations. 
* Only one thread can do the allocations but multiple threads can release the allocations. */ - public static final class LocalPool implements Recycler + public final class LocalPool implements Recycler { private final Queue<ByteBuffer> reuseObjects; private final Supplier<Chunk> parent; @@ -575,9 +737,7 @@ public class BufferPool { this.parent = () -> { ByteBuffer buffer = parent.tryGetInternal(TINY_CHUNK_SIZE, false); - if (buffer == null) - return null; - return new Chunk(parent, buffer); + return buffer == null ? null : new Chunk(parent, buffer); }; this.tinyLimit = 0; // we only currently permit one layer of nesting (which brings us down to 32 byte allocations, so is plenty) this.reuseObjects = parent.reuseObjects; // we share the same ByteBuffer object reuse pool, as we both have the same exclusive access to it @@ -594,13 +754,21 @@ public class BufferPool public void put(ByteBuffer buffer) { Chunk chunk = Chunk.getParentChunk(buffer); + int size = buffer.capacity(); + if (chunk == null) + { FileUtils.clean(buffer); + updateOverflowMemoryUsage(-size); + } else + { put(buffer, chunk); + memoryInUse.add(-size); + } } - public void put(ByteBuffer buffer, Chunk chunk) + private void put(ByteBuffer buffer, Chunk chunk) { LocalPool owner = chunk.owner; if (owner != null && owner == tinyPool) @@ -609,23 +777,39 @@ public class BufferPool return; } - // ask the free method to take exclusive ownership of the act of recycling - // if we are either: already not owned by anyone, or owned by ourselves - long free = chunk.free(buffer, owner == null || (owner == this && recycleWhenFree)); + // ask the free method to take exclusive ownership of the act of recycling if chunk is owned by ourselves + long free = chunk.free(buffer, owner == this && recycleWhenFree); + // free: + // * 0L: current pool must be the owner. we can fully recyle the chunk. + // * -1L: + // * for normal chunk: + // a) if it has owner, do nothing. 
+ // b) if it has no owner, try to recycle it either fully or partially if not already recycled. + // * for tiny chunk: + // a) if it has owner, do nothing. + // b) if it has no owner, recycle the tiny chunk back to parent chunk + // * others: + // * for normal chunk: partial recycle the chunk if it can be partially recycled but not yet recycled. + // * for tiny chunk: do nothing. if (free == 0L) { + assert owner == this; // 0L => we own recycling responsibility, so must recycle; - // if we are the owner, we must remove the Chunk from our local queue - if (owner == this) - remove(chunk); + // We must remove the Chunk from our local queue + remove(chunk); chunk.recycle(); } - else if (((free == -1L) && owner != this) && chunk.owner == null) + else if (free == -1L && owner != this && chunk.owner == null && !chunk.recycler.canRecyclePartially()) { // although we try to take recycle ownership cheaply, it is not always possible to do so if the owner is racing to unset. // we must also check after completely freeing if the owner has since been unset, and try to recycle chunk.tryRecycle(); } + else if (chunk.owner == null && chunk.recycler.canRecyclePartially() && chunk.setInUse(Chunk.Status.EVICTED)) + { + // re-circulate partially freed normal chunk to global list + chunk.partiallyRecycle(); + } if (owner == this) { @@ -638,10 +822,16 @@ public class BufferPool public void putUnusedPortion(ByteBuffer buffer) { Chunk chunk = Chunk.getParentChunk(buffer); + int size = buffer.capacity() - buffer.limit(); + if (chunk == null) + { + updateOverflowMemoryUsage(-size); return; + } chunk.freeUnusedPortion(buffer); + memoryInUse.add(-size); } public ByteBuffer get(int size) @@ -658,7 +848,11 @@ public class BufferPool { ByteBuffer ret = tryGet(size, sizeIsLowerBound); if (ret != null) + { + metrics.hits.mark(); + memoryInUse.add(ret.capacity()); return ret; + } if (size > NORMAL_CHUNK_SIZE) { @@ -677,16 +871,6 @@ public class BufferPool return allocate(size, BufferType.OFF_HEAP); } - 
public ByteBuffer tryGet(int size) - { - return tryGet(size, false); - } - - public ByteBuffer tryGetAtLeast(int size) - { - return tryGet(size, true); - } - private ByteBuffer tryGet(int size, boolean sizeIsLowerBound) { LocalPool pool = this; @@ -715,7 +899,9 @@ public class BufferPool ByteBuffer reuse = this.reuseObjects.poll(); ByteBuffer buffer = chunks.get(size, sizeIsLowerBound, reuse); if (buffer != null) + { return buffer; + } // else ask the global pool Chunk chunk = addChunkFromParent(); @@ -731,7 +917,8 @@ public class BufferPool return null; } - // recycle + // recycle entire tiny chunk from tiny pool back to local pool + @Override public void recycle(Chunk chunk) { ByteBuffer buffer = chunk.slab; @@ -739,6 +926,20 @@ public class BufferPool put(buffer, parentChunk); } + @Override + public void recyclePartially(Chunk chunk) + { + throw new UnsupportedOperationException("Tiny chunk doesn't support partial recycle."); + } + + @Override + public boolean canRecyclePartially() + { + // tiny pool doesn't support partial recycle, as we want to have tiny chunk fully freed and put back to + // parent normal chunk. 
+ return false; + } + private void remove(Chunk chunk) { chunks.remove(chunk); @@ -763,8 +964,11 @@ public class BufferPool if (evict != null) { if (tinyPool != null) + // releasing tiny chunks may result in releasing current evicted chunk tinyPool.chunks.removeIf((child, parent) -> Chunk.getParentChunk(child.slab) == parent, evict); evict.release(); + // Mark it as evicted and will be eligible for partial recyle if recycler allows + evict.setEvicted(Chunk.Status.IN_USE); } } @@ -784,6 +988,12 @@ public class BufferPool chunks.unsafeRecycle(); } + @VisibleForTesting + public boolean isTinyPool() + { + return !(parent instanceof GlobalPool); + } + public LocalPool recycleWhenFree(boolean recycleWhenFree) { this.recycleWhenFree = recycleWhenFree; @@ -808,12 +1018,7 @@ public class BufferPool } } - private static final Set<LocalPoolRef> localPoolReferences = Collections.newSetFromMap(new ConcurrentHashMap<>()); - - private static final ReferenceQueue<Object> localPoolRefQueue = new ReferenceQueue<>(); - private static final InfiniteLoopExecutor EXEC = new InfiniteLoopExecutor("LocalPool-Cleaner", BufferPool::cleanupOneReference).start(); - - private static void cleanupOneReference() throws InterruptedException + private void cleanupOneReference() throws InterruptedException { Object obj = localPoolRefQueue.remove(100); if (obj instanceof LocalPoolRef) @@ -865,10 +1070,19 @@ public class BufferPool */ final static class Chunk { + enum Status + { + /** The slab is serving or ready to serve requests */ + IN_USE, + /** The slab is not serving requests and ready for partial recycle*/ + EVICTED; + } + private final ByteBuffer slab; - private final long baseAddress; + final long baseAddress; private final int shift; + // it may be 0L when all slots are allocated after "get" or when all slots are freed after "free" private volatile long freeSlots; private static final AtomicLongFieldUpdater<Chunk> freeSlotsUpdater = AtomicLongFieldUpdater.newUpdater(Chunk.class, "freeSlots"); 
@@ -878,6 +1092,10 @@ public class BufferPool private volatile LocalPool owner; private final Recycler recycler; + private static final AtomicReferenceFieldUpdater<Chunk, Status> statusUpdater = + AtomicReferenceFieldUpdater.newUpdater(Chunk.class, Status.class, "status"); + private volatile Status status = Status.IN_USE; + @VisibleForTesting Object debugAttachment; @@ -939,6 +1157,12 @@ public class BufferPool recycler.recycle(this); } + public void partiallyRecycle() + { + assert owner == null; + recycler.recyclePartially(this); + } + /** * We stash the chunk in the attachment of a buffer * that was returned by get(), this method simply @@ -1182,6 +1406,12 @@ public class BufferPool } @VisibleForTesting + public LocalPool owner() + { + return this.owner; + } + + @VisibleForTesting void unsafeFree() { Chunk parent = getParentChunk(slab); @@ -1200,6 +1430,26 @@ public class BufferPool chunk.recycle(); } } + + Status status() + { + return status; + } + + private boolean setStatus(Status current, Status update) + { + return statusUpdater.compareAndSet(this, current, update); + } + + boolean setInUse(Status prev) + { + return setStatus(prev, Status.IN_USE); + } + + boolean setEvicted(Status prev) + { + return setStatus(prev, Status.EVICTED); + } } @VisibleForTesting @@ -1218,49 +1468,41 @@ public class BufferPool } @VisibleForTesting - public static void shutdownLocalCleaner(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException + public void shutdownLocalCleaner(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { - shutdownNow(of(EXEC)); - awaitTermination(timeout, unit, of(EXEC)); + shutdownNow(of(localPoolCleaner)); + awaitTermination(timeout, unit, of(localPoolCleaner)); } - public static long unsafeGetBytesInUse() + @VisibleForTesting + public BufferPoolMetrics metrics() { - long totalMemory = globalPool.memoryUsage.get(); - class L { long v; } - final L availableMemory = new L(); - for (Chunk chunk : 
globalPool.chunks) - { - availableMemory.v += chunk.capacity(); - } - for (LocalPoolRef ref : localPoolReferences) - { - ref.chunks.forEach(chunk -> availableMemory.v += chunk.free()); - } - return totalMemory - availableMemory.v; + return metrics; } /** This is not thread safe and should only be used for unit testing. */ @VisibleForTesting - static void unsafeReset() + public void unsafeReset() { + overflowMemoryUsage.reset(); + memoryInUse.reset(); + memoryAllocated.set(0); localPool.get().unsafeRecycle(); globalPool.unsafeFree(); } @VisibleForTesting - static Chunk unsafeCurrentChunk() + Chunk unsafeCurrentChunk() { return localPool.get().chunks.chunk0; } @VisibleForTesting - static int unsafeNumChunks() + int unsafeNumChunks() { LocalPool pool = localPool.get(); return (pool.chunks.chunk0 != null ? 1 : 0) + (pool.chunks.chunk1 != null ? 1 : 0) + (pool.chunks.chunk2 != null ? 1 : 0); } - } diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPools.java b/src/java/org/apache/cassandra/utils/memory/BufferPools.java new file mode 100644 index 0000000..736e1cd --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/BufferPools.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.utils.memory; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; + +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + +public class BufferPools +{ + private static final Logger logger = LoggerFactory.getLogger(BufferPools.class); + + /** + * Used by chunk cache to store decompressed data and buffers may be held by chunk cache for arbitrary period. + */ + private static final long FILE_MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L; + private static final BufferPool CHUNK_CACHE_POOL = new BufferPool("chunk-cache", FILE_MEMORY_USAGE_THRESHOLD, true); + + /** + * Used by client-server or inter-node requests, buffers should be released immediately after use. + */ + private static final long NETWORKING_MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getNetworkingCacheSizeInMB() * 1024L * 1024L; + private static final BufferPool NETWORKING_POOL = new BufferPool("networking", NETWORKING_MEMORY_USAGE_THRESHOLD, false); + + static + { + logger.info("Global buffer pool limit is {} for {} and {} for {}", + prettyPrintMemory(FILE_MEMORY_USAGE_THRESHOLD), + CHUNK_CACHE_POOL.name, + prettyPrintMemory(NETWORKING_MEMORY_USAGE_THRESHOLD), + NETWORKING_POOL.name); + + CHUNK_CACHE_POOL.metrics().register3xAlias(); + } + /** + * Long-lived buffers used for chunk cache and other disk access + */ + public static BufferPool forChunkCache() + { + return CHUNK_CACHE_POOL; + } + + /** + * Short-lived buffers used for internode messaging or client-server connections. 
+ */ + public static BufferPool forNetworking() + { + return NETWORKING_POOL; + } + + public static void shutdownLocalCleaner(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException + { + CHUNK_CACHE_POOL.shutdownLocalCleaner(timeout, unit); + NETWORKING_POOL.shutdownLocalCleaner(timeout, unit); + } + +} diff --git a/test/burn/org/apache/cassandra/net/ConnectionBurnTest.java b/test/burn/org/apache/cassandra/net/ConnectionBurnTest.java index a421f3e..dea552c 100644 --- a/test/burn/org/apache/cassandra/net/ConnectionBurnTest.java +++ b/test/burn/org/apache/cassandra/net/ConnectionBurnTest.java @@ -59,7 +59,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageGenerator.UniformPayloadGenerator; import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.MonotonicClock; -import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPools; import static java.lang.Math.min; import static org.apache.cassandra.net.MessagingService.current_version; @@ -435,7 +435,7 @@ public class ConnectionBurnTest checkStoppedTo .accept(endpoint, getConnections(endpoint, true )); checkStoppedFrom.accept(endpoint, getConnections(endpoint, false)); } - long inUse = BufferPool.unsafeGetBytesInUse(); + long inUse = BufferPools.forNetworking().usedSizeInBytes(); if (inUse > 0) { // try diff --git a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java index 0af4199..fc603c9 100644 --- a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java +++ b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -84,7 +85,7 @@ public 
class LongBufferPoolTest } long recycleRound = 1; final List<BufferPool.Chunk> normalChunks = new ArrayList<>(); - final List<BufferPool.Chunk> tinyChunks = new ArrayList<>(); + public synchronized void registerNormal(BufferPool.Chunk chunk) { chunk.debugAttachment = new DebugChunk(); @@ -95,13 +96,14 @@ public class LongBufferPoolTest newVersion.debugAttachment = oldVersion.debugAttachment; DebugChunk.get(oldVersion).lastRecycled = recycleRound; } + public void recyclePartial(BufferPool.Chunk chunk) + { + DebugChunk.get(chunk).lastRecycled = recycleRound; + } public synchronized void check() { -// for (BufferPool.Chunk chunk : tinyChunks) -// assert DebugChunk.get(chunk).lastRecycled == recycleRound; for (BufferPool.Chunk chunk : normalChunks) assert DebugChunk.get(chunk).lastRecycled == recycleRound; - tinyChunks.clear(); // they don't survive a recycleRound recycleRound++; } } @@ -112,10 +114,29 @@ public class LongBufferPoolTest DatabaseDescriptor.daemonInitialization(); } + @AfterClass + public static void teardown() + { + BufferPools.forChunkCache().unsafeReset(); + BufferPools.forNetworking().unsafeReset(); + } + @Test - public void testAllocate() throws InterruptedException, ExecutionException + public void testPoolAllocateWithRecyclePartially() throws InterruptedException, ExecutionException + { + testPoolAllocate(true); + } + + @Test + public void testPoolAllocateWithoutRecyclePartially() throws InterruptedException, ExecutionException + { + testPoolAllocate(false); + } + + private void testPoolAllocate(boolean recyclePartially) throws InterruptedException, ExecutionException { - testAllocate(Runtime.getRuntime().availableProcessors() * 2, TimeUnit.MINUTES.toNanos(2L), 16 << 20); + BufferPool pool = new BufferPool("test_pool", 16 << 20, recyclePartially); + testAllocate(pool, Runtime.getRuntime().availableProcessors() * 2, TimeUnit.MINUTES.toNanos(2L)); } private static final class BufferCheck @@ -160,11 +181,11 @@ public class LongBufferPoolTest final 
List<Future<Boolean>> threadResultFuture; final int targetSizeQuanta; - TestEnvironment(int threadCount, long duration, int poolSize) + TestEnvironment(int threadCount, long duration, long poolSize) { this.threadCount = threadCount; this.duration = duration; - this.poolSize = poolSize; + this.poolSize = Math.toIntExact(poolSize); until = System.nanoTime() + duration; latch = new CountDownLatch(threadCount); sharedRecycle = new SPSCQueue[threadCount]; @@ -188,7 +209,7 @@ public class LongBufferPoolTest // // This should divide double the poolSize across the working threads, // plus NORMAL_CHUNK_SIZE for thread0 and 1/10 poolSize for the burn producer/consumer pair. - targetSizeQuanta = 2 * poolSize / sum1toN(threadCount - 1); + targetSizeQuanta = 2 * this.poolSize / sum1toN(threadCount - 1); } void addCheckedFuture(Future<Boolean> future) @@ -238,24 +259,19 @@ public class LongBufferPoolTest } } - public void testAllocate(int threadCount, long duration, int poolSize) throws InterruptedException, ExecutionException + public void testAllocate(BufferPool bufferPool, int threadCount, long duration) throws InterruptedException, ExecutionException { - System.out.println(String.format("%s - testing %d threads for %dm", - DATE_FORMAT.format(new Date()), - threadCount, - TimeUnit.NANOSECONDS.toMinutes(duration))); - long prevPoolSize = BufferPool.getMemoryUsageThreshold(); - logger.info("Overriding configured BufferPool.MEMORY_USAGE_THRESHOLD={} and enabling BufferPool.DEBUG", poolSize); - BufferPool.setMemoryUsageThreshold(poolSize); + logger.info("{} - testing {} threads for {}m", DATE_FORMAT.format(new Date()), threadCount, TimeUnit.NANOSECONDS.toMinutes(duration)); + logger.info("Testing BufferPool with memoryUsageThreshold={} and enabling BufferPool.DEBUG", bufferPool.memoryUsageThreshold()); Debug debug = new Debug(); - BufferPool.debug(debug); + bufferPool.debug(debug); - TestEnvironment testEnv = new TestEnvironment(threadCount, duration, poolSize); + TestEnvironment 
testEnv = new TestEnvironment(threadCount, duration, bufferPool.memoryUsageThreshold()); - startBurnerThreads(testEnv); + startBurnerThreads(bufferPool, testEnv); for (int threadIdx = 0; threadIdx < threadCount; threadIdx++) - testEnv.addCheckedFuture(startWorkerThread(testEnv, threadIdx)); + testEnv.addCheckedFuture(startWorkerThread(bufferPool, testEnv, threadIdx)); while (!testEnv.latch.await(10L, TimeUnit.SECONDS)) { @@ -281,25 +297,23 @@ public class LongBufferPoolTest while ( null != (check = queue.poll()) ) { check.validate(); - BufferPool.put(check.buffer); + bufferPool.put(check.buffer); } } assertEquals(0, testEnv.executorService.shutdownNow().size()); - logger.info("Reverting BufferPool.MEMORY_USAGE_THRESHOLD={}", prevPoolSize); - BufferPool.setMemoryUsageThreshold(prevPoolSize); - BufferPool.debug(null); + logger.info("Reverting BufferPool DEBUG config"); + bufferPool.debug(BufferPool.Debug.NO_OP); testEnv.assertCheckedThreadsSucceeded(); - System.out.println(String.format("%s - finished.", - DATE_FORMAT.format(new Date()))); + logger.info("{} - finished.", DATE_FORMAT.format(new Date())); } - private Future<Boolean> startWorkerThread(TestEnvironment testEnv, final int threadIdx) + private Future<Boolean> startWorkerThread(BufferPool bufferPool, TestEnvironment testEnv, final int threadIdx) { - return testEnv.executorService.submit(new TestUntil(testEnv.until) + return testEnv.executorService.submit(new TestUntil(bufferPool, testEnv.until) { final int targetSize = threadIdx == 0 ? 
BufferPool.NORMAL_CHUNK_SIZE : testEnv.targetSizeQuanta * threadIdx; final SPSCQueue<BufferCheck> shareFrom = testEnv.sharedRecycle[threadIdx]; @@ -362,7 +376,7 @@ public class LongBufferPoolTest else { check.validate(); - BufferPool.put(check.buffer); + bufferPool.put(check.buffer); totalSize -= size; } } @@ -407,7 +421,7 @@ public class LongBufferPoolTest while (checks.size() > 0) { BufferCheck check = checks.get(0); - BufferPool.put(check.buffer); + bufferPool.put(check.buffer); checks.remove(check.listnode); } testEnv.latch.countDown(); @@ -419,13 +433,13 @@ public class LongBufferPoolTest if (check == null) return false; check.validate(); - BufferPool.put(check.buffer); + bufferPool.put(check.buffer); return true; } BufferCheck allocate(int size) { - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); assertNotNull(buffer); BufferCheck check = new BufferCheck(buffer, rand.nextLong()); assertEquals(size, buffer.capacity()); @@ -470,12 +484,12 @@ public class LongBufferPoolTest }); } - private void startBurnerThreads(TestEnvironment testEnv) + private void startBurnerThreads(BufferPool bufferPool, TestEnvironment testEnv) { // setup some high churn allocate/deallocate, without any checking final SPSCQueue<ByteBuffer> burn = new SPSCQueue<>(); final CountDownLatch doneAdd = new CountDownLatch(1); - testEnv.addCheckedFuture(testEnv.executorService.submit(new TestUntil(testEnv.until) + testEnv.addCheckedFuture(testEnv.executorService.submit(new TestUntil(bufferPool, testEnv.until) { int count = 0; final ThreadLocalRandom rand = ThreadLocalRandom.current(); @@ -494,8 +508,9 @@ public class LongBufferPoolTest return; } - ByteBuffer buffer = rand.nextInt(4) < 1 ? BufferPool.tryGet(BufferPool.NORMAL_CHUNK_SIZE) - : BufferPool.tryGet(BufferPool.TINY_ALLOCATION_LIMIT); + ByteBuffer buffer = rand.nextInt(4) < 1 + ? 
bufferPool.tryGet(BufferPool.NORMAL_CHUNK_SIZE) + : bufferPool.tryGet(BufferPool.TINY_ALLOCATION_LIMIT); if (buffer == null) { Thread.yield(); @@ -505,7 +520,7 @@ public class LongBufferPoolTest // 50/50 chance of returning the buffer from the producer thread, or // pass it on to the consumer. if (rand.nextBoolean()) - BufferPool.put(buffer); + bufferPool.put(buffer); else burn.add(buffer); @@ -516,7 +531,7 @@ public class LongBufferPoolTest doneAdd.countDown(); } })); - testEnv.threadResultFuture.add(testEnv.executorService.submit(new TestUntil(testEnv.until) + testEnv.threadResultFuture.add(testEnv.executorService.submit(new TestUntil(bufferPool, testEnv.until) { void testOne() throws Exception { @@ -526,7 +541,7 @@ public class LongBufferPoolTest Thread.yield(); return; } - BufferPool.put(buffer); + bufferPool.put(buffer); } void cleanup() { @@ -537,9 +552,11 @@ public class LongBufferPoolTest static abstract class TestUntil implements Callable<Boolean> { + final BufferPool bufferPool; final long until; - protected TestUntil(long until) + protected TestUntil(BufferPool bufferPool, long until) { + this.bufferPool = bufferPool; this.until = until; } @@ -562,7 +579,7 @@ public class LongBufferPoolTest { logger.error("Got exception {}, current chunk {}", ex.getMessage(), - BufferPool.unsafeCurrentChunk()); + bufferPool.unsafeCurrentChunk()); ex.printStackTrace(); return false; } @@ -570,7 +587,7 @@ public class LongBufferPoolTest { logger.error("Got throwable {}, current chunk {}", tr.getMessage(), - BufferPool.unsafeCurrentChunk()); + bufferPool.unsafeCurrentChunk()); tr.printStackTrace(); return false; } @@ -587,14 +604,14 @@ public class LongBufferPoolTest try { LongBufferPoolTest.setup(); - new LongBufferPoolTest().testAllocate(Runtime.getRuntime().availableProcessors(), - TimeUnit.HOURS.toNanos(2L), 16 << 20); + new LongBufferPoolTest().testAllocate(new BufferPool("test_pool", 16 << 20, true), + Runtime.getRuntime().availableProcessors(), + 
TimeUnit.HOURS.toNanos(2L)); System.exit(0); } catch (Throwable tr) { - System.out.println(String.format("Test failed - %s", tr.getMessage())); - tr.printStackTrace(); + logger.error("Test failed - {}", tr.getMessage(), tr); System.exit(1); // Force exit so that non-daemon threads like REQUEST-SCHEDULER do not hang the process on failure } } diff --git a/test/data/jmxdump/cassandra-4.0-jmx.yaml b/test/data/jmxdump/cassandra-4.0-jmx.yaml index a013ebc..05f4773 100644 --- a/test/data/jmxdump/cassandra-4.0-jmx.yaml +++ b/test/data/jmxdump/cassandra-4.0-jmx.yaml @@ -7155,7 +7155,7 @@ org.apache.cassandra.internal:type=ViewBuildExecutor: - {access: read/write, name: MaximumPoolSize, type: int} - {access: read/write, name: MaximumThreads, type: int} operations: [] -org.apache.cassandra.metrics:type=BufferPool,name=Misses: +org.apache.cassandra.metrics:type=BufferPool,scope=chunk-cache,name=Misses: attributes: - {access: read-only, name: Count, type: long} - {access: read-only, name: FifteenMinuteRate, type: double} @@ -7167,7 +7167,78 @@ org.apache.cassandra.metrics:type=BufferPool,name=Misses: - name: objectName parameters: [] returnType: javax.management.ObjectName -org.apache.cassandra.metrics:type=BufferPool,name=Size: +org.apache.cassandra.metrics:type=BufferPool,scope=chunk-cache,name=Hits: + attributes: + - {access: read-only, name: Count, type: long} + - {access: read-only, name: FifteenMinuteRate, type: double} + - {access: read-only, name: FiveMinuteRate, type: double} + - {access: read-only, name: MeanRate, type: double} + - {access: read-only, name: OneMinuteRate, type: double} + - {access: read-only, name: RateUnit, type: java.lang.String} + operations: + - name: objectName + parameters: [] + returnType: javax.management.ObjectName +org.apache.cassandra.metrics:type=BufferPool,scope=chunk-cache,name=Size: + attributes: + - {access: read-only, name: Value, type: java.lang.Object} + operations: + - name: objectName + parameters: [] + returnType: 
javax.management.ObjectName +org.apache.cassandra.metrics:type=BufferPool,scope=chunk-cache,name=UsedSize: + attributes: + - {access: read-only, name: Value, type: java.lang.Object} + operations: + - name: objectName + parameters: [] + returnType: javax.management.ObjectName +org.apache.cassandra.metrics:type=BufferPool,scope=chunk-cache,name=OverflowSize: + attributes: + - {access: read-only, name: Value, type: java.lang.Object} + operations: + - name: objectName + parameters: [] + returnType: javax.management.ObjectName +org.apache.cassandra.metrics:type=BufferPool,scope=networking,name=Misses: + attributes: + - {access: read-only, name: Count, type: long} + - {access: read-only, name: FifteenMinuteRate, type: double} + - {access: read-only, name: FiveMinuteRate, type: double} + - {access: read-only, name: MeanRate, type: double} + - {access: read-only, name: OneMinuteRate, type: double} + - {access: read-only, name: RateUnit, type: java.lang.String} + operations: + - name: objectName + parameters: [] + returnType: javax.management.ObjectName +org.apache.cassandra.metrics:type=BufferPool,scope=networking,name=Hits: + attributes: + - {access: read-only, name: Count, type: long} + - {access: read-only, name: FifteenMinuteRate, type: double} + - {access: read-only, name: FiveMinuteRate, type: double} + - {access: read-only, name: MeanRate, type: double} + - {access: read-only, name: OneMinuteRate, type: double} + - {access: read-only, name: RateUnit, type: java.lang.String} + operations: + - name: objectName + parameters: [] + returnType: javax.management.ObjectName +org.apache.cassandra.metrics:type=BufferPool,scope=networking,name=Size: + attributes: + - {access: read-only, name: Value, type: java.lang.Object} + operations: + - name: objectName + parameters: [] + returnType: javax.management.ObjectName +org.apache.cassandra.metrics:type=BufferPool,scope=networking,name=UsedSize: + attributes: + - {access: read-only, name: Value, type: java.lang.Object} + 
operations: + - name: objectName + parameters: [] + returnType: javax.management.ObjectName +org.apache.cassandra.metrics:type=BufferPool,scope=networking,name=OverflowSize: attributes: - {access: read-only, name: Value, type: java.lang.Object} operations: diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 51c58d4..9a3fd08 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -120,8 +120,8 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.concurrent.Ref; -import org.apache.cassandra.utils.memory.BufferPool; import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor; +import org.apache.cassandra.utils.memory.BufferPools; import static java.util.concurrent.TimeUnit.MINUTES; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; @@ -617,7 +617,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance () -> IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES), () -> ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES), () -> PendingRangeCalculatorService.instance.shutdownAndWait(1L, MINUTES), - () -> BufferPool.shutdownLocalCleaner(1L, MINUTES), + () -> BufferPools.shutdownLocalCleaner(1L, MINUTES), () -> Ref.shutdownReferenceReaper(1L, MINUTES), () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES), () -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES), diff --git a/test/unit/org/apache/cassandra/metrics/BufferPoolMetricsTest.java b/test/unit/org/apache/cassandra/metrics/BufferPoolMetricsTest.java index 313aa3f..5e0286f 100644 --- a/test/unit/org/apache/cassandra/metrics/BufferPoolMetricsTest.java +++ 
b/test/unit/org/apache/cassandra/metrics/BufferPoolMetricsTest.java @@ -20,7 +20,6 @@ package org.apache.cassandra.metrics; import java.util.Random; -import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -31,7 +30,6 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.utils.memory.BufferPool; -import org.apache.cassandra.utils.memory.BufferPoolTest; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -40,7 +38,8 @@ import static org.junit.Assert.assertEquals; @RunWith(OrderedJUnit4ClassRunner.class) public class BufferPoolMetricsTest { - private static final BufferPoolMetrics metrics = new BufferPoolMetrics(); + private BufferPool bufferPool; + private BufferPoolMetrics metrics; @BeforeClass() public static void setup() throws ConfigurationException @@ -51,21 +50,15 @@ public class BufferPoolMetricsTest @Before public void setUp() { - BufferPool.setMemoryUsageThreshold(16 * 1024L * 1024L); - } - - @After - public void cleanUp() - { - BufferPoolTest.resetBufferPool(); - metrics.misses.mark(metrics.misses.getCount() * -1); + this.bufferPool = new BufferPool("test_" + System.currentTimeMillis(), 16 * 1024L * 1024L, true); + this.metrics = bufferPool.metrics(); } @Test public void testMetricsSize() { // basically want to test changes in the metric being reported as the buffer pool grows - starts at zero - assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes()) + assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes()) .isEqualTo(0); // the idea is to test changes in the sizeOfBufferPool metric which starts at zero. 
it will bump up @@ -77,7 +70,7 @@ public class BufferPoolMetricsTest final long seed = System.currentTimeMillis(); final Random rand = new Random(seed); final String assertionMessage = String.format("Failed with seed of %s", seed); - final long maxIterations = BufferPool.getMemoryUsageThreshold(); + final long maxIterations = bufferPool.memoryUsageThreshold(); final int maxBufferSize = BufferPool.NORMAL_CHUNK_SIZE - 1; int nextSizeToRequest; long totalBytesRequestedFromPool = 0; @@ -87,15 +80,15 @@ public class BufferPoolMetricsTest { nextSizeToRequest = rand.nextInt(maxBufferSize) + 1; totalBytesRequestedFromPool = totalBytesRequestedFromPool + nextSizeToRequest; - BufferPool.get(nextSizeToRequest, BufferType.OFF_HEAP); + bufferPool.get(nextSizeToRequest, BufferType.OFF_HEAP); assertThat(metrics.size.getValue()).as(assertionMessage) - .isEqualTo(BufferPool.sizeInBytes()) + .isEqualTo(bufferPool.sizeInBytes()) .isGreaterThanOrEqualTo(totalBytesRequestedFromPool); if (initialSizeInBytesAfterZero == 0) { - initialSizeInBytesAfterZero = BufferPool.sizeInBytes(); + initialSizeInBytesAfterZero = bufferPool.sizeInBytes(); } else { @@ -115,6 +108,74 @@ public class BufferPoolMetricsTest } @Test + public void testMetricsOverflowSize() + { + assertEquals(0, metrics.overflowSize.getValue().longValue()); + + final int tinyBufferSizeThatHits = BufferPool.NORMAL_CHUNK_SIZE - 1; + final int bigBufferSizeThatMisses = BufferPool.NORMAL_CHUNK_SIZE + 1; + + int iterations = 16; + for (int ix = 0; ix < iterations; ix++) + { + bufferPool.get(tinyBufferSizeThatHits, BufferType.OFF_HEAP); + assertEquals(0, metrics.overflowSize.getValue().longValue()); + } + + for (int ix = 0; ix < iterations; ix++) + { + bufferPool.get(bigBufferSizeThatMisses, BufferType.OFF_HEAP); + assertEquals(bigBufferSizeThatMisses * (ix + 1), metrics.overflowSize.getValue().longValue()); + } + } + + @Test + public void testMetricsUsedSize() + { + assertEquals(0, metrics.usedSize.getValue().longValue()); + + final 
int tinyBufferSizeThatHits = BufferPool.NORMAL_CHUNK_SIZE - 1; + final int bigBufferSizeThatMisses = BufferPool.NORMAL_CHUNK_SIZE + 1; + + long usedSize = 0; + int iterations = 16; + for (int ix = 0; ix < iterations; ix++) + { + bufferPool.get(tinyBufferSizeThatHits, BufferType.OFF_HEAP); + assertEquals(usedSize += tinyBufferSizeThatHits, metrics.usedSize.getValue().longValue()); + } + + for (int ix = 0; ix < iterations; ix++) + { + bufferPool.get(bigBufferSizeThatMisses, BufferType.OFF_HEAP); + assertEquals(usedSize += bigBufferSizeThatMisses, metrics.usedSize.getValue().longValue()); + } + } + + @Test + public void testMetricsHits() + { + assertEquals(0, metrics.hits.getCount()); + + final int tinyBufferSizeThatHits = BufferPool.NORMAL_CHUNK_SIZE - 1; + final int bigBufferSizeThatMisses = BufferPool.NORMAL_CHUNK_SIZE + 1; + + int iterations = 16; + for (int ix = 0; ix < iterations; ix++) + { + bufferPool.get(tinyBufferSizeThatHits, BufferType.OFF_HEAP); + assertEquals(ix + 1, metrics.hits.getCount()); + } + + long currentHits = metrics.hits.getCount(); + for (int ix = 0; ix < iterations; ix++) + { + bufferPool.get(bigBufferSizeThatMisses + ix, BufferType.OFF_HEAP); + assertEquals(currentHits, metrics.hits.getCount()); + } + } + + @Test public void testMetricsMisses() { assertEquals(0, metrics.misses.getCount()); @@ -125,13 +186,13 @@ public class BufferPoolMetricsTest int iterations = 16; for (int ix = 0; ix < iterations; ix++) { - BufferPool.get(tinyBufferSizeThatHits, BufferType.OFF_HEAP); + bufferPool.get(tinyBufferSizeThatHits, BufferType.OFF_HEAP); assertEquals(0, metrics.misses.getCount()); } for (int ix = 0; ix < iterations; ix++) { - BufferPool.get(bigBufferSizeThatMisses + ix, BufferType.OFF_HEAP); + bufferPool.get(bigBufferSizeThatMisses + ix, BufferType.OFF_HEAP); assertEquals(ix + 1, metrics.misses.getCount()); } } @@ -140,23 +201,23 @@ public class BufferPoolMetricsTest public void testZeroSizeRequestsDontChangeMetrics() { assertEquals(0, 
metrics.misses.getCount()); - assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes()) + assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes()) .isEqualTo(0); - BufferPool.get(0, BufferType.OFF_HEAP); + bufferPool.get(0, BufferType.OFF_HEAP); assertEquals(0, metrics.misses.getCount()); - assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes()) + assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes()) .isEqualTo(0); - BufferPool.get(65536, BufferType.OFF_HEAP); - BufferPool.get(0, BufferType.OFF_HEAP); - BufferPool.get(0, BufferType.OFF_HEAP); - BufferPool.get(0, BufferType.OFF_HEAP); - BufferPool.get(0, BufferType.OFF_HEAP); + bufferPool.get(65536, BufferType.OFF_HEAP); + bufferPool.get(0, BufferType.OFF_HEAP); + bufferPool.get(0, BufferType.OFF_HEAP); + bufferPool.get(0, BufferType.OFF_HEAP); + bufferPool.get(0, BufferType.OFF_HEAP); assertEquals(0, metrics.misses.getCount()); - assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes()) + assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes()) .isGreaterThanOrEqualTo(65536); } @@ -164,29 +225,29 @@ public class BufferPoolMetricsTest public void testFailedRequestsDontChangeMetrics() { assertEquals(0, metrics.misses.getCount()); - assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes()) + assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes()) .isEqualTo(0); tryRequestNegativeBufferSize(); assertEquals(0, metrics.misses.getCount()); - assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes()) + assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes()) .isEqualTo(0); - BufferPool.get(65536, BufferType.OFF_HEAP); + bufferPool.get(65536, BufferType.OFF_HEAP); tryRequestNegativeBufferSize(); tryRequestNegativeBufferSize(); tryRequestNegativeBufferSize(); tryRequestNegativeBufferSize(); assertEquals(0, metrics.misses.getCount()); - 
assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes()) + assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes()) .isGreaterThanOrEqualTo(65536); } private void tryRequestNegativeBufferSize() { assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy( - () -> BufferPool.get(-1, BufferType.OFF_HEAP)); + () -> bufferPool.get(-1, BufferType.OFF_HEAP)); } } diff --git a/test/unit/org/apache/cassandra/net/FramingTest.java b/test/unit/org/apache/cassandra/net/FramingTest.java index 8a7f428..27c8003 100644 --- a/test/unit/org/apache/cassandra/net/FramingTest.java +++ b/test/unit/org/apache/cassandra/net/FramingTest.java @@ -41,7 +41,7 @@ import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPools; import org.apache.cassandra.utils.vint.VIntCoding; import static java.lang.Math.*; @@ -197,7 +197,7 @@ public class FramingTest cumulativeCompressedLength[i] = (i == 0 ? 0 : cumulativeCompressedLength[i - 1]) + buffer.readableBytes(); } - ByteBuffer frames = BufferPool.getAtLeast(cumulativeCompressedLength[frameCount - 1], BufferType.OFF_HEAP); + ByteBuffer frames = BufferPools.forNetworking().getAtLeast(cumulativeCompressedLength[frameCount - 1], BufferType.OFF_HEAP); for (ByteBuf buffer : compressed) { frames.put(buffer.internalNioBuffer(buffer.readerIndex(), buffer.readableBytes())); @@ -412,7 +412,7 @@ public class FramingTest cumulativeLength[i] = (i == 0 ? 
0 : cumulativeLength[i - 1]) + message.length; } - ByteBuffer frames = BufferPool.getAtLeast(cumulativeLength[messageCount - 1], BufferType.OFF_HEAP); + ByteBuffer frames = BufferPools.forNetworking().getAtLeast(cumulativeLength[messageCount - 1], BufferType.OFF_HEAP); for (byte[] buffer : messages) frames.put(buffer); frames.flip(); diff --git a/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java b/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java index 4bfc54a..eb3cc1b 100644 --- a/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java +++ b/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java @@ -23,13 +23,11 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; -import org.junit.After; +import com.google.common.collect.Iterables; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.util.RandomAccessReader; @@ -37,31 +35,12 @@ import static org.junit.Assert.*; public class BufferPoolTest { - @BeforeClass - public static void setupDD() - { - DatabaseDescriptor.daemonInitialization(); - } + private BufferPool bufferPool; @Before public void setUp() { - BufferPool.setMemoryUsageThreshold(8 * 1024L * 1024L); - } - - @After - public void cleanUp() - { - resetBufferPool(); - } - - /** - * Exposes a utility method on this test that other tests might use to access the protected - * {@link BufferPool#unsafeReset()} method. 
- */ - public static void resetBufferPool() - { - BufferPool.unsafeReset(); + bufferPool = new BufferPool("test_pool", 8 * 1024 * 1024, true); } @Test @@ -69,18 +48,20 @@ public class BufferPoolTest { final int size = RandomAccessReader.DEFAULT_BUFFER_SIZE; - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(size, buffer.capacity()); assertEquals(true, buffer.isDirect()); + assertEquals(size, bufferPool.usedSizeInBytes()); - BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); + BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk(); assertNotNull(chunk); - assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes()); + assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes()); - BufferPool.put(buffer); - assertEquals(null, BufferPool.unsafeCurrentChunk()); - assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes()); + bufferPool.put(buffer); + assertEquals(null, bufferPool.unsafeCurrentChunk()); + assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes()); + assertEquals(0, bufferPool.usedSizeInBytes()); } @@ -98,7 +79,7 @@ public class BufferPoolTest private void checkPageAligned(int size) { - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(size, buffer.capacity()); assertTrue(buffer.isDirect()); @@ -106,7 +87,7 @@ public class BufferPoolTest long address = MemoryUtil.getAddress(buffer); assertTrue((address % MemoryUtil.pageSize()) == 0); - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test @@ -115,23 +96,23 @@ public class BufferPoolTest final int size1 = 1024; final int size2 = 2048; - ByteBuffer buffer1 = BufferPool.get(size1, BufferType.OFF_HEAP); + ByteBuffer buffer1 = bufferPool.get(size1, BufferType.OFF_HEAP); assertNotNull(buffer1); 
assertEquals(size1, buffer1.capacity()); - ByteBuffer buffer2 = BufferPool.get(size2, BufferType.OFF_HEAP); + ByteBuffer buffer2 = bufferPool.get(size2, BufferType.OFF_HEAP); assertNotNull(buffer2); assertEquals(size2, buffer2.capacity()); - BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); + BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk(); assertNotNull(chunk); - assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes()); + assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes()); - BufferPool.put(buffer1); - BufferPool.put(buffer2); + bufferPool.put(buffer1); + bufferPool.put(buffer2); - assertEquals(null, BufferPool.unsafeCurrentChunk()); - assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes()); + assertEquals(null, bufferPool.unsafeCurrentChunk()); + assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes()); } @Test @@ -149,14 +130,13 @@ public class BufferPoolTest @Test public void testMaxMemoryExceeded_SameAsChunkSize() { - BufferPool.setMemoryUsageThreshold(BufferPool.GlobalPool.MACRO_CHUNK_SIZE); requestDoubleMaxMemory(); } @Test public void testMaxMemoryExceeded_SmallerThanChunkSize() { - BufferPool.setMemoryUsageThreshold(BufferPool.GlobalPool.MACRO_CHUNK_SIZE / 2); + bufferPool = new BufferPool("test_pool", BufferPool.GlobalPool.MACRO_CHUNK_SIZE / 2, false); requestDoubleMaxMemory(); } @@ -168,7 +148,7 @@ public class BufferPoolTest private void requestDoubleMaxMemory() { - requestUpToSize(RandomAccessReader.DEFAULT_BUFFER_SIZE, (int)(2 * BufferPool.getMemoryUsageThreshold())); + requestUpToSize(RandomAccessReader.DEFAULT_BUFFER_SIZE, (int)(2 * bufferPool.memoryUsageThreshold())); } private void requestUpToSize(int bufferSize, int totalSize) @@ -178,7 +158,7 @@ public class BufferPoolTest List<ByteBuffer> buffers = new ArrayList<>(numBuffers); for (int i = 0; i < numBuffers; i++) { - ByteBuffer buffer = BufferPool.get(bufferSize, BufferType.OFF_HEAP); 
+ ByteBuffer buffer = bufferPool.get(bufferSize, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(bufferSize, buffer.capacity()); assertTrue(buffer.isDirect()); @@ -186,7 +166,7 @@ public class BufferPoolTest } for (ByteBuffer buffer : buffers) - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test @@ -194,10 +174,10 @@ public class BufferPoolTest { final int size = BufferPool.NORMAL_CHUNK_SIZE + 1; - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(size, buffer.capacity()); - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test @@ -209,25 +189,25 @@ public class BufferPoolTest List<ByteBuffer> buffers1 = new ArrayList<>(numBuffers); List<ByteBuffer> buffers2 = new ArrayList<>(numBuffers); for (int i = 0; i < numBuffers; i++) - buffers1.add(BufferPool.get(size, BufferType.OFF_HEAP)); + buffers1.add(bufferPool.get(size, BufferType.OFF_HEAP)); - BufferPool.Chunk chunk1 = BufferPool.unsafeCurrentChunk(); + BufferPool.Chunk chunk1 = bufferPool.unsafeCurrentChunk(); assertNotNull(chunk1); for (int i = 0; i < numBuffers; i++) - buffers2.add(BufferPool.get(size, BufferType.OFF_HEAP)); + buffers2.add(bufferPool.get(size, BufferType.OFF_HEAP)); - assertEquals(2, BufferPool.unsafeNumChunks()); + assertEquals(2, bufferPool.unsafeNumChunks()); for (ByteBuffer buffer : buffers1) - BufferPool.put(buffer); + bufferPool.put(buffer); - assertEquals(1, BufferPool.unsafeNumChunks()); + assertEquals(1, bufferPool.unsafeNumChunks()); for (ByteBuffer buffer : buffers2) - BufferPool.put(buffer); + bufferPool.put(buffer); - assertEquals(0, BufferPool.unsafeNumChunks()); + assertEquals(0, bufferPool.unsafeNumChunks()); buffers2.clear(); } @@ -263,16 +243,16 @@ public class BufferPoolTest { doTestRandomFrees(12345567878L); - BufferPool.unsafeReset(); + bufferPool.unsafeReset(); doTestRandomFrees(20452249587L); - BufferPool.unsafeReset(); + 
bufferPool.unsafeReset(); doTestRandomFrees(82457252948L); - BufferPool.unsafeReset(); + bufferPool.unsafeReset(); doTestRandomFrees(98759284579L); - BufferPool.unsafeReset(); + bufferPool.unsafeReset(); doTestRandomFrees(19475257244L); } @@ -303,10 +283,10 @@ public class BufferPoolTest List<ByteBuffer> buffers = new ArrayList<>(maxFreeSlots); for (int i = 0; i < maxFreeSlots; i++) { - buffers.add(BufferPool.get(size, BufferType.OFF_HEAP)); + buffers.add(bufferPool.get(size, BufferType.OFF_HEAP)); } - BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); + BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk(); assertFalse(chunk.isFree()); int freeSize = BufferPool.NORMAL_CHUNK_SIZE - maxFreeSlots * size; @@ -318,7 +298,7 @@ public class BufferPoolTest assertNotNull(buffer); assertEquals(size, buffer.capacity()); - BufferPool.put(buffer); + bufferPool.put(buffer); freeSize += size; if (freeSize == chunk.capacity()) @@ -341,18 +321,18 @@ public class BufferPoolTest List<ByteBuffer> buffers = new ArrayList<>(sizes.length); for (int i = 0; i < sizes.length; i++) { - ByteBuffer buffer = BufferPool.get(sizes[i], BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(sizes[i], BufferType.OFF_HEAP); assertNotNull(buffer); assertTrue(buffer.capacity() >= sizes[i]); buffers.add(buffer); - sum += BufferPool.unsafeCurrentChunk().roundUp(buffer.capacity()); + sum += bufferPool.unsafeCurrentChunk().roundUp(buffer.capacity()); } // else the test will fail, adjust sizes as required assertTrue(sum <= BufferPool.GlobalPool.MACRO_CHUNK_SIZE); - BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); + BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk(); assertNotNull(chunk); Random rnd = new Random(); @@ -362,11 +342,11 @@ public class BufferPoolTest int index = rnd.nextInt(buffers.size()); ByteBuffer buffer = buffers.remove(index); - BufferPool.put(buffer); + bufferPool.put(buffer); } - BufferPool.put(buffers.remove(0)); + bufferPool.put(buffers.remove(0)); - 
assertEquals(null, BufferPool.unsafeCurrentChunk()); + assertEquals(null, bufferPool.unsafeCurrentChunk()); assertEquals(0, chunk.free()); } @@ -381,7 +361,7 @@ public class BufferPoolTest List<ByteBuffer> buffers = new ArrayList<>(sizes.length); for (int i = 0; i < sizes.length; i++) { - ByteBuffer buffer = BufferPool.get(sizes[i], BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(sizes[i], BufferType.OFF_HEAP); assertNotNull(buffer); assertTrue(buffer.capacity() >= sizes[i]); buffers.add(buffer); @@ -392,15 +372,15 @@ public class BufferPoolTest // else the test will fail, adjust sizes as required assertTrue(sum <= BufferPool.GlobalPool.MACRO_CHUNK_SIZE); - BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); + BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk(); assertNotNull(chunk); for (int i = 0; i < sizes.length; i++) { - BufferPool.put(buffers.get(i)); + bufferPool.put(buffers.get(i)); } - assertEquals(null, BufferPool.unsafeCurrentChunk()); + assertEquals(null, bufferPool.unsafeCurrentChunk()); assertEquals(0, chunk.free()); } @@ -415,22 +395,22 @@ public class BufferPoolTest for (int i = 0; i < numBuffersInChunk; i++) { - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); buffers.add(buffer); addresses.add(MemoryUtil.getAddress(buffer)); } for (int i = numBuffersInChunk - 1; i >= 0; i--) - BufferPool.put(buffers.get(i)); + bufferPool.put(buffers.get(i)); buffers.clear(); for (int i = 0; i < numBuffersInChunk; i++) { - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(size, buffer.capacity()); - addresses.remove(MemoryUtil.getAddress(buffer)); + assertTrue(addresses.remove(MemoryUtil.getAddress(buffer))); buffers.add(buffer); } @@ -438,18 +418,18 @@ public class BufferPoolTest assertTrue(addresses.isEmpty()); // all 5 released buffers were used for 
(ByteBuffer buffer : buffers) - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test public void testHeapBuffer() { - ByteBuffer buffer = BufferPool.get(1024, BufferType.ON_HEAP); + ByteBuffer buffer = bufferPool.get(1024, BufferType.ON_HEAP); assertNotNull(buffer); assertEquals(1024, buffer.capacity()); assertFalse(buffer.isDirect()); assertNotNull(buffer.array()); - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test @@ -497,17 +477,17 @@ public class BufferPoolTest private void checkBuffer(int size) { - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); assertEquals(size, buffer.capacity()); if (size > 0 && size < BufferPool.NORMAL_CHUNK_SIZE) { - BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); + BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk(); assertNotNull(chunk); assertEquals(chunk.capacity(), chunk.free() + chunk.roundUp(size)); } - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test @@ -525,14 +505,14 @@ public class BufferPoolTest for (int size : sizes) { - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); assertEquals(size, buffer.capacity()); buffers.add(buffer); } for (ByteBuffer buffer : buffers) - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test @@ -544,36 +524,36 @@ public class BufferPoolTest private void checkBufferWithGivenSlots(int size, long freeSlots) { //first allocate to make sure there is a chunk - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); // now get the current chunk and override the free slots mask - BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); + BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk(); assertNotNull(chunk); long oldFreeSlots = chunk.setFreeSlots(freeSlots); // now check we can still get the buffer with the free slots mask 
changed - ByteBuffer buffer2 = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer2 = bufferPool.get(size, BufferType.OFF_HEAP); assertEquals(size, buffer.capacity()); - BufferPool.put(buffer2); + bufferPool.put(buffer2); // unsafeReset the free slots chunk.setFreeSlots(oldFreeSlots); - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test public void testZeroSizeRequest() { - ByteBuffer buffer = BufferPool.get(0, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(0, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(0, buffer.capacity()); - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test(expected = IllegalArgumentException.class) public void testNegativeSizeRequest() { - BufferPool.get(-1, BufferType.OFF_HEAP); + bufferPool.get(-1, BufferType.OFF_HEAP); } @Test @@ -689,7 +669,7 @@ public class BufferPoolTest for (int j = 0; j < threadSizes.length; j++) { - ByteBuffer buffer = BufferPool.get(threadSizes[j], BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(threadSizes[j], BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(threadSizes[j], buffer.capacity()); @@ -704,17 +684,17 @@ public class BufferPoolTest assertEquals(i, buffer.getInt()); if (returnImmediately) - BufferPool.put(buffer); + bufferPool.put(buffer); else toBeReturned.add(buffer); - assertTrue(BufferPool.sizeInBytes() > 0); + assertTrue(bufferPool.sizeInBytes() > 0); } Thread.sleep(rand.nextInt(3)); for (ByteBuffer buffer : toBeReturned) - BufferPool.put(buffer); + bufferPool.put(buffer); } catch (Exception ex) { @@ -758,13 +738,13 @@ public class BufferPoolTest int sum = 0; for (int i = 0; i < sizes.length; i++) { - buffers[i] = BufferPool.get(sizes[i], BufferType.OFF_HEAP); + buffers[i] = bufferPool.get(sizes[i], BufferType.OFF_HEAP); assertNotNull(buffers[i]); assertEquals(sizes[i], buffers[i].capacity()); - sum += BufferPool.unsafeCurrentChunk().roundUp(buffers[i].capacity()); + sum += 
bufferPool.unsafeCurrentChunk().roundUp(buffers[i].capacity()); } - final BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); + final BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk(); assertNotNull(chunk); assertFalse(chunk.isFree()); @@ -786,8 +766,8 @@ public class BufferPoolTest { try { - assertNotSame(chunk, BufferPool.unsafeCurrentChunk()); - BufferPool.put(buffer); + assertNotSame(chunk, bufferPool.unsafeCurrentChunk()); + bufferPool.put(buffer); } catch (AssertionError ex) { //this is expected if we release a buffer more than once @@ -816,12 +796,185 @@ public class BufferPoolTest System.gc(); System.gc(); - assertTrue(BufferPool.unsafeCurrentChunk().isFree()); + assertTrue(bufferPool.unsafeCurrentChunk().isFree()); //make sure the main thread can still allocate buffers - ByteBuffer buffer = BufferPool.get(sizes[0], BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(sizes[0], BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(sizes[0], buffer.capacity()); - BufferPool.put(buffer); + bufferPool.put(buffer); + } + + @Test + public void testOverflowAllocation() + { + int macroChunkSize = BufferPool.GlobalPool.MACRO_CHUNK_SIZE; + int allocationSize = BufferPool.NORMAL_CHUNK_SIZE; + int allocations = BufferPool.GlobalPool.MACRO_CHUNK_SIZE / allocationSize; + + // occupy entire buffer pool + List<ByteBuffer> buffers = new ArrayList<>(); + allocate(allocations, allocationSize, buffers); + + assertEquals(macroChunkSize, bufferPool.sizeInBytes()); + assertEquals(macroChunkSize, bufferPool.usedSizeInBytes()); + assertEquals(0, bufferPool.overflowMemoryInBytes()); + + // allocate overflow due to pool exhaust + ByteBuffer overflowBuffer = bufferPool.get(BufferPool.NORMAL_ALLOCATION_UNIT, BufferType.OFF_HEAP); + + assertEquals(macroChunkSize + overflowBuffer.capacity(), bufferPool.sizeInBytes()); + assertEquals(macroChunkSize + overflowBuffer.capacity(), bufferPool.usedSizeInBytes()); + assertEquals(overflowBuffer.capacity(), 
bufferPool.overflowMemoryInBytes()); + + // free all buffers + bufferPool.put(overflowBuffer); + release(buffers); + + assertEquals(macroChunkSize, bufferPool.sizeInBytes()); + assertEquals(0, bufferPool.usedSizeInBytes()); + assertEquals(0, bufferPool.overflowMemoryInBytes()); + + // allocate overflow due to on-heap request + overflowBuffer = bufferPool.get(BufferPool.NORMAL_ALLOCATION_UNIT, BufferType.ON_HEAP); + assertEquals(macroChunkSize + overflowBuffer.capacity(), bufferPool.sizeInBytes()); + assertEquals(overflowBuffer.capacity(), bufferPool.usedSizeInBytes()); + assertEquals(overflowBuffer.capacity(), bufferPool.overflowMemoryInBytes()); + bufferPool.put(overflowBuffer); + + // allocate overflow due to a request larger than the normal chunk size + overflowBuffer = bufferPool.get(2 * BufferPool.NORMAL_CHUNK_SIZE, BufferType.ON_HEAP); + assertEquals(macroChunkSize + overflowBuffer.capacity(), bufferPool.sizeInBytes()); + assertEquals(overflowBuffer.capacity(), bufferPool.usedSizeInBytes()); + assertEquals(overflowBuffer.capacity(), bufferPool.overflowMemoryInBytes()); + } + + @Test + public void testRecyclePartialFreeChunk() + { + // normal chunk size is 128kb + int halfNormalChunk = BufferPool.NORMAL_CHUNK_SIZE / 2; // 64kb, half of normal chunk + List<ByteBuffer> toRelease = new ArrayList<>(); + + // allocate three buffers on different chunks + ByteBuffer buffer0 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP); + BufferPool.Chunk chunk0 = BufferPool.Chunk.getParentChunk(buffer0); + assertFalse(chunk0.isFree()); + allocate(1, halfNormalChunk, toRelease); // allocate remaining buffers in the chunk + + ByteBuffer buffer1 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP); + BufferPool.Chunk chunk1 = BufferPool.Chunk.getParentChunk(buffer1); + assertFalse(chunk1.isFree()); + assertNotEquals(chunk0, chunk1); + allocate(1, halfNormalChunk, toRelease); // allocate remaining buffers in the chunk + + ByteBuffer buffer2 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP); + 
BufferPool.Chunk chunk2 = BufferPool.Chunk.getParentChunk(buffer2); + assertFalse(chunk2.isFree()); + assertNotEquals(chunk0, chunk2); + assertNotEquals(chunk1, chunk2); + allocate(1, halfNormalChunk, toRelease); // allocate remaining buffers in the chunk + + // now all 3 chunks in the local pool are full, allocate one more buffer to evict chunk2 + ByteBuffer buffer3 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP); + BufferPool.Chunk chunk3 = BufferPool.Chunk.getParentChunk(buffer3); + assertNotEquals(chunk0, chunk3); + assertNotEquals(chunk1, chunk3); + assertNotEquals(chunk2, chunk3); + + // verify chunk2 got evicted: it no longer has an owner + assertNotNull(chunk0.owner()); + assertEquals(BufferPool.Chunk.Status.IN_USE, chunk0.status()); + assertNotNull(chunk1.owner()); + assertEquals(BufferPool.Chunk.Status.IN_USE, chunk1.status()); + assertNull(chunk2.owner()); + assertEquals(BufferPool.Chunk.Status.EVICTED, chunk2.status()); + + // release half of the buffers in chunk0/1/2 + release(toRelease); + BufferPool.Chunk partiallyFreed = chunk2; + + // try to recirculate chunk2 and verify freed space + assertFalse(bufferPool.globalPool().isFullyFreed(partiallyFreed)); + assertTrue(bufferPool.globalPool().isPartiallyFreed(partiallyFreed)); + assertEquals(BufferPool.Chunk.Status.IN_USE, partiallyFreed.status()); + assertEquals(halfNormalChunk, partiallyFreed.free()); + ByteBuffer buffer = partiallyFreed.get(halfNormalChunk, false, null); + assertEquals(halfNormalChunk, buffer.capacity()); + + // cleanup allocated buffers + for (ByteBuffer buf : Arrays.asList(buffer0, buffer1, buffer2, buffer3, buffer)) + bufferPool.put(buf); + + // verify that fully freed chunks are prioritized over partially freed chunks + List<BufferPool.Chunk> remainingChunks = new ArrayList<>(); + BufferPool.Chunk chunkForAllocation; + while ((chunkForAllocation = bufferPool.globalPool().get()) != null) + remainingChunks.add(chunkForAllocation); + + int totalNormalChunks = 
BufferPool.GlobalPool.MACRO_CHUNK_SIZE / BufferPool.NORMAL_CHUNK_SIZE; // 64; + assertEquals(totalNormalChunks, remainingChunks.size()); + assertSame(partiallyFreed, remainingChunks.get(remainingChunks.size() - 1)); // last one is partially freed + + // cleanup polled chunks + remainingChunks.forEach(BufferPool.Chunk::release); + } + + @Test + public void testTinyPool() + { + int total = 0; + final int size = BufferPool.TINY_ALLOCATION_UNIT; + final int allocationPerChunk = 64; + + // occupy 3 tiny chunks + List<ByteBuffer> buffers0 = new ArrayList<>(); + BufferPool.Chunk chunk0 = allocate(allocationPerChunk, size, buffers0); + assertTrue(chunk0.owner().isTinyPool()); + List<ByteBuffer> buffers1 = new ArrayList<>(); + BufferPool.Chunk chunk1 = allocate(allocationPerChunk, size, buffers1); + assertTrue(chunk1.owner().isTinyPool()); + List<ByteBuffer> buffers2 = new ArrayList<>(); + BufferPool.Chunk chunk2 = allocate(allocationPerChunk, size, buffers2); + assertTrue(chunk2.owner().isTinyPool()); + total += 3 * BufferPool.TINY_CHUNK_SIZE; + assertEquals(total, bufferPool.usedSizeInBytes()); + + // allocate another tiny chunk.. 
chunk2 should be evicted + List<ByteBuffer> buffers3 = new ArrayList<>(); + BufferPool.Chunk chunk3 = allocate(allocationPerChunk, size, buffers3); + assertTrue(chunk3.owner().isTinyPool()); + total += BufferPool.TINY_CHUNK_SIZE; + assertEquals(total, bufferPool.usedSizeInBytes()); + + // verify chunk2 is full and evicted + assertEquals(0, chunk2.free()); + assertNull(chunk2.owner()); + + // release chunk2's buffer + for (int i = 0; i < buffers2.size(); i++) + { + bufferPool.put(buffers2.get(i)); + total -= buffers2.get(i).capacity(); + assertEquals(total, bufferPool.usedSizeInBytes()); + } + + // cleanup allocated buffers + for (ByteBuffer buffer : Iterables.concat(buffers0, buffers1, buffers3)) + bufferPool.put(buffer); + } + + private BufferPool.Chunk allocate(int num, int bufferSize, List<ByteBuffer> buffers) + { + for (int i = 0; i < num; i++) + buffers.add(bufferPool.get(bufferSize, BufferType.OFF_HEAP)); + + return BufferPool.Chunk.getParentChunk(buffers.get(buffers.size() - 1)); + } + + private void release(List<ByteBuffer> toRelease) + { + for (ByteBuffer buffer : toRelease) + bufferPool.put(buffer); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org