Author: tucu Date: Wed Apr 3 22:27:38 2013 New Revision: 1464220 URL: http://svn.apache.org/r1464220 Log: HADOOP-9401. CodecPool: Add counters for number of (de)compressors leased out. (kkambatl via tucu)
Added: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java - copied unchanged from r1464219, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1464220&r1=1464219&r2=1464220&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Wed Apr 3 22:27:38 2013 @@ -33,6 +33,9 @@ Release 2.0.5-beta - UNRELEASED HADOOP-9358. "Auth failed" log should include exception string (todd) + HADOOP-9401. CodecPool: Add counters for number of (de)compressors + leased out. (kkambatl via tucu) + OPTIMIZATIONS HADOOP-9150. 
Avoid unnecessary DNS resolution attempts for logical URIs Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java?rev=1464220&r1=1464219&r2=1464220&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java Wed Apr 3 22:27:38 2013 @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,6 +30,10 @@ import org.apache.hadoop.classification. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; + /** * A global compressor/decompressor pool used to save and reuse * (possibly native) compression/decompression codecs. 
@@ -52,6 +57,29 @@ public class CodecPool { private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool = new HashMap<Class<Decompressor>, List<Decompressor>>(); + private static <T> LoadingCache<Class<T>, AtomicInteger> createCache( + Class<T> klass) { + return CacheBuilder.newBuilder().build( + new CacheLoader<Class<T>, AtomicInteger>() { + @Override + public AtomicInteger load(Class<T> key) throws Exception { + return new AtomicInteger(); + } + }); + } + + /** + * Map to track the number of leased compressors + */ + private static final LoadingCache<Class<Compressor>, AtomicInteger> compressorCounts = + createCache(Compressor.class); + + /** + * Map to track the number of leased decompressors + */ + private static final LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts = + createCache(Decompressor.class); + private static <T> T borrow(Map<Class<T>, List<T>> pool, Class<? extends T> codecClass) { T codec = null; @@ -90,6 +118,21 @@ public class CodecPool { } } + @SuppressWarnings("unchecked") + private static <T> int getLeaseCount( + LoadingCache<Class<T>, AtomicInteger> usageCounts, + Class<? extends T> codecClass) { + return usageCounts.getUnchecked((Class<T>) codecClass).get(); + } + + private static <T> void updateLeaseCount( + LoadingCache<Class<T>, AtomicInteger> usageCounts, T codec, int delta) { + if (codec != null) { + Class<T> codecClass = ReflectionUtils.getClass(codec); + usageCounts.getUnchecked(codecClass).addAndGet(delta); + } + } + /** * Get a {@link Compressor} for the given {@link CompressionCodec} from the * pool or a new one. 
@@ -111,6 +154,7 @@ public class CodecPool { LOG.debug("Got recycled compressor"); } } + updateLeaseCount(compressorCounts, compressor, 1); return compressor; } @@ -137,6 +181,7 @@ public class CodecPool { LOG.debug("Got recycled decompressor"); } } + updateLeaseCount(decompressorCounts, decompressor, 1); return decompressor; } @@ -155,6 +200,7 @@ public class CodecPool { } compressor.reset(); payback(compressorPool, compressor); + updateLeaseCount(compressorCounts, compressor, -1); } /** @@ -173,5 +219,24 @@ public class CodecPool { } decompressor.reset(); payback(decompressorPool, decompressor); + updateLeaseCount(decompressorCounts, decompressor, -1); + } + + /** + * Return the number of leased {@link Compressor}s for this + * {@link CompressionCodec} + */ + public static int getLeasedCompressorsCount(CompressionCodec codec) { + return (codec == null) ? 0 : getLeaseCount(compressorCounts, + codec.getCompressorType()); + } + + /** + * Return the number of leased {@link Decompressor}s for this + * {@link CompressionCodec} + */ + public static int getLeasedDecompressorsCount(CompressionCodec codec) { + return (codec == null) ? 0 : getLeaseCount(decompressorCounts, + codec.getDecompressorType()); } }