milleruntime closed pull request #456: Add ZStandard compression codec. Fixes 
#438
URL: https://github.com/apache/accumulo/pull/456
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/client/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
 
b/client/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
index e803562de7..06a769ffba 100644
--- 
a/client/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
+++ 
b/client/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
@@ -135,14 +135,15 @@ public static AccumuloConfiguration 
getAccumuloConfiguration(Class<?> implementi
    * @param conf
    *          the Hadoop configuration object to configure
    * @param compressionType
-   *          one of "none", "gz", "lzo", or "snappy"
+   *          one of "none", "gz", "lzo", "snappy", or "zstd"
    * @since 1.6.0
    */
   public static void setCompressionType(Class<?> implementingClass, 
Configuration conf,
       String compressionType) {
     if (compressionType == null
-        || !Arrays.asList("none", "gz", "lzo", 
"snappy").contains(compressionType))
-      throw new IllegalArgumentException("Compression type must be one of: 
none, gz, lzo, snappy");
+        || !Arrays.asList("none", "gz", "lzo", "snappy", 
"zstd").contains(compressionType))
+      throw new IllegalArgumentException(
+          "Compression type must be one of: none, gz, lzo, snappy, zstd");
     setAccumuloProperty(implementingClass, conf, 
Property.TABLE_FILE_COMPRESSION_TYPE,
         compressionType);
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index b9027528b9..3009975255 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -708,7 +708,7 @@
           + " to change the called Load Balancer for this table"),
   TABLE_FILE_COMPRESSION_TYPE("table.file.compress.type", "gz", 
PropertyType.STRING,
       "Compression algorithm used on index and data blocks before they are"
-          + " written. Possible values: gz, snappy, lzo, none"),
+          + " written. Possible values: zstd, gz, snappy, lzo, none"),
   TABLE_FILE_COMPRESSED_BLOCK_SIZE("table.file.compress.blocksize", "100K", 
PropertyType.BYTES,
       "The maximum size of data blocks in RFiles before they are compressed 
and written."),
   
TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX("table.file.compress.blocksize.index", 
"128K",
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
index b979fcd505..71a740f395 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
@@ -76,6 +76,8 @@ public void flush() throws IOException {
     }
   }
 
+  /** compression: zStandard */
+  public static final String COMPRESSION_ZSTD = "zstd";
   /** snappy codec **/
   public static final String COMPRESSION_SNAPPY = "snappy";
   /** compression: gzip */
@@ -456,6 +458,122 @@ public boolean isSupported() {
 
         return snappyCodec != null;
       }
+    },
+
+    ZSTANDARD(COMPRESSION_ZSTD) {
+      // Use base type to avoid compile-time dependencies.
+      private transient CompressionCodec zstdCodec = null;
+      /**
+       * determines if we've checked the codec status. ensures we don't 
recreate the default codec
+       */
+      private final AtomicBoolean checked = new AtomicBoolean(false);
+      private static final String defaultClazz = 
"org.apache.hadoop.io.compress.ZStandardCodec";
+
+      /**
+       * Buffer size option
+       */
+      private static final String BUFFER_SIZE_OPT = 
"io.compression.codec.zstd.buffersize";
+
+      /**
+       * Default buffer size value
+       */
+      private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+
+      @Override
+      public CompressionCodec getCodec() {
+        return zstdCodec;
+      }
+
+      @Override
+      public void initializeDefaultCodec() {
+        if (!checked.get()) {
+          checked.set(true);
+          zstdCodec = createNewCodec(DEFAULT_BUFFER_SIZE);
+        }
+      }
+
+      /**
+       * Creates a new ZStandard codec.
+       *
+       * @param bufferSize
+       *          incoming buffer size
+       * @return new codec or null, depending on if installed
+       */
+      @Override
+      protected CompressionCodec createNewCodec(final int bufferSize) {
+
+        String extClazz = (conf.get(CONF_ZSTD_CLASS) == null ? 
System.getProperty(CONF_ZSTD_CLASS)
+            : null);
+        String clazz = (extClazz != null) ? extClazz : defaultClazz;
+        try {
+          log.info("Trying to load ZStandard codec class: {}", clazz);
+
+          Configuration myConf = new Configuration(conf);
+          // only use the buffersize if > 0, otherwise we'll use
+          // the default defined within the codec
+          if (bufferSize > 0)
+            myConf.setInt(BUFFER_SIZE_OPT, bufferSize);
+
+          return (CompressionCodec) 
ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+
+        } catch (ClassNotFoundException e) {
+          // that is okay
+        }
+
+        return null;
+      }
+
+      @Override
+      public OutputStream createCompressionStream(OutputStream downStream, 
Compressor compressor,
+          int downStreamBufferSize) throws IOException {
+
+        if (!isSupported()) {
+          throw new IOException(
+              "ZStandard codec class not specified. Did you forget to set 
property "
+                  + CONF_ZSTD_CLASS + "?");
+        }
+        OutputStream bos1;
+        if (downStreamBufferSize > 0) {
+          bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
+        } else {
+          bos1 = downStream;
+        }
+        // use the default codec
+        CompressionOutputStream cos = zstdCodec.createOutputStream(bos1, 
compressor);
+        BufferedOutputStream bos2 = new BufferedOutputStream(
+            new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
+        return bos2;
+      }
+
+      @Override
+      public InputStream createDecompressionStream(InputStream downStream,
+          Decompressor decompressor, int downStreamBufferSize) throws 
IOException {
+        if (!isSupported()) {
+          throw new IOException(
+              "ZStandard codec class not specified. Did you forget to set 
property "
+                  + CONF_ZSTD_CLASS + "?");
+        }
+
+        CompressionCodec decomCodec = zstdCodec;
+        // if we're not using the same buffer size, we'll pull the codec from 
the loading cache
+        if (DEFAULT_BUFFER_SIZE != downStreamBufferSize) {
+          Entry<Algorithm,Integer> sizeOpt = Maps.immutableEntry(ZSTANDARD, 
downStreamBufferSize);
+          try {
+            decomCodec = codecCache.get(sizeOpt);
+          } catch (ExecutionException e) {
+            throw new IOException(e);
+          }
+        }
+
+        CompressionInputStream cis = decomCodec.createInputStream(downStream, 
decompressor);
+        BufferedInputStream bis2 = new BufferedInputStream(cis, 
DATA_IBUF_SIZE);
+        return bis2;
+      }
+
+      @Override
+      public boolean isSupported() {
+        return zstdCodec != null;
+      }
     };
 
     /**
@@ -502,6 +620,7 @@ public CompressionCodec load(Entry<Algorithm,Integer> key) {
     private static final int DATA_OBUF_SIZE = 4 * 1024;
     public static final String CONF_LZO_CLASS = 
"io.compression.codec.lzo.class";
     public static final String CONF_SNAPPY_CLASS = 
"io.compression.codec.snappy.class";
+    public static final String CONF_ZSTD_CLASS = 
"io.compression.codec.zstd.class";
 
     Algorithm(String name) {
       this.compressName = name;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to