milleruntime closed pull request #140: ACCUMULO-4419: Change how compression
delegation works
URL: https://github.com/apache/accumulo/pull/140
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:
diff --git a/core/pom.xml b/core/pom.xml
index 464c725c21..d749f1c641 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -83,6 +83,10 @@
org.apache.commons
commons-math
+
+ org.apache.commons
+ commons-pool2
+
org.apache.commons
commons-vfs2
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index db9d6a61a0..db2713a02c 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -351,7 +351,18 @@
"Memory to provide to batchwriter to replay mutations for replication"),
TSERV_ASSIGNMENT_MAXCONCURRENT("tserver.assignment.concurrent.max", "2",
PropertyType.COUNT,
"The number of threads available to load tablets. Recoveries are still
performed serially."),
-
+ TSERV_COMPRESSOR_FACTORY("tserver.compressor.factory.class",
"org.apache.accumulo.core.file.rfile.bcfile.codec.CompressorFactory",
PropertyType.CLASSNAME,
+ "Tablet Server configuration for the compressor factory that will be
used when requesting compressors."),
+ TSERV_COMPRESSOR_IN_BUFFER("tserver.compressor.factory.input.buffer.size",
"1K", PropertyType.MEMORY,
+ "Tablet Server configuration for the compressor factory that adjusts the
input buffer size. Zero uses the full compression block size."),
+ TSERV_COMPRESSOR_OUT_BUFFER("tserver.compressor.factory.output.buffer.size",
"1K", PropertyType.MEMORY,
+ "Tablet Server configuration for the compressor factory that adjusts the
output buffer size. Default uses the full compression block size."),
+ TSERV_COMPRESSOR_POOL_IDLE("tserver.compressor.pool.max.idle", "25",
PropertyType.COUNT,
+ "Tablet Server configuration to contrain the maximum number of idle
compressors within the pool"),
+
TSERV_COMPRESSOR_POOL_IDLE_SWEEP_TIME("tserver.compressor.pool.max.idle.sweep.time",
"0ms", PropertyType.TIMEDURATION,
+ "Tablet Server configuration for max idle time between idle object
sweeps. Does not run if set to 0."),
+
TSERV_COMPRESSOR_POOL_IDLE_STORE_TIME("tserver.compressor.pool.max.idle.time",
"0ms", PropertyType.TIMEDURATION,
+ "Tablet Server configuration for max amount of time an idle
(de)compressor will be stored. Does not get evicted if > 0"),
// properties that are specific to logger server behavior
LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this
category affect the behavior of the write-ahead logger servers"),
LOGGER_DIR("logger.dir.walog", "walogs", PropertyType.PATH, "This property
is only needed if Accumulo was upgraded from a 1.4 or earlier version. "
diff --git
a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
index 2b8154102b..1961d76a74 100644
---
a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
+++
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
@@ -26,11 +26,13 @@
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.accumulo.core.file.rfile.bcfile.codec.CompressorFactory;
+import
org.apache.accumulo.core.file.rfile.bcfile.codec.DefaultCompressorFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
@@ -39,6 +41,7 @@
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
+import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -85,6 +88,21 @@ public void flush() throws IOException {
/** compression: none */
public static final String COMPRESSION_NONE = "none";
+ // data input buffer size to absorb small reads from application.
+ private static final int DATA_IBUF_SIZE_DEFAULT = 1 * 1024;
+ // data output buffer size to absorb small writes from application.
+ private static final int DATA_OBUF_SIZE_DEFAULT = 4 * 1024;
+
+ /**
+