milleruntime closed pull request #140: ACCUMULO-4419: Change how compression 
delegation works
URL: https://github.com/apache/accumulo/pull/140
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/pom.xml b/core/pom.xml
index 464c725c21..d749f1c641 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -83,6 +83,10 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-pool2</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-vfs2</artifactId>
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index db9d6a61a0..db2713a02c 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -351,7 +351,18 @@
       "Memory to provide to batchwriter to replay mutations for replication"),
   TSERV_ASSIGNMENT_MAXCONCURRENT("tserver.assignment.concurrent.max", "2", 
PropertyType.COUNT,
       "The number of threads available to load tablets. Recoveries are still 
performed serially."),
-
+  TSERV_COMPRESSOR_FACTORY("tserver.compressor.factory.class", 
"org.apache.accumulo.core.file.rfile.bcfile.codec.CompressorFactory", 
PropertyType.CLASSNAME,
+      "Tablet Server configuration for the compressor factory that will be 
used when requesting compressors."),
+  TSERV_COMPRESSOR_IN_BUFFER("tserver.compressor.factory.input.buffer.size", 
"1K", PropertyType.MEMORY,
+      "Tablet Server configuration for the compressor factory that adjusts the 
input buffer size. Zero uses the full compression block size."),
+  TSERV_COMPRESSOR_OUT_BUFFER("tserver.compressor.factory.output.buffer.size", 
"1K", PropertyType.MEMORY,
+      "Tablet Server configuration for the compressor factory that adjusts the 
output buffer size. Default uses the full compression block size."),
+  TSERV_COMPRESSOR_POOL_IDLE("tserver.compressor.pool.max.idle", "25", 
PropertyType.COUNT,
+      "Tablet Server configuration to contrain the maximum number of idle 
compressors within the pool"),
+  
TSERV_COMPRESSOR_POOL_IDLE_SWEEP_TIME("tserver.compressor.pool.max.idle.sweep.time",
 "0ms", PropertyType.TIMEDURATION,
+      "Tablet Server configuration for max idle time between idle object 
sweeps. Does not run if set to 0."),
+  
TSERV_COMPRESSOR_POOL_IDLE_STORE_TIME("tserver.compressor.pool.max.idle.time", 
"0ms", PropertyType.TIMEDURATION,
+      "Tablet Server configuration for max amount of time an idle 
(de)compressor will be stored. Does not get evicted if > 0"),
   // properties that are specific to logger server behavior
   LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this 
category affect the behavior of the write-ahead logger servers"),
   LOGGER_DIR("logger.dir.walog", "walogs", PropertyType.PATH, "This property 
is only needed if Accumulo was upgraded from a 1.4 or earlier version. "
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
index 2b8154102b..1961d76a74 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
@@ -26,11 +26,13 @@
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.accumulo.core.file.rfile.bcfile.codec.CompressorFactory;
+import 
org.apache.accumulo.core.file.rfile.bcfile.codec.DefaultCompressorFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
@@ -39,6 +41,7 @@
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -85,6 +88,21 @@ public void flush() throws IOException {
   /** compression: none */
   public static final String COMPRESSION_NONE = "none";
 
+  // data input buffer size to absorb small reads from application.
+  private static final int DATA_IBUF_SIZE_DEFAULT = 1 * 1024;
+  // data output buffer size to absorb small writes from application.
+  private static final int DATA_OBUF_SIZE_DEFAULT = 4 * 1024;
+
+  /**
+   * Data input buffer size variable. Defaults is defined, statically, above.
+   */
+  private static volatile int dataInputBufferSize = DATA_IBUF_SIZE_DEFAULT;
+
+  /**
+   * Data output buffer size variable. Defaults is defined, statically, above.
+   */
+  private static volatile int dataOutputBufferSize = DATA_OBUF_SIZE_DEFAULT;
+
   /**
    * Compression algorithms. There is a static initializer, below the values 
defined in the enumeration, that calls the initializer of all defined codecs 
within
    * the Algorithm enum. This promotes a model of the following call graph of 
initialization by the static initializer, followed by calls to getCodec() and
@@ -164,7 +182,7 @@ CompressionCodec createNewCodec(int bufferSize) {
       }
 
       @Override
-      CompressionCodec getCodec() throws IOException {
+      public CompressionCodec getCodec() {
         return codec;
       }
 
@@ -180,7 +198,7 @@ public InputStream createDecompressionStream(InputStream 
downStream, Decompresso
           bis1 = downStream;
         }
         CompressionInputStream cis = codec.createInputStream(bis1, 
decompressor);
-        BufferedInputStream bis2 = new BufferedInputStream(cis, 
DATA_IBUF_SIZE);
+        BufferedInputStream bis2 = new BufferedInputStream(cis, 
dataInputBufferSize == 0 ? downStreamBufferSize : dataInputBufferSize);
         return bis2;
       }
 
@@ -196,7 +214,8 @@ public OutputStream createCompressionStream(OutputStream 
downStream, Compressor
           bos1 = downStream;
         }
         CompressionOutputStream cos = codec.createOutputStream(bos1, 
compressor);
-        BufferedOutputStream bos2 = new BufferedOutputStream(new 
FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
+        BufferedOutputStream bos2 = new BufferedOutputStream(new 
FinishOnFlushCompressionStream(cos), dataOutputBufferSize == 0 ? 
downStreamBufferSize
+            : dataOutputBufferSize);
         return bos2;
       }
 
@@ -217,7 +236,7 @@ public OutputStream createCompressionStream(OutputStream 
downStream, Compressor
       private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
 
       @Override
-      CompressionCodec getCodec() {
+      public CompressionCodec getCodec() {
         return codec;
       }
 
@@ -259,7 +278,7 @@ public InputStream createDecompressionStream(InputStream 
downStream, Decompresso
           }
         }
         CompressionInputStream cis = decomCodec.createInputStream(downStream, 
decompressor);
-        BufferedInputStream bis2 = new BufferedInputStream(cis, 
DATA_IBUF_SIZE);
+        BufferedInputStream bis2 = new BufferedInputStream(cis, 
dataInputBufferSize);
         return bis2;
       }
 
@@ -273,7 +292,8 @@ public OutputStream createCompressionStream(OutputStream 
downStream, Compressor
         }
         // always uses the default buffer size
         CompressionOutputStream cos = codec.createOutputStream(bos1, 
compressor);
-        BufferedOutputStream bos2 = new BufferedOutputStream(new 
FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
+        BufferedOutputStream bos2 = new BufferedOutputStream(new 
FinishOnFlushCompressionStream(cos), dataOutputBufferSize == 0 ? 
downStreamBufferSize
+            : dataOutputBufferSize);
         return bos2;
       }
 
@@ -285,7 +305,7 @@ public boolean isSupported() {
 
     NONE(COMPRESSION_NONE) {
       @Override
-      CompressionCodec getCodec() {
+      public CompressionCodec getCodec() {
         return null;
       }
 
@@ -342,7 +362,7 @@ public boolean isSupported() {
       private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
 
       @Override
-      public CompressionCodec getCodec() throws IOException {
+      public CompressionCodec getCodec() {
         return snappyCodec;
       }
 
@@ -398,7 +418,8 @@ public OutputStream createCompressionStream(OutputStream 
downStream, Compressor
         }
         // use the default codec
         CompressionOutputStream cos = snappyCodec.createOutputStream(bos1, 
compressor);
-        BufferedOutputStream bos2 = new BufferedOutputStream(new 
FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
+        BufferedOutputStream bos2 = new BufferedOutputStream(new 
FinishOnFlushCompressionStream(cos), dataOutputBufferSize == 0 ? 
downStreamBufferSize
+            : dataOutputBufferSize);
         return bos2;
       }
 
@@ -420,7 +441,7 @@ public InputStream createDecompressionStream(InputStream 
downStream, Decompresso
         }
 
         CompressionInputStream cis = decomCodec.createInputStream(downStream, 
decompressor);
-        BufferedInputStream bis2 = new BufferedInputStream(cis, 
DATA_IBUF_SIZE);
+        BufferedInputStream bis2 = new BufferedInputStream(cis, 
dataInputBufferSize == 0 ? downStreamBufferSize : dataInputBufferSize);
         return bis2;
       }
 
@@ -463,10 +484,7 @@ public CompressionCodec load(Entry<Algorithm,Integer> key) 
{
     // statically in the Configuration object.
     protected static final Configuration conf;
     private final String compressName;
-    // data input buffer size to absorb small reads from application.
-    private static final int DATA_IBUF_SIZE = 1 * 1024;
-    // data output buffer size to absorb small writes from application.
-    private static final int DATA_OBUF_SIZE = 4 * 1024;
+
     public static final String CONF_LZO_CLASS = 
"io.compression.codec.lzo.class";
     public static final String CONF_SNAPPY_CLASS = 
"io.compression.codec.snappy.class";
 
@@ -474,7 +492,7 @@ public CompressionCodec load(Entry<Algorithm,Integer> key) {
       this.compressName = name;
     }
 
-    abstract CompressionCodec getCodec() throws IOException;
+    public abstract CompressionCodec getCodec();
 
     /**
      * function to create the default codec object.
@@ -497,62 +515,19 @@ public CompressionCodec load(Entry<Algorithm,Integer> 
key) {
     public abstract boolean isSupported();
 
     public Compressor getCompressor() throws IOException {
-      CompressionCodec codec = getCodec();
-      if (codec != null) {
-        Compressor compressor = CodecPool.getCompressor(codec);
-        if (compressor != null) {
-          if (compressor.finished()) {
-            // Somebody returns the compressor to CodecPool but is still using
-            // it.
-            LOG.warn("Compressor obtained from CodecPool already finished()");
-          } else {
-            LOG.debug("Got a compressor: " + compressor.hashCode());
-          }
-          /**
-           * Following statement is necessary to get around bugs in 0.18 where 
a compressor is referenced after returned back to the codec pool.
-           */
-          compressor.reset();
-        }
-        return compressor;
-      }
-      return null;
+      return compressorFactory.get().getCompressor(this);
     }
 
     public void returnCompressor(Compressor compressor) {
-      if (compressor != null) {
-        LOG.debug("Return a compressor: " + compressor.hashCode());
-        CodecPool.returnCompressor(compressor);
-      }
+      compressorFactory.get().releaseCompressor(this, compressor);
     }
 
     public Decompressor getDecompressor() throws IOException {
-      CompressionCodec codec = getCodec();
-      if (codec != null) {
-        Decompressor decompressor = CodecPool.getDecompressor(codec);
-        if (decompressor != null) {
-          if (decompressor.finished()) {
-            // Somebody returns the decompressor to CodecPool but is still 
using
-            // it.
-            LOG.warn("Decompressor obtained from CodecPool already 
finished()");
-          } else {
-            LOG.debug("Got a decompressor: " + decompressor.hashCode());
-          }
-          /**
-           * Following statement is necessary to get around bugs in 0.18 where 
a decompressor is referenced after returned back to the codec pool.
-           */
-          decompressor.reset();
-        }
-        return decompressor;
-      }
-
-      return null;
+      return compressorFactory.get().getDecompressor(this);
     }
 
     public void returnDecompressor(Decompressor decompressor) {
-      if (decompressor != null) {
-        LOG.debug("Returned a decompressor: " + decompressor.hashCode());
-        CodecPool.returnDecompressor(decompressor);
-      }
+      compressorFactory.get().releaseDecompressor(this, decompressor);
     }
 
     public String getName() {
@@ -560,6 +535,46 @@ public String getName() {
     }
   }
 
+  /**
+   * Default implementation will create new compressors.
+   */
+  private static AtomicReference<CompressorFactory> compressorFactory = new 
AtomicReference<CompressorFactory>(new DefaultCompressorFactory(null));
+
+  /**
+   * Allow the compressor factory to be set within this Instance.
+   *
+   * @param compFactory
+   *          incoming compressor factory to be used by all Algorithms
+   */
+  public static void setCompressionFactory(final CompressorFactory 
compFactory) {
+    Preconditions.checkNotNull(compFactory, "Compressor Factory cannot be 
null");
+    if (null != compressorFactory.get()) {
+      compressorFactory.get().close();
+    }
+
+    compressorFactory.set(compFactory);
+  }
+
+  /**
+   * Adjusts the input buffer size
+   *
+   * @param inputBufferSize
+   *          configured input buffer size
+   */
+  public static synchronized void setDataInputBufferSize(final int 
inputBufferSize) {
+    dataInputBufferSize = inputBufferSize;
+  }
+
+  /**
+   * Adjusts the output buffer size
+   *
+   * @param dataOutputBufferSizeOpt
+   *          configured output buffer size
+   */
+  public static synchronized void setDataOutputBufferSize(final int 
dataOutputBufferSizeOpt) {
+    dataOutputBufferSize = dataOutputBufferSizeOpt;
+  }
+
   static Algorithm getCompressionAlgorithmByName(String compressName) {
     Algorithm[] algos = Algorithm.class.getEnumConstants();
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressionUpdater.java
 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressionUpdater.java
new file mode 100644
index 0000000000..3f1cdb788b
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressionUpdater.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile.codec;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.log4j.Logger;
+
+/**
+ * Runnable that will be used to update the compression class.
+ *
+ */
+public class CompressionUpdater implements Runnable {
+
+  private static final Logger LOG = Logger.getLogger(CompressorFactory.class);
+  /**
+   * Compressor factory class
+   */
+  private Class<? extends CompressorFactory> compressorFactoryClazz = 
CompressorFactory.class;
+
+  private CompressorFactory currentInstance = null;
+
+  /**
+   * Accumulo configuration.
+   */
+  private AccumuloConfiguration acuConf;
+
+  public CompressionUpdater(AccumuloConfiguration acuConf) {
+    this.acuConf = acuConf;
+    currentInstance = new DefaultCompressorFactory(acuConf);
+    Compression.setCompressionFactory(currentInstance);
+  }
+
+  @Override
+  public void run() {
+    final String compressorClass = 
acuConf.get(Property.TSERV_COMPRESSOR_FACTORY);
+    if (!compressorClass.equals(compressorFactoryClazz.getCanonicalName())) {
+      Class<? extends CompressorFactory> tempFactory = null;
+      try {
+        tempFactory = 
Class.forName(compressorClass).asSubclass(CompressorFactory.class);
+      } catch (ClassNotFoundException cfe) {
+        LOG.warn("Could not find class " + compressorClass + " so not setting 
desired CompressorFactory");
+        // do nothing
+        return;
+      }
+      LOG.info("Setting compressor factory to " + tempFactory);
+      try {
+        
Compression.setCompressionFactory(tempFactory.getConstructor(AccumuloConfiguration.class).newInstance(acuConf));
+        compressorFactoryClazz = tempFactory;
+      } catch (Exception e) {
+        LOG.error("Could not set compressor factory to " + 
compressorFactoryClazz + " defaulting to CompressorFactory", e);
+        Compression.setCompressionFactory(new 
DefaultCompressorFactory(acuConf));
+      }
+    } else {
+      currentInstance.update(acuConf);
+    }
+
+    /**
+     * Adjust compression buffer sizes.
+     */
+    final long inputBufferSize = 
acuConf.getMemoryInBytes(Property.TSERV_COMPRESSOR_IN_BUFFER);
+    Compression.setDataInputBufferSize((int) inputBufferSize);
+    final long outputBufferSize = 
acuConf.getMemoryInBytes(Property.TSERV_COMPRESSOR_OUT_BUFFER);
+    Compression.setDataOutputBufferSize((int) outputBufferSize);
+  }
+
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorFactory.java
 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorFactory.java
new file mode 100644
index 0000000000..57061dfeb0
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile.codec;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * Compressor Factory is an interface to create compressors and decompressors 
based on the supplied algorithm. Extensions may allow for alternative factory
+ * methods, such as object pooling.
+ */
+public interface CompressorFactory {
+
+  /**
+   * Provides the caller a compressor object.
+   *
+   * @param compressionAlgorithm
+   *          compressor's algorithm.
+   * @return compressor.
+   * @throws IOException
+   *           I/O Exception during factory implementation
+   */
+  public Compressor getCompressor(Algorithm compressionAlgorithm) throws 
IOException;
+
+  /**
+   * Method to release a compressor. This implementation will call end on the 
compressor.
+   *
+   * @param algorithm
+   *          Supplied compressor's Algorithm.
+   * @param compressor
+   *          Compressor object
+   */
+  public void releaseCompressor(Algorithm algorithm, Compressor compressor);
+
+  /**
+   * Method to release the decompressor. This implementation will call end on 
the decompressor.
+   *
+   * @param algorithm
+   *          Supplied decompressor's Algorithm.
+   * @param decompressor
+   *          decompressor object.
+   */
+  public void releaseDecompressor(Algorithm algorithm, Decompressor 
decompressor);
+
+  /**
+   * Provides the caller a decompressor object.
+   *
+   * @param compressionAlgorithm
+   *          decompressor's algorithm.
+   * @return decompressor.
+   */
+  public Decompressor getDecompressor(Algorithm compressionAlgorithm);
+
+  /**
+   * Implementations may choose to have a close call implemented.
+   */
+  public void close();
+
+  /**
+   * Provides the capability to update the compression factory
+   *
+   * @param acuConf
+   *          accumulo configuration
+   */
+  public void update(final AccumuloConfiguration acuConf);
+
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorPool.java
 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorPool.java
new file mode 100644
index 0000000000..0abd7c3b32
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorPool.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile.codec;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compressor factory extension that enables object pooling using Commons 
Pool. The design will have a keyed compressor pool and decompressor pool. The 
key of
+ * which will be the Algorithm itself.
+ *
+ */
+public class CompressorPool extends DefaultCompressorFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompressorPoolFactory.class);
+
+  /**
+   * Compressor pool.
+   */
+  GenericKeyedObjectPool<Algorithm,Compressor> compressorPool;
+
+  /**
+   * Decompressor pool
+   */
+  GenericKeyedObjectPool<Algorithm,Decompressor> decompressorPool;
+
+  public CompressorPool(AccumuloConfiguration acuConf) {
+
+    super(acuConf);
+
+    compressorPool = new GenericKeyedObjectPool<Algorithm,Compressor>(new 
CompressorPoolFactory());
+    // ensure that the pool grows when needed
+
+    compressorPool.setBlockWhenExhausted(false);
+    // no limit
+    compressorPool.setMaxTotal(-1);
+    compressorPool.setMaxTotalPerKey(-1);
+    compressorPool.setTestOnReturn(false);
+
+    decompressorPool = new GenericKeyedObjectPool<Algorithm,Decompressor>(new 
DecompressorPoolFactory());
+    // ensure that the pool grows when needed.
+    decompressorPool.setBlockWhenExhausted(false);
+    // no limit
+    decompressorPool.setMaxTotal(-1);
+    decompressorPool.setMaxTotalPerKey(-1);
+    decompressorPool.setTestOnReturn(false);
+    // perform the initial update.
+    update(acuConf);
+
+  }
+
+  /**
+   * Set the max idle time that the compressor and decompressor pools will 
hold objects.
+   *
+   * @param maxIdle
+   *          maximum idle time.
+   */
+  public void setMaxIdle(final int maxIdle) {
+    // check that we are changing the value.
+    // this will avoid synchronization within the pool
+    if (maxIdle != compressorPool.getMaxIdlePerKey())
+      compressorPool.setMaxIdlePerKey(maxIdle);
+    if (maxIdle != decompressorPool.getMaxIdlePerKey())
+      decompressorPool.setMaxIdlePerKey(maxIdle);
+  }
+
+  @Override
+  public Compressor getCompressor(Algorithm compressionAlgorithm) throws 
IOException {
+    Preconditions.checkNotNull(compressionAlgorithm, "Algorithm cannot be 
null");
+    try {
+      return compressorPool.borrowObject(compressionAlgorithm);
+    } catch (Exception e) {
+      // could not borrow the object, therefore we will attempt to create it
+      // this will most likely result in an exception when returning so an end 
will occur
+      LOG.warn("Could not borrow compressor; creating instead", e);
+      return compressionAlgorithm.getCodec().createCompressor();
+    }
+  }
+
+  @Override
+  public void releaseCompressor(Algorithm compressionAlgorithm, Compressor 
compressor) {
+    Preconditions.checkNotNull(compressionAlgorithm, "Algorithm cannot be 
null");
+    Preconditions.checkNotNull(compressor, "Compressor should not be null");
+    try {
+      compressorPool.returnObject(compressionAlgorithm, compressor);
+    } catch (Exception e) {
+      LOG.warn("Could not return compressor; closing instead", e);
+      // compressor failed to be returned. Let's free the memory associated 
with it
+      compressor.end();
+    }
+
+  }
+
+  @Override
+  public void releaseDecompressor(Algorithm compressionAlgorithm, Decompressor 
decompressor) {
+    Preconditions.checkNotNull(compressionAlgorithm, "Algorithm cannot be 
null");
+    Preconditions.checkNotNull(decompressor, "Deompressor should not be null");
+    try {
+      decompressorPool.returnObject(compressionAlgorithm, decompressor);
+    } catch (Exception e) {
+      LOG.warn("Could not return decompressor; closing instead", e);
+      // compressor failed to be returned. Let's free the memory associated 
with it
+      decompressor.end();
+    }
+
+  }
+
+  @Override
+  public Decompressor getDecompressor(Algorithm compressionAlgorithm) {
+    Preconditions.checkNotNull(compressionAlgorithm, "Algorithm cannot be 
null");
+    try {
+      return decompressorPool.borrowObject(compressionAlgorithm);
+    } catch (Exception e) {
+      LOG.warn("Could not borrow decompressor; creating instead", e);
+      // could not borrow the object, therefore we will attempt to create it
+      // this will most likely result in an exception when returning so an end 
will occur
+      return compressionAlgorithm.getCodec().createDecompressor();
+    }
+  }
+
+  /**
+   * Closes both pools, which will clear and evict the respective 
compressor/decompressors. {@inheritDoc}
+   */
+  @Override
+  public void close() {
+    try {
+      compressorPool.close();
+    } catch (Exception e) {
+      LOG.error("Exception while closing compressor pool", e);
+    }
+    try {
+      decompressorPool.close();
+    } catch (Exception e) {
+      LOG.error("Exception while closing decompressor pool", e);
+    }
+
+  }
+
+  /**
+   * Updates the maximum number of idle objects allowed, the sweep time, and 
the minimum time before eviction is used {@inheritDoc}
+   */
+  @Override
+  public void update(final AccumuloConfiguration acuConf) {
+    try {
+      final int poolMaxIdle = 
acuConf.getCount(Property.TSERV_COMPRESSOR_POOL_IDLE);
+      setMaxIdle(poolMaxIdle);
+
+      final long idleSweepTimeMs = 
acuConf.getTimeInMillis(Property.TSERV_COMPRESSOR_POOL_IDLE_SWEEP_TIME);
+
+      setIdleSweepTime(idleSweepTimeMs);
+      final long idleStoreTimeMs = 
acuConf.getTimeInMillis(Property.TSERV_COMPRESSOR_POOL_IDLE_STORE_TIME);
+      setIdleStoreTime(idleStoreTimeMs);
+
+    } catch (Exception e) {
+      LOG.error("Invalid compressor pool configuration", e);
+    }
+  }
+
+  /**
+   * Sets the minimum amount of time may pass before a (de)compressor may be 
evicted.
+   *
+   * @param idleStoreTimeMs
+   *          minimum time in ms before a (de)compressor is considered for 
eviction.
+   */
+  public void setIdleStoreTime(final long idleStoreTimeMs) {
+
+    if (idleStoreTimeMs > 0) {
+      // if > 0, then we check that we aren't setting it to the same value
+      // we used previously. If so, we call the setter, from which a thread
+      // will be launched.
+      if (compressorPool.getMinEvictableIdleTimeMillis() != idleStoreTimeMs) {
+
+        compressorPool.setMinEvictableIdleTimeMillis(idleStoreTimeMs);
+      }
+
+      if (decompressorPool.getMinEvictableIdleTimeMillis() != idleStoreTimeMs) 
{
+        decompressorPool.setMinEvictableIdleTimeMillis(idleStoreTimeMs);
+      }
+    } else {
+      if (compressorPool.getMinEvictableIdleTimeMillis() > 0) {
+        compressorPool.setMinEvictableIdleTimeMillis(-1);
+      }
+
+      if (decompressorPool.getMinEvictableIdleTimeMillis() > 0) {
+        decompressorPool.setMinEvictableIdleTimeMillis(-1);
+      }
+    }
+  }
+
+  /**
+   * Sets the idle sweep time if &gt; 0.
+   *
+   * @param idleSweepTimeMs
+   *          idle sweep time.
+   */
+  public void setIdleSweepTime(final long idleSweepTimeMs) {
+    if (idleSweepTimeMs > 0) {
+      // if > 0, then we check that we aren't setting it to the same value
+      // we used previously. If so, we call the setter, from which a thread
+      // will be launched.
+      if (compressorPool.getTimeBetweenEvictionRunsMillis() != 
idleSweepTimeMs) {
+
+        compressorPool.setTimeBetweenEvictionRunsMillis(idleSweepTimeMs);
+      }
+
+      if (decompressorPool.getTimeBetweenEvictionRunsMillis() != 
idleSweepTimeMs) {
+        decompressorPool.setTimeBetweenEvictionRunsMillis(idleSweepTimeMs);
+      }
+    } else {
+      if (compressorPool.getTimeBetweenEvictionRunsMillis() > 0) {
+        compressorPool.setTimeBetweenEvictionRunsMillis(-1);
+      }
+
+      if (decompressorPool.getTimeBetweenEvictionRunsMillis() > 0) {
+        decompressorPool.setTimeBetweenEvictionRunsMillis(-1);
+      }
+
+    }
+
+  }
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorPoolFactory.java
 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorPoolFactory.java
new file mode 100644
index 0000000000..ad6fa18e66
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorPoolFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile.codec;
+
+import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * Factory pattern used to create compressors within CompressorPool
+ *
+ */
+public class CompressorPoolFactory implements 
KeyedPooledObjectFactory<Algorithm,Compressor> {
+
+  @Override
+  public PooledObject<Compressor> makeObject(Algorithm key) throws Exception {
+    return new 
DefaultPooledObject<Compressor>(key.getCodec().createCompressor());
+  }
+
+  @Override
+  public void destroyObject(Algorithm algorithm, PooledObject<Compressor> 
pooledObject) throws Exception {
+    pooledObject.getObject().end();
+  }
+
+  @Override
+  public boolean validateObject(Algorithm algorithm, PooledObject<Compressor> 
pooledObject) {
+    return pooledObject.getObject().finished();
+  }
+
+  @Override
+  public void activateObject(Algorithm algorithm, PooledObject<Compressor> 
pooledObject) throws Exception {
+    pooledObject.getObject().reset();
+  }
+
+  @Override
+  public void passivateObject(Algorithm algorithm, PooledObject<Compressor> 
pooledObject) throws Exception {
+    pooledObject.getObject().finish();
+  }
+
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/DecompressorPoolFactory.java
 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/DecompressorPoolFactory.java
new file mode 100644
index 0000000000..e5399423a1
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/DecompressorPoolFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile.codec;
+
+import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * Factory pattern used to create decompressors within CompressorPool
+ *
+ */
+public class DecompressorPoolFactory implements 
KeyedPooledObjectFactory<Algorithm,Decompressor> {
+
+  @Override
+  public PooledObject<Decompressor> makeObject(Algorithm key) throws Exception 
{
+    return new 
DefaultPooledObject<Decompressor>(key.getCodec().createDecompressor());
+  }
+
+  @Override
+  public void destroyObject(Algorithm algorithm, PooledObject<Decompressor> 
pooledObject) throws Exception {
+    pooledObject.getObject().end();
+  }
+
+  @Override
+  public boolean validateObject(Algorithm algorithm, 
PooledObject<Decompressor> pooledObject) {
+    return pooledObject.getObject().finished();
+  }
+
+  @Override
+  public void activateObject(Algorithm algorithm, PooledObject<Decompressor> 
pooledObject) throws Exception {
+    pooledObject.getObject().reset();
+  }
+
+  @Override
+  public void passivateObject(Algorithm algorithm, PooledObject<Decompressor> 
pooledObject) throws Exception {
+    pooledObject.getObject().reset();
+  }
+
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/DefaultCompressorFactory.java
 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/DefaultCompressorFactory.java
new file mode 100644
index 0000000000..f291cf9517
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/DefaultCompressorFactory.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile.codec;
+
+import com.google.common.base.Preconditions;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Implementation of Compressor factory that closes and open decompressors for 
every request.
+ */
+public class DefaultCompressorFactory implements CompressorFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DefaultCompressorFactory.class);
+
+  public DefaultCompressorFactory(AccumuloConfiguration acuConf) {}
+
+  /**
+   * Provides the caller a compressor object.
+   *
+   * @param compressionAlgorithm
+   *          compressor's algorithm.
+   * @return compressor.
+   * @throws IOException
+   *           I/O Exception during factory implementation
+   */
+  public Compressor getCompressor(Algorithm compressionAlgorithm) throws 
IOException {
+    if (compressionAlgorithm != null) {
+      Compressor compressor = 
compressionAlgorithm.getCodec().createCompressor();
+      if (compressor != null) {
+
+        LOG.debug("Got a decompressor: {}", compressor.hashCode());
+
+      }
+      return compressor;
+    }
+    return null;
+  }
+
+  /**
+   * Method to release a compressor. This implementation will call end on the 
compressor.
+   *
+   * @param algorithm
+   *          Supplied compressor's Algorithm.
+   * @param compressor
+   *          Compressor object
+   */
+  public void releaseCompressor(Algorithm algorithm, Compressor compressor) {
+    Preconditions.checkNotNull(algorithm, "Algorithm cannot be null");
+    Preconditions.checkNotNull(compressor, "Compressor should not be null");
+    compressor.end();
+  }
+
+  /**
+   * Method to release the decompressor. This implementation will call end on 
the decompressor.
+   *
+   * @param algorithm
+   *          Supplied decompressor's Algorithm.
+   * @param decompressor
+   *          decompressor object.
+   */
+  public void releaseDecompressor(Algorithm algorithm, Decompressor 
decompressor) {
+    Preconditions.checkNotNull(algorithm, "Algorithm cannot be null");
+    Preconditions.checkNotNull(decompressor, "Deompressor should not be null");
+    decompressor.end();
+  }
+
+  /**
+   * Provides the caller a decompressor object.
+   *
+   * @param compressionAlgorithm
+   *          decompressor's algorithm.
+   * @return decompressor.
+   */
+  public Decompressor getDecompressor(Algorithm compressionAlgorithm) {
+    if (compressionAlgorithm != null) {
+      Decompressor decompressor = 
compressionAlgorithm.getCodec().createDecompressor();
+      if (decompressor != null) {
+
+        LOG.debug("Got a decompressor: {}", decompressor.hashCode());
+
+      }
+      return decompressor;
+    }
+    return null;
+  }
+
+  /**
+   * Implementations may choose to have a close call implemented.
+   */
+  public void close() {
+
+  }
+
+  /**
+   * Provides the capability to update the compression factory
+   *
+   * @param acuConf
+   *          accumulo configuration
+   */
+  public void update(final AccumuloConfiguration acuConf) {
+
+  }
+
+}
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorFactoryTest.java
 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorFactoryTest.java
new file mode 100644
index 0000000000..d598ac7ab3
--- /dev/null
+++ 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorFactoryTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile.codec;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CompressorFactoryTest {
+
+  HashMap<Compression.Algorithm,Boolean> isSupported = new 
HashMap<Compression.Algorithm,Boolean>();
+
+  @Before
+  public void testSupport() {
+    // we can safely assert that GZ exists by virtue of it being the 
DefaultCodec
+    isSupported.put(Compression.Algorithm.GZ, true);
+
+    Configuration myConf = new Configuration();
+
+    String extClazz = System.getProperty(Compression.Algorithm.CONF_LZO_CLASS);
+    String clazz = (extClazz != null) ? extClazz : 
"org.apache.hadoop.io.compress.LzoCodec";
+    try {
+      CompressionCodec codec = (CompressionCodec) 
ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+
+      Assert.assertNotNull(codec);
+      isSupported.put(Compression.Algorithm.LZO, true);
+
+    } catch (ClassNotFoundException e) {
+      // that is okay
+    }
+
+  }
+
+  @Test
+  public void testAlgoreithms() throws IOException {
+    CompressorFactory factory = new 
DefaultCompressorFactory(AccumuloConfiguration.getDefaultConfiguration());
+    for (final Algorithm al : Algorithm.values()) {
+      if (isSupported.get(al) != null && isSupported.get(al) == true) {
+
+        Compressor compressor = factory.getCompressor(al);
+        Assert.assertNotNull(compressor);
+        factory.releaseCompressor(al, compressor);
+
+        Decompressor decompressor = factory.getDecompressor(al);
+        Assert.assertNotNull(decompressor);
+        factory.releaseDecompressor(al, decompressor);
+      }
+    }
+  }
+
+  @Test
+  public void testMultipleNotTheSameCompressors() throws IOException {
+    CompressorFactory factory = new 
DefaultCompressorFactory(AccumuloConfiguration.getDefaultConfiguration());
+    for (final Algorithm al : Algorithm.values()) {
+      if (isSupported.get(al) != null && isSupported.get(al) == true) {
+
+        Set<Integer> compressorHashCodes = new HashSet<>();
+        ArrayList<Compressor> compressors = new ArrayList<>();
+        for (int i = 0; i < 25; i++) {
+          Compressor compressor = factory.getCompressor(al);
+          Assert.assertNotNull(compressor);
+          compressors.add(compressor);
+          
compressorHashCodes.add(Integer.valueOf(System.identityHashCode(compressor)));
+        }
+
+        // assert that we have 25 with this particular factory.
+        Assert.assertEquals(25, compressorHashCodes.size());
+
+        // free them for posterity sake
+        for (Compressor compressor : compressors) {
+          factory.releaseCompressor(al, compressor);
+        }
+      }
+    }
+
+  }
+
+  @Test
+  public void testMultipleNotTheSameDecompressors() throws IOException {
+    CompressorFactory factory = new 
DefaultCompressorFactory(AccumuloConfiguration.getDefaultConfiguration());
+    for (final Algorithm al : Algorithm.values()) {
+      if (isSupported.get(al) != null && isSupported.get(al) == true) {
+
+        Set<Integer> compressorHashCodes = new HashSet<>();
+        ArrayList<Decompressor> decompressors = new ArrayList<>();
+        for (int i = 0; i < 25; i++) {
+          Decompressor decompressor = factory.getDecompressor(al);
+          Assert.assertNotNull(decompressor);
+          decompressors.add(decompressor);
+          
compressorHashCodes.add(Integer.valueOf(System.identityHashCode(decompressor)));
+        }
+
+        // assert that we have 25 with this particular factory.
+        Assert.assertEquals(25, compressorHashCodes.size());
+
+        // free them for posterity sake
+        for (Decompressor decompressor : decompressors) {
+          factory.releaseDecompressor(al, decompressor);
+        }
+      }
+    }
+
+  }
+
+  @Test
+  public void returnNull() {
+
+    CompressorFactory factory = new 
DefaultCompressorFactory(AccumuloConfiguration.getDefaultConfiguration());
+    for (final Algorithm al : Algorithm.values()) {
+      if (isSupported.get(al) != null && isSupported.get(al) == true) {
+        try {
+          factory.releaseCompressor(null, null);
+          fail("Should have caught null when passing null algorithm");
+        } catch (NullPointerException npe) {
+
+        }
+
+        try {
+          factory.releaseCompressor(al, null);
+          fail("Should have caught null when passing null compressor");
+        } catch (NullPointerException npe) {
+
+        }
+
+        try {
+          factory.releaseDecompressor(null, null);
+          fail("Should have caught null when passing null algorithm");
+        } catch (NullPointerException npe) {
+
+        }
+
+        try {
+          factory.releaseDecompressor(al, null);
+          fail("Should have caught null when passing null decompressor");
+        } catch (NullPointerException npe) {
+
+        }
+      }
+    }
+  }
+
+}
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorPoolTest.java
 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorPoolTest.java
new file mode 100644
index 0000000000..b688022b46
--- /dev/null
+++ 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorPoolTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile.codec;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CompressorPoolTest {
+
+  HashMap<Compression.Algorithm,Boolean> isSupported = new 
HashMap<Compression.Algorithm,Boolean>();
+
+  @Before
+  public void testSupport() {
+    // we can safely assert that GZ exists by virtue of it being the 
DefaultCodec
+    isSupported.put(Compression.Algorithm.GZ, true);
+
+    Configuration myConf = new Configuration();
+
+    String extClazz = System.getProperty(Compression.Algorithm.CONF_LZO_CLASS);
+    String clazz = (extClazz != null) ? extClazz : 
"org.apache.hadoop.io.compress.LzoCodec";
+    try {
+      CompressionCodec codec = (CompressionCodec) 
ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+
+      Assert.assertNotNull(codec);
+      isSupported.put(Compression.Algorithm.LZO, true);
+
+    } catch (ClassNotFoundException e) {
+      // that is okay
+    }
+
+  }
+
+  @Test
+  public void testAlgorithms() throws IOException {
+    CompressorPool factory = new 
CompressorPool(AccumuloConfiguration.getDefaultConfiguration());
+    for (final Algorithm al : Algorithm.values()) {
+      if (isSupported.get(al) != null && isSupported.get(al) == true) {
+
+        Compressor compressor = factory.getCompressor(al);
+        Assert.assertNotNull(compressor);
+        factory.releaseCompressor(al, compressor);
+
+        Decompressor decompressor = factory.getDecompressor(al);
+        Assert.assertNotNull(decompressor);
+        factory.releaseDecompressor(al, decompressor);
+      }
+    }
+  }
+
+  @Test
+  public void testMultipleEventuallyTheSameCompressors() throws IOException {
+    CompressorPool factory = new 
CompressorPool(AccumuloConfiguration.getDefaultConfiguration());
+    factory.setMaxIdle(25);
+    for (final Algorithm al : Algorithm.values()) {
+      if (isSupported.get(al) != null && isSupported.get(al) == true) {
+        Set<Integer> compressorHashCodes = new HashSet<>();
+        ArrayList<Compressor> compressors = new ArrayList<>();
+        for (int i = 0; i < 25; i++) {
+          Compressor compressor = factory.getCompressor(al);
+          Assert.assertNotNull(compressor);
+          compressors.add(compressor);
+          
compressorHashCodes.add(Integer.valueOf(System.identityHashCode(compressor)));
+        }
+
+        // assert that we have 25 with this particular factory.
+        Assert.assertEquals(25, compressorHashCodes.size());
+
+        // release them for posterity sake
+        for (Compressor compressor : compressors) {
+          factory.releaseCompressor(al, compressor);
+        }
+        /**
+         * At this point we should have released all of our compressors. Since 
we are using the pooled factory, we know that the next 25 we retrieve should be
+         * the same we just returned.
+         */
+        compressors = new ArrayList<>();
+
+        for (int i = 0; i < 25; i++) {
+          Compressor compressor = factory.getCompressor(al);
+          Assert.assertNotNull(compressor);
+          compressors.add(compressor);
+          
compressorHashCodes.add(Integer.valueOf(System.identityHashCode(compressor)));
+        }
+
+        // assert that we have 25 with this particular factory.
+        Assert.assertEquals(25, compressorHashCodes.size());
+
+        // free them for posterity sake
+        for (Compressor compressor : compressors) {
+          factory.releaseCompressor(al, compressor);
+        }
+      }
+    }
+
+  }
+
+  @Test
+  public void testMultipleTestEviction() throws IOException, 
InterruptedException {
+    CompressorPool factory = new 
CompressorPool(AccumuloConfiguration.getDefaultConfiguration());
+    factory.setMaxIdle(25);
+    factory.setIdleSweepTime(100);
+    factory.setIdleStoreTime(10);
+    for (final Algorithm al : Algorithm.values()) {
+      if (isSupported.get(al) != null && isSupported.get(al) == true) {
+        Set<Integer> compressorHashCodes = new HashSet<>();
+        ArrayList<Compressor> compressors = new ArrayList<>();
+        for (int i = 0; i < 25; i++) {
+          Compressor compressor = factory.getCompressor(al);
+          Assert.assertNotNull(compressor);
+          compressors.add(compressor);
+          
compressorHashCodes.add(Integer.valueOf(System.identityHashCode(compressor)));
+        }
+
+        // assert that we have 25 with this particular factory.
+        Assert.assertEquals(25, compressorHashCodes.size());
+
+        // release them for posterity sake
+        for (Compressor compressor : compressors) {
+          factory.releaseCompressor(al, compressor);
+        }
+
+        Thread.sleep(2500);
+
+        for (int i = 0; i < 25; i++) {
+          Compressor compressor = factory.getCompressor(al);
+          Assert.assertNotNull(compressor);
+          compressors.add(compressor);
+          
compressorHashCodes.add(Integer.valueOf(System.identityHashCode(compressor)));
+        }
+
+        // assert that we have 25 with this particular factory.
+        Assert.assertEquals(50, compressorHashCodes.size());
+
+        // free them for posterity sake
+        for (Compressor compressor : compressors) {
+          factory.releaseCompressor(al, compressor);
+        }
+      }
+    }
+
+  }
+
+  @Test
+  public void testMultipleNotTheSameDeompressors() throws IOException {
+    CompressorPool factory = new 
CompressorPool(AccumuloConfiguration.getDefaultConfiguration());
+    for (final Algorithm al : Algorithm.values()) {
+      if (isSupported.get(al) != null && isSupported.get(al) == true) {
+
+        Set<Integer> compressorHashCodes = new HashSet<>();
+        ArrayList<Decompressor> decompressors = new ArrayList<>();
+        for (int i = 0; i < 25; i++) {
+          Decompressor decompressor = factory.getDecompressor(al);
+          Assert.assertNotNull(decompressor);
+          decompressors.add(decompressor);
+          
compressorHashCodes.add(Integer.valueOf(System.identityHashCode(decompressor)));
+        }
+
+        // assert that we have 25 with this particular factory.
+        Assert.assertEquals(25, compressorHashCodes.size());
+
+        // free them for posterity sake
+        for (Decompressor decompressor : decompressors) {
+          factory.releaseDecompressor(al, decompressor);
+        }
+      }
+    }
+
+  }
+
+  @Test
+  public void testMultipleChangeInMiddle() throws IOException {
+    CompressorFactory factory = new 
CompressorPool(AccumuloConfiguration.getDefaultConfiguration());
+    for (final Algorithm al : Algorithm.values()) {
+      if (isSupported.get(al) != null && isSupported.get(al) == true) {
+
+        Set<Integer> compressorHashCodes = new HashSet<>();
+        ArrayList<Decompressor> decompressors = new ArrayList<>();
+        for (int i = 0; i < 25; i++) {
+          Decompressor decompressor = factory.getDecompressor(al);
+          Assert.assertNotNull(decompressor);
+          decompressors.add(decompressor);
+          
compressorHashCodes.add(Integer.valueOf(System.identityHashCode(decompressor)));
+          // stop about half way through and change the pool
+          if (i == 12) {
+            factory.close();
+            factory = new 
DefaultCompressorFactory(AccumuloConfiguration.getDefaultConfiguration());
+          }
+        }
+
+        // assert that we have 25 with this particular factory.
+        Assert.assertEquals(25, compressorHashCodes.size());
+
+        // free them for posterity sake
+        for (Decompressor decompressor : decompressors) {
+          factory.releaseDecompressor(al, decompressor);
+        }
+      }
+    }
+
+  }
+
+  @Test
+  public void returnNull() {
+
+    CompressorPool factory = new 
CompressorPool(AccumuloConfiguration.getDefaultConfiguration());
+    for (final Algorithm al : Algorithm.values()) {
+      if (isSupported.get(al) != null && isSupported.get(al) == true) {
+        try {
+          factory.releaseCompressor(null, null);
+          fail("Should have caught null when passing null algorithm");
+        } catch (NullPointerException npe) {
+          // yay!
+        }
+
+        try {
+          factory.releaseCompressor(al, null);
+          fail("Should have caught null when passing null compressor");
+        } catch (NullPointerException npe) {
+          // yay!
+        }
+
+        try {
+          factory.releaseDecompressor(null, null);
+          fail("Should have caught null when passing null algorithm");
+        } catch (NullPointerException npe) {
+          // yay!
+        }
+
+        try {
+          factory.releaseDecompressor(al, null);
+          fail("Should have caught null when passing null decompressor");
+        } catch (NullPointerException npe) {
+          // yay!
+        }
+      }
+    }
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index 42f1f8a0c8..ef4c1fa173 100644
--- a/pom.xml
+++ b/pom.xml
@@ -359,6 +359,11 @@
         <artifactId>commons-math</artifactId>
         <version>2.1</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-pool2</artifactId>
+        <version>2.4.2</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-vfs2</artifactId>
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 6c585d0ef9..689368787d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -98,6 +98,7 @@
 import org.apache.accumulo.core.data.thrift.TMutation;
 import org.apache.accumulo.core.data.thrift.TRange;
 import org.apache.accumulo.core.data.thrift.UpdateErrors;
+import org.apache.accumulo.core.file.rfile.bcfile.codec.CompressionUpdater;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.master.thrift.Compacting;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -2522,6 +2523,8 @@ public void run() {
     };
     SimpleTimer.getInstance(aconf).schedule(replicationWorkThreadPoolResizer, 
10000, 30000);
 
+    SimpleTimer.getInstance(aconf).schedule(new CompressionUpdater(aconf), 
10000, 30000);
+
     HostAndPort masterHost;
     while (!serverStopRequested) {
       // send all of the pending messages


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to