Author: tucu
Date: Wed Apr  3 22:27:38 2013
New Revision: 1464220

URL: http://svn.apache.org/r1464220
Log:
HADOOP-9401. CodecPool: Add counters for number of (de)compressors leased out. 
(kkambatl via tucu)

Added:
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
      - copied unchanged from r1464219, 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
Modified:
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java

Modified: 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1464220&r1=1464219&r2=1464220&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt 
(original)
+++ 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt 
Wed Apr  3 22:27:38 2013
@@ -33,6 +33,9 @@ Release 2.0.5-beta - UNRELEASED
 
     HADOOP-9358. "Auth failed" log should include exception string (todd)
 
+    HADOOP-9401. CodecPool: Add counters for number of (de)compressors 
+    leased out. (kkambatl via tucu)
+
   OPTIMIZATIONS
 
     HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs

Modified: 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java?rev=1464220&r1=1464219&r2=1464220&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
 Wed Apr  3 22:27:38 2013
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,6 +30,10 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 /**
  * A global compressor/decompressor pool used to save and reuse 
  * (possibly native) compression/decompression codecs.
@@ -52,6 +57,29 @@ public class CodecPool {
   private static final Map<Class<Decompressor>, List<Decompressor>> 
decompressorPool = 
     new HashMap<Class<Decompressor>, List<Decompressor>>();
 
+  private static <T> LoadingCache<Class<T>, AtomicInteger> createCache(
+      Class<T> klass) {
+    return CacheBuilder.newBuilder().build(
+        new CacheLoader<Class<T>, AtomicInteger>() {
+          @Override
+          public AtomicInteger load(Class<T> key) throws Exception {
+            return new AtomicInteger();
+          }
+        });
+  }
+
+  /**
+   * Map to track the number of leased compressors
+   */
+  private static final LoadingCache<Class<Compressor>, AtomicInteger> 
compressorCounts =
+      createCache(Compressor.class);
+
+   /**
+   * Map to tracks the number of leased decompressors
+   */
+  private static final LoadingCache<Class<Decompressor>, AtomicInteger> 
decompressorCounts =
+      createCache(Decompressor.class);
+
   private static <T> T borrow(Map<Class<T>, List<T>> pool,
                              Class<? extends T> codecClass) {
     T codec = null;
@@ -90,6 +118,21 @@ public class CodecPool {
     }
   }
   
+  @SuppressWarnings("unchecked")
+  private static <T> int getLeaseCount(
+      LoadingCache<Class<T>, AtomicInteger> usageCounts,
+      Class<? extends T> codecClass) {
+    return usageCounts.getUnchecked((Class<T>) codecClass).get();
+  }
+
+  private static <T> void updateLeaseCount(
+      LoadingCache<Class<T>, AtomicInteger> usageCounts, T codec, int delta) {
+    if (codec != null) {
+      Class<T> codecClass = ReflectionUtils.getClass(codec);
+      usageCounts.getUnchecked(codecClass).addAndGet(delta);
+    }
+  }
+
   /**
    * Get a {@link Compressor} for the given {@link CompressionCodec} from the 
    * pool or a new one.
@@ -111,6 +154,7 @@ public class CodecPool {
         LOG.debug("Got recycled compressor");
       }
     }
+    updateLeaseCount(compressorCounts, compressor, 1);
     return compressor;
   }
   
@@ -137,6 +181,7 @@ public class CodecPool {
         LOG.debug("Got recycled decompressor");
       }
     }
+    updateLeaseCount(decompressorCounts, decompressor, 1);
     return decompressor;
   }
   
@@ -155,6 +200,7 @@ public class CodecPool {
     }
     compressor.reset();
     payback(compressorPool, compressor);
+    updateLeaseCount(compressorCounts, compressor, -1);
   }
   
   /**
@@ -173,5 +219,24 @@ public class CodecPool {
     }
     decompressor.reset();
     payback(decompressorPool, decompressor);
+    updateLeaseCount(decompressorCounts, decompressor, -1);
+  }
+
+  /**
+   * Return the number of leased {@link Compressor}s for this
+   * {@link CompressionCodec}
+   */
+  public static int getLeasedCompressorsCount(CompressionCodec codec) {
+    return (codec == null) ? 0 : getLeaseCount(compressorCounts,
+        codec.getCompressorType());
+  }
+
+  /**
+   * Return the number of leased {@link Decompressor}s for this
+   * {@link CompressionCodec}
+   */
+  public static int getLeasedDecompressorsCount(CompressionCodec codec) {
+    return (codec == null) ? 0 : getLeaseCount(decompressorCounts,
+        codec.getDecompressorType());
   }
 }


Reply via email to