Fix FileCacheService regressions

patch by jbellis; reviewed by pyaskevich and tested by Kai Wang for CASSANDRA-6149
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/01a57eea
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/01a57eea
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/01a57eea

Branch: refs/heads/trunk
Commit: 01a57eea841e51fb4a97329ab9fa0f59d0b826f6
Parents: c374aca
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Mon Oct 7 14:20:42 2013 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Mon Oct 7 14:20:42 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedRandomAccessReader.java  |  5 ++
 .../cassandra/io/util/RandomAccessReader.java   |  2 +-
 .../apache/cassandra/io/util/SegmentedFile.java |  3 +-
 .../cassandra/service/FileCacheService.java     | 87 ++++++++++----------
 5 files changed, 54 insertions(+), 44 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a57eea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 94fa927..ddd976e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.2
+ * Fix FileCacheService regressions (CASSANDRA-6149)
  * Never return WriteTimeout for CL.ANY (CASSANDRA-6032)
  * Fix race conditions in bulk loader (CASSANDRA-6129)
  * Add configurable metrics reporting (CASSANDRA-4430)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a57eea/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index b6cffa2..131a4d6 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -154,6 +154,11 @@ public class CompressedRandomAccessReader extends RandomAccessReader
         return checksumBytes.getInt(0);
     }

+    public int getTotalBufferSize()
+    {
+        return super.getTotalBufferSize() + compressed.capacity();
+    }
+
     @Override
     public long length()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a57eea/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 4ceb3c4..9a03480 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -152,7 +152,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         return filePath;
     }

-    public int getBufferSize()
+    public int getTotalBufferSize()
     {
         return buffer.length;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a57eea/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 6231fd7..d4da177 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.util;

 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.File;
 import java.io.IOException;
 import java.nio.MappedByteBuffer;
 import java.util.Iterator;
@@ -57,7 +58,7 @@ public abstract class SegmentedFile

     protected SegmentedFile(String path, long length, long onDiskLength)
     {
-        this.path = path;
+        this.path = new File(path).getAbsolutePath();
         this.length = length;
         this.onDiskLength = onDiskLength;
     }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a57eea/src/java/org/apache/cassandra/service/FileCacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/FileCacheService.java b/src/java/org/apache/cassandra/service/FileCacheService.java
index e6bc3e5..c939a6f 100644
--- a/src/java/org/apache/cassandra/service/FileCacheService.java
+++ b/src/java/org/apache/cassandra/service/FileCacheService.java
@@ -22,11 +22,9 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;

-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -43,37 +41,47 @@ public class FileCacheService

     public static FileCacheService instance = new FileCacheService();

-    private final Cache<String, Queue<RandomAccessReader>> cache;
-    private final FileCacheMetrics metrics = new FileCacheMetrics();
-    public final Callable<Queue<RandomAccessReader>> cacheForPathCreator = new Callable<Queue<RandomAccessReader>>()
+    private static final Callable<Queue<RandomAccessReader>> cacheForPathCreator = new Callable<Queue<RandomAccessReader>>()
     {
         @Override
-        public Queue<RandomAccessReader> call() throws Exception
+        public Queue<RandomAccessReader> call()
         {
             return new ConcurrentLinkedQueue<RandomAccessReader>();
         }
     };

+    private static final AtomicInteger memoryUsage = new AtomicInteger();
+
+    private final Cache<String, Queue<RandomAccessReader>> cache;
+    private final FileCacheMetrics metrics = new FileCacheMetrics();
+
     protected FileCacheService()
     {
+        RemovalListener<String, Queue<RandomAccessReader>> onRemove = new RemovalListener<String, Queue<RandomAccessReader>>()
+        {
+            @Override
+            public void onRemoval(RemovalNotification<String, Queue<RandomAccessReader>> notification)
+            {
+                Queue<RandomAccessReader> cachedInstances = notification.getValue();
+                if (cachedInstances == null)
+                    return;
+
+                if (cachedInstances.size() > 0)
+                    logger.debug("Evicting cold readers for {}", cachedInstances.peek().getPath());
+
+                for (RandomAccessReader reader : cachedInstances)
+                {
+                    memoryUsage.addAndGet(-1 * reader.getTotalBufferSize());
+                    reader.deallocate();
+                }
+            }
+        };
+
         cache = CacheBuilder.<String, Queue<RandomAccessReader>>newBuilder()
-                            .expireAfterAccess(AFTER_ACCESS_EXPIRATION, TimeUnit.MILLISECONDS)
-                            .concurrencyLevel(DatabaseDescriptor.getConcurrentReaders())
-                            .removalListener(new RemovalListener<String, Queue<RandomAccessReader>>()
-                            {
-                                @Override
-                                public void onRemoval(RemovalNotification<String, Queue<RandomAccessReader>> notification)
-                                {
-                                    Queue<RandomAccessReader> cachedInstances = notification.getValue();
-
-                                    if (cachedInstances == null)
-                                        return;
-
-                                    for (RandomAccessReader reader : cachedInstances)
-                                        reader.deallocate();
-                                }
-                            })
-                            .build();
+                .expireAfterAccess(AFTER_ACCESS_EXPIRATION, TimeUnit.MILLISECONDS)
+                .concurrencyLevel(DatabaseDescriptor.getConcurrentReaders())
+                .removalListener(onRemove)
+                .build();
     }

     public RandomAccessReader get(String path)
@@ -81,12 +89,7 @@ public class FileCacheService
         metrics.requests.mark();

         Queue<RandomAccessReader> instances = getCacheFor(path);
-
-        if (instances == null)
-            return null;
-
         RandomAccessReader result = instances.poll();
-
         if (result != null)
             metrics.hits.mark();

@@ -101,30 +104,30 @@ public class FileCacheService
         }
         catch (ExecutionException e)
         {
-            // if something bad happened, let's just carry on and return null
-            // as dysfunctional queue should not interrupt normal operation
-            logger.debug("Exception fetching cache", e);
+            throw new AssertionError(e);
         }
-
-        return null;
     }

     public void put(RandomAccessReader instance)
     {
-        // This wouldn't be precise sometimes when CRAR is used because
-        // there is a way for users to dynamically change the size of the buffer,
-        // but we don't expect that to happen frequently in production.
-        // Doing accounting this way also allows us to avoid atomic CAS operation on read path.
-        long memoryUsage = (cache.size() + 1) * instance.getBufferSize();
+        int memoryUsed = memoryUsage.get();
+        if (logger.isDebugEnabled())
+            logger.debug("Estimated memory usage is {} compared to actual usage {}", memoryUsed, sizeInBytes());

-        if (memoryUsage >= MEMORY_USAGE_THRESHOLD)
+        if (memoryUsed >= MEMORY_USAGE_THRESHOLD)
+        {
             instance.deallocate();
+        }
         else
+        {
+            memoryUsage.addAndGet(instance.getTotalBufferSize());
             getCacheFor(instance.getPath()).add(instance);
+        }
     }

     public void invalidate(String path)
     {
+        logger.debug("Invalidating cache for {}", path);
         cache.invalidate(path);
     }

@@ -133,7 +136,7 @@ public class FileCacheService
         long n = 0;
         for (Queue<RandomAccessReader> queue : cache.asMap().values())
             for (RandomAccessReader reader : queue)
-                n += reader.getBufferSize();
+                n += reader.getTotalBufferSize();
         return n;
     }
 }