HADOOP-13578. Add Codec for ZStandard Compression. Contributed by churro morales


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a0a27616
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a0a27616
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a0a27616

Branch: refs/heads/trunk
Commit: a0a276162147e843a5a4e028abdca5b66f5118da
Parents: e49e0a6
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jan 4 14:46:25 2017 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Jan 4 14:46:25 2017 +0000

----------------------------------------------------------------------
 BUILDING.txt                                    |  25 +
 dev-support/bin/dist-copynativelibs             |  13 +
 hadoop-common-project/hadoop-common/pom.xml     |  21 +
 .../hadoop-common/src/CMakeLists.txt            |  29 ++
 .../hadoop-common/src/config.h.cmake            |   1 +
 .../hadoop/fs/CommonConfigurationKeys.java      |  16 +
 .../apache/hadoop/io/compress/Decompressor.java |   2 +-
 .../hadoop/io/compress/ZStandardCodec.java      | 242 +++++++++
 .../io/compress/zstd/ZStandardCompressor.java   | 305 ++++++++++++
 .../io/compress/zstd/ZStandardDecompressor.java | 323 ++++++++++++
 .../hadoop/io/compress/zstd/package-info.java   |  22 +
 .../apache/hadoop/util/NativeCodeLoader.java    |   5 +
 .../hadoop/util/NativeLibraryChecker.java       |  13 +-
 .../io/compress/zstd/ZStandardCompressor.c      | 259 ++++++++++
 .../io/compress/zstd/ZStandardDecompressor.c    | 218 +++++++++
 .../zstd/org_apache_hadoop_io_compress_zstd.h   |  34 ++
 .../org/apache/hadoop/util/NativeCodeLoader.c   |  11 +
 ...g.apache.hadoop.io.compress.CompressionCodec |   1 +
 .../src/site/markdown/NativeLibraries.md.vm     |   1 +
 .../apache/hadoop/io/compress/TestCodec.java    |  12 +
 .../io/compress/TestCompressionStreamReuse.java |   8 +
 .../TestZStandardCompressorDecompressor.java    | 485 +++++++++++++++++++
 .../src/test/resources/zstd/test_file.txt       |  71 +++
 .../src/test/resources/zstd/test_file.txt.zst   | Bin 0 -> 3690 bytes
 .../hadoop-mapreduce-client-nativetask/pom.xml  |   8 +
 hadoop-project-dist/pom.xml                     |   6 +
 hadoop-project/pom.xml                          |   2 +
 27 files changed, 2130 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/BUILDING.txt
----------------------------------------------------------------------
diff --git a/BUILDING.txt b/BUILDING.txt
index 7afc3f0..a1721ba 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -83,6 +83,8 @@ Optional packages:
   $ sudo apt-get install libjansson-dev
 * Linux FUSE
   $ sudo apt-get install fuse libfuse-dev
+* ZStandard compression
+    $ sudo apt-get install zstd
 
 
----------------------------------------------------------------------------------
 Maven main modules:
@@ -155,6 +157,29 @@ Maven build goals:
     and it ignores the -Dsnappy.prefix option. If -Dsnappy.lib isn't given, the
     bundling and building will fail.
 
+
+ ZStandard build options:
+
+   ZStandard is a compression library that can be utilized by the native code.
+   It is currently an optional component, meaning that Hadoop can be built with
+   or without this dependency.
+
+  * Use -Drequire.zstd to fail the build if libzstd.so is not found.
+    If this option is not specified and the zstd library is missing, the
+    build proceeds and produces a libhadoop.so without zstd support.
+
+  * Use -Dzstd.prefix to specify a nonstandard location for the libzstd
+    header files and library files. You do not need this option if you have
+    installed zstandard using a package manager.
+
+  * Use -Dzstd.lib to specify a nonstandard location for the libzstd library
+    files.  Similarly to zstd.prefix, you do not need this option if you have
+    installed using a package manager.
+
+  * Use -Dbundle.zstd to copy the contents of the zstd.lib directory into
+    the final tar file. This option requires that -Dzstd.lib is also given,
+    and it ignores the -Dzstd.prefix option. If -Dzstd.lib isn't given, the
+    bundling and building will fail.
+
  OpenSSL build options:
 
    OpenSSL includes a crypto library that can be utilized by the native code.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/dev-support/bin/dist-copynativelibs
----------------------------------------------------------------------
diff --git a/dev-support/bin/dist-copynativelibs 
b/dev-support/bin/dist-copynativelibs
index efe250b..67d2edf 100755
--- a/dev-support/bin/dist-copynativelibs
+++ b/dev-support/bin/dist-copynativelibs
@@ -114,6 +114,15 @@ for i in "$@"; do
     --snappylibbundle=*)
       SNAPPYLIBBUNDLE=${i#*=}
     ;;
+    --zstdbinbundle=*)
+      ZSTDBINBUNDLE=${i#*=}
+    ;;
+     --zstdlib=*)
+      ZSTDLIB=${i#*=}
+    ;;
+    --zstdlibbundle=*)
+      ZSTDLIBBUNDLE=${i#*=}
+    ;;
 
   esac
 done
@@ -139,6 +148,8 @@ if [[ -d "${LIB_DIR}" ]]; then
 
   bundle_native_lib "${SNAPPYLIBBUNDLE}" "snappy.lib" "snappy" "${SNAPPYLIB}"
 
+  bundle_native_lib "${ZSTDLIBBUNDLE}" "zstd.lib" "zstd" "${ZSTDLIB}"
+
   bundle_native_lib "${OPENSSLLIBBUNDLE}" "openssl.lib" "crypto" 
"${OPENSSLLIB}"
 
   bundle_native_lib "${ISALBUNDLE}" "isal.lib" "isa" "${ISALLIB}"
@@ -159,6 +170,8 @@ if [[ -d "${BIN_DIR}" ]] ; then
 
   bundle_native_bin "${SNAPPYBINBUNDLE}" "${SNAPPYLIBBUNDLE}" "snappy.lib" 
"snappy" "${SNAPPYLIB}"
 
+  bundle_native_bin "${ZSTDBINBUNDLE}" "${ZSTDLIBBUNDLE}" "zstd.lib" "zstd" 
"${ZSTDLIB}"
+
   bundle_native_bin "${OPENSSLBINBUNDLE}" "${OPENSSLLIBBUNDLE}" "openssl.lib" 
"crypto" "${OPENSSLLIB}"
 
 fi

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/pom.xml 
b/hadoop-common-project/hadoop-common/pom.xml
index c9b282f..b616074 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -595,6 +595,10 @@
         <snappy.lib></snappy.lib>
         <snappy.include></snappy.include>
         <require.snappy>false</require.snappy>
+        <zstd.prefix></zstd.prefix>
+        <zstd.lib></zstd.lib>
+        <zstd.include></zstd.include>
+        <require.zstd>false</require.zstd>
         <openssl.prefix></openssl.prefix>
         <openssl.lib></openssl.lib>
         <openssl.include></openssl.include>
@@ -652,6 +656,8 @@
                     
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
                     
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName>
                     
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
+                    
<javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardCompressor</javahClassName>
+                    
<javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardDecompressor</javahClassName>
                     
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Compressor</javahClassName>
                     
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
                     
<javahClassName>org.apache.hadoop.io.erasurecode.ErasureCodeNative</javahClassName>
@@ -685,9 +691,13 @@
                     
<JVM_ARCH_DATA_MODEL>${sun.arch.data.model}</JVM_ARCH_DATA_MODEL>
                     <REQUIRE_BZIP2>${require.bzip2}</REQUIRE_BZIP2>
                     <REQUIRE_SNAPPY>${require.snappy}</REQUIRE_SNAPPY>
+                    <REQUIRE_ZSTD>${require.zstd}</REQUIRE_ZSTD>
                     
<CUSTOM_SNAPPY_PREFIX>${snappy.prefix}</CUSTOM_SNAPPY_PREFIX>
                     <CUSTOM_SNAPPY_LIB>${snappy.lib} </CUSTOM_SNAPPY_LIB>
                     <CUSTOM_SNAPPY_INCLUDE>${snappy.include} 
</CUSTOM_SNAPPY_INCLUDE>
+                    <CUSTOM_ZSTD_PREFIX>${zstd.prefix}</CUSTOM_ZSTD_PREFIX>
+                    <CUSTOM_ZSTD_LIB>${zstd.lib} </CUSTOM_ZSTD_LIB>
+                    <CUSTOM_ZSTD_INCLUDE>${zstd.include} </CUSTOM_ZSTD_INCLUDE>
                     <REQUIRE_ISAL>${require.isal} </REQUIRE_ISAL>
                     <CUSTOM_ISAL_PREFIX>${isal.prefix} </CUSTOM_ISAL_PREFIX>
                     <CUSTOM_ISAL_LIB>${isal.lib} </CUSTOM_ISAL_LIB>
@@ -745,6 +755,11 @@
         <isal.lib></isal.lib>
         <require.snappy>false</require.snappy>
         <bundle.snappy.in.bin>true</bundle.snappy.in.bin>
+        <zstd.prefix></zstd.prefix>
+        <zstd.lib></zstd.lib>
+        <zstd.include></zstd.include>
+        <require.zstd>false</require.zstd>
+        <bundle.zstd.in.bin>true</bundle.zstd.in.bin>
         <openssl.prefix></openssl.prefix>
         <openssl.lib></openssl.lib>
         <openssl.include></openssl.include>
@@ -794,6 +809,8 @@
                     
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
                     
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName>
                     
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
+                    
<javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardCompressor</javahClassName>
+                    
<javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardDecompressor</javahClassName>
                     
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Compressor</javahClassName>
                     
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
                     
<javahClassName>org.apache.hadoop.io.erasurecode.ErasureCodeNative</javahClassName>
@@ -850,6 +867,10 @@
                     <argument>/p:CustomSnappyLib=${snappy.lib}</argument>
                     
<argument>/p:CustomSnappyInclude=${snappy.include}</argument>
                     <argument>/p:RequireSnappy=${require.snappy}</argument>
+                    <argument>/p:CustomZstdPrefix=${zstd.prefix}</argument>
+                    <argument>/p:CustomZstdLib=${zstd.lib}</argument>
+                    <argument>/p:CustomZstdInclude=${zstd.include}</argument>
+                    <argument>/p:RequireZstd=${require.zstd}</argument>
                     
<argument>/p:CustomOpensslPrefix=${openssl.prefix}</argument>
                     <argument>/p:CustomOpensslLib=${openssl.lib}</argument>
                     
<argument>/p:CustomOpensslInclude=${openssl.include}</argument>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt 
b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
index 8317a46..10b0f23 100644
--- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt
+++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
@@ -94,6 +94,33 @@ else()
     endif()
 endif()
 
+# Require zstandard
+SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
+hadoop_set_find_shared_library_version("1")
+find_library(ZSTD_LIBRARY
+    NAMES zstd
+    PATHS ${CUSTOM_ZSTD_PREFIX} ${CUSTOM_ZSTD_PREFIX}/lib
+          ${CUSTOM_ZSTD_PREFIX}/lib64 ${CUSTOM_ZSTD_LIB})
+SET(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
+find_path(ZSTD_INCLUDE_DIR
+    NAMES zstd.h
+    PATHS ${CUSTOM_ZSTD_PREFIX} ${CUSTOM_ZSTD_PREFIX}/include
+          ${CUSTOM_ZSTD_INCLUDE})
+if (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
+    GET_FILENAME_COMPONENT(HADOOP_ZSTD_LIBRARY ${ZSTD_LIBRARY} NAME)
+    set(ZSTD_SOURCE_FILES
+        "${SRC}/io/compress/zstd/ZStandardCompressor.c"
+        "${SRC}/io/compress/zstd/ZStandardDecompressor.c")
+    set(REQUIRE_ZSTD ${REQUIRE_ZSTD}) # Stop warning about unused variable.
+        message(STATUS "Found ZStandard: ${ZSTD_LIBRARY}")
+else ()
+    set(ZSTD_INCLUDE_DIR "")
+    set(ZSTD_SOURCE_FILES "")
+    IF(REQUIRE_ZSTD)
+        MESSAGE(FATAL_ERROR "Required zstandard library could not be found.  
ZSTD_LIBRARY=${ZSTD_LIBRARY}, ZSTD_INCLUDE_DIR=${ZSTD_INCLUDE_DIR}, 
CUSTOM_ZSTD_INCLUDE_DIR=${CUSTOM_ZSTD_INCLUDE_DIR}, 
CUSTOM_ZSTD_PREFIX=${CUSTOM_ZSTD_PREFIX}, 
CUSTOM_ZSTD_INCLUDE=${CUSTOM_ZSTD_INCLUDE}")
+    ENDIF(REQUIRE_ZSTD)
+endif ()
+
 set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
 hadoop_set_find_shared_library_version("2")
 find_library(ISAL_LIBRARY
@@ -208,6 +235,7 @@ include_directories(
     ${BZIP2_INCLUDE_DIR}
     ${SNAPPY_INCLUDE_DIR}
     ${ISAL_INCLUDE_DIR}
+    ${ZSTD_INCLUDE_DIR}
     ${OPENSSL_INCLUDE_DIR}
     ${SRC}/util
 )
@@ -222,6 +250,7 @@ hadoop_add_dual_library(hadoop
     ${SRC}/io/compress/lz4/lz4hc.c
     ${ISAL_SOURCE_FILES}
     ${SNAPPY_SOURCE_FILES}
+    ${ZSTD_SOURCE_FILES}
     ${OPENSSL_SOURCE_FILES}
     ${SRC}/io/compress/zlib/ZlibCompressor.c
     ${SRC}/io/compress/zlib/ZlibDecompressor.c

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/config.h.cmake
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/config.h.cmake 
b/hadoop-common-project/hadoop-common/src/config.h.cmake
index 445cc33..40aa467 100644
--- a/hadoop-common-project/hadoop-common/src/config.h.cmake
+++ b/hadoop-common-project/hadoop-common/src/config.h.cmake
@@ -21,6 +21,7 @@
 #cmakedefine HADOOP_ZLIB_LIBRARY "@HADOOP_ZLIB_LIBRARY@"
 #cmakedefine HADOOP_BZIP2_LIBRARY "@HADOOP_BZIP2_LIBRARY@"
 #cmakedefine HADOOP_SNAPPY_LIBRARY "@HADOOP_SNAPPY_LIBRARY@"
+#cmakedefine HADOOP_ZSTD_LIBRARY "@HADOOP_ZSTD_LIBRARY@"
 #cmakedefine HADOOP_OPENSSL_LIBRARY "@HADOOP_OPENSSL_LIBRARY@"
 #cmakedefine HADOOP_ISAL_LIBRARY "@HADOOP_ISAL_LIBRARY@"
 #cmakedefine HAVE_SYNC_FILE_RANGE

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index fe522b3..b8a60d6 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -141,6 +141,22 @@ public class CommonConfigurationKeys extends 
CommonConfigurationKeysPublic {
   public static final int IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT =
       256 * 1024;
 
+  /** ZStandard compression level. */
+  public static final String IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY =
+      "io.compression.codec.zstd.level";
+
+  /** Default value for IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY. */
+  public static final int IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT = 3;
+
+  /** ZStandard buffer size. */
+  public static final String IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY =
+      "io.compression.codec.zstd.buffersize";
+
+  /** Default ZStandard buffer size. A value of 0 means use the buffer size
+   * recommended by the zstd library. */
+  public static final int
+      IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT = 0;
+
   /** Internal buffer size for Lz4 compressor/decompressors */
   public static final String IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY =
       "io.compression.codec.lz4.buffersize";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java
index 8cb0b2a..3808003 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java
@@ -95,7 +95,7 @@ public interface Decompressor {
    * @param b Buffer for the compressed data
    * @param off Start offset of the data
    * @param len Size of the buffer
-   * @return The actual number of bytes of compressed data.
+   * @return The actual number of bytes of uncompressed data.
    * @throws IOException
    */
   public int decompress(byte[] b, int off, int len) throws IOException;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
new file mode 100644
index 0000000..c56bbba
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.compress.zstd.ZStandardCompressor;
+import org.apache.hadoop.io.compress.zstd.ZStandardDecompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static 
org.apache.hadoop.fs.CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY;
+
+/**
+ * This class creates zstd compressors/decompressors.
+ */
+public class ZStandardCodec implements
+    Configurable, CompressionCodec, DirectDecompressionCodec  {
+  private Configuration conf;
+
+  /**
+   * Set the configuration to be used by this object.
+   *
+   * @param conf the configuration object.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Return the configuration used by this object.
+   *
+   * @return the configuration object used by this object.
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public static void checkNativeCodeLoaded() {
+    if (!NativeCodeLoader.isNativeCodeLoaded() ||
+        !NativeCodeLoader.buildSupportsZstd()) {
+      throw new RuntimeException("native zStandard library "
+          + "not available: this version of libhadoop was built "
+          + "without zstd support.");
+    }
+    if (!ZStandardCompressor.isNativeCodeLoaded()) {
+      throw new RuntimeException("native zStandard library not "
+          + "available: ZStandardCompressor has not been loaded.");
+    }
+    if (!ZStandardDecompressor.isNativeCodeLoaded()) {
+      throw new RuntimeException("native zStandard library not "
+          + "available: ZStandardDecompressor has not been loaded.");
+    }
+  }
+
+  public static boolean isNativeCodeLoaded() {
+    return ZStandardCompressor.isNativeCodeLoaded()
+        && ZStandardDecompressor.isNativeCodeLoaded();
+  }
+
+  public static String getLibraryName() {
+    return ZStandardCompressor.getLibraryName();
+  }
+
+  public static int getCompressionLevel(Configuration conf) {
+    return conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT);
+  }
+
+  public static int getCompressionBufferSize(Configuration conf) {
+    int bufferSize = getBufferSize(conf);
+    return bufferSize == 0 ?
+        ZStandardCompressor.getRecommendedBufferSize() :
+        bufferSize;
+  }
+
+  public static int getDecompressionBufferSize(Configuration conf) {
+    int bufferSize = getBufferSize(conf);
+    return bufferSize == 0 ?
+        ZStandardDecompressor.getRecommendedBufferSize() :
+        bufferSize;
+  }
+
+  private static int getBufferSize(Configuration conf) {
+    return conf.getInt(IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
+        IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT);
+  }
+
+  /**
+   * Create a {@link CompressionOutputStream} that will write to the given
+   * {@link OutputStream}.
+   *
+   * @param out the location for the final output stream
+   * @return a stream the user can write uncompressed data to have compressed
+   * @throws IOException
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out)
+      throws IOException {
+    return Util.
+        createOutputStreamWithCodecPool(this, conf, out);
+  }
+
+  /**
+   * Create a {@link CompressionOutputStream} that will write to the given
+   * {@link OutputStream} with the given {@link Compressor}.
+   *
+   * @param out        the location for the final output stream
+   * @param compressor compressor to use
+   * @return a stream the user can write uncompressed data to have compressed
+   * @throws IOException
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out,
+      Compressor compressor)
+      throws IOException {
+    checkNativeCodeLoaded();
+    return new CompressorStream(out, compressor,
+        getCompressionBufferSize(conf));
+  }
+
+  /**
+   * Get the type of {@link Compressor} needed by this {@link 
CompressionCodec}.
+   *
+   * @return the type of compressor needed by this codec.
+   */
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    checkNativeCodeLoaded();
+    return ZStandardCompressor.class;
+  }
+
+  /**
+   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
+   *
+   * @return a new compressor for use by this codec
+   */
+  @Override
+  public Compressor createCompressor() {
+    checkNativeCodeLoaded();
+    return new ZStandardCompressor(
+        getCompressionLevel(conf), getCompressionBufferSize(conf));
+  }
+
+
+  /**
+   * Create a {@link CompressionInputStream} that will read from the given
+   * input stream.
+   *
+   * @param in the stream to read compressed bytes from
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in)
+      throws IOException {
+    return Util.
+        createInputStreamWithCodecPool(this, conf, in);
+  }
+
+  /**
+   * Create a {@link CompressionInputStream} that will read from the given
+   * {@link InputStream} with the given {@link Decompressor}.
+   *
+   * @param in           the stream to read compressed bytes from
+   * @param decompressor decompressor to use
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in,
+                                                  Decompressor decompressor)
+      throws IOException {
+    checkNativeCodeLoaded();
+    return new DecompressorStream(in, decompressor,
+        getDecompressionBufferSize(conf));
+  }
+
+  /**
+   * Get the type of {@link Decompressor} needed by
+   * this {@link CompressionCodec}.
+   *
+   * @return the type of decompressor needed by this codec.
+   */
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    checkNativeCodeLoaded();
+    return ZStandardDecompressor.class;
+  }
+
+  /**
+   * Create a new {@link Decompressor} for use by this {@link 
CompressionCodec}.
+   *
+   * @return a new decompressor for use by this codec
+   */
+  @Override
+  public Decompressor createDecompressor() {
+    checkNativeCodeLoaded();
+    return new ZStandardDecompressor(getDecompressionBufferSize(conf));
+  }
+
+  /**
+   * Get the default filename extension for this kind of compression.
+   *
+   * @return <code>.zst</code>.
+   */
+  @Override
+  public String getDefaultExtension() {
+    return ".zst";
+  }
+
+  @Override
+  public DirectDecompressor createDirectDecompressor() {
+    return new ZStandardDecompressor.ZStandardDirectDecompressor(
+        getDecompressionBufferSize(conf)
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
new file mode 100644
index 0000000..eb2121a
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.zstd;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.ZStandardCodec;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link Compressor} based on the zStandard compression algorithm.
+ * https://github.com/facebook/zstd
+ */
+public class ZStandardCompressor implements Compressor {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ZStandardCompressor.class);
+
+  private long stream;
+  private int level;
+  private int directBufferSize;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufLen = 0;
+  private ByteBuffer uncompressedDirectBuf = null;
+  private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0;
+  private boolean keepUncompressedBuf = false;
+  private ByteBuffer compressedDirectBuf = null;
+  private boolean finish, finished;
+  private long bytesRead = 0;
+  private long bytesWritten = 0;
+
+  private static boolean nativeZStandardLoaded = false;
+
+  static {
+    if (NativeCodeLoader.isNativeCodeLoaded()) {
+      try {
+        // Initialize the native library
+        initIDs();
+        nativeZStandardLoaded = true;
+      } catch (Throwable t) {
+        LOG.warn("Error loading zstandard native libraries: " + t);
+      }
+    }
+  }
+
+  public static boolean isNativeCodeLoaded() {
+    return nativeZStandardLoaded;
+  }
+
+  public static int getRecommendedBufferSize() {
+    return getStreamSize();
+  }
+
+  @VisibleForTesting
+  ZStandardCompressor() {
+    this(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT,
+        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+  }
+
+  /**
+   * Creates a new compressor with the given compression level and buffer size.
+   * Compressed data will be generated in ZStandard format.
+   */
+  public ZStandardCompressor(int level, int bufferSize) {
+    this(level, bufferSize, bufferSize);
+  }
+
+  @VisibleForTesting
+  ZStandardCompressor(int level, int inputBufferSize, int outputBufferSize) {
+    this.level = level;
+    stream = create();
+    this.directBufferSize = outputBufferSize;
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(inputBufferSize);
+    compressedDirectBuf = ByteBuffer.allocateDirect(outputBufferSize);
+    compressedDirectBuf.position(outputBufferSize);
+    reset();
+  }
+
+  /**
+   * Prepare the compressor to be used in a new stream with settings defined in
+   * the given Configuration. It will reset the compressor's compression level.
+   * A null Configuration is ignored and leaves the compressor unchanged.
+   *
+   * @param conf Configuration storing new settings
+   */
+  @Override
+  public void reinit(Configuration conf) {
+    if (conf == null) {
+      return;
+    }
+    level = ZStandardCodec.getCompressionLevel(conf);
+    reset();
+    LOG.debug("Reinit compressor with new compression configuration");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    this.userBuf = b;
+    this.userBufOff = off;
+    this.userBufLen = len;
+    uncompressedDirectBufOff = 0;
+    setInputFromSavedData();
+
+    compressedDirectBuf.limit(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+  }
+
+  //copy enough data from userBuf to uncompressedDirectBuf
+  private void setInputFromSavedData() {
+    int len = Math.min(userBufLen, uncompressedDirectBuf.remaining());
+    uncompressedDirectBuf.put(userBuf, userBufOff, len);
+    userBufLen -= len;
+    userBufOff += len;
+    uncompressedDirectBufLen = uncompressedDirectBuf.position();
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException(
+        "Dictionary support is not enabled");
+  }
+
+  @Override
+  public boolean needsInput() {
+    // Consume remaining compressed data?
+    if (compressedDirectBuf.remaining() > 0) {
+      return false;
+    }
+
+    // have we consumed all input
+    if (keepUncompressedBuf && uncompressedDirectBufLen > 0) {
+      return false;
+    }
+
+    if (uncompressedDirectBuf.remaining() > 0) {
+      // Check if we have consumed all user-input
+      if (userBufLen <= 0) {
+        return true;
+      } else {
+        // copy enough data from userBuf to uncompressedDirectBuf
+        setInputFromSavedData();
+        // uncompressedDirectBuf is not full
+        return uncompressedDirectBuf.remaining() > 0;
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public void finish() {
+    finish = true;
+  }
+
+  @Override
+  public boolean finished() {
+    // Check if 'zstd' says it's 'finished' and all compressed
+    // data has been consumed
+    return (finished && compressedDirectBuf.remaining() == 0);
+  }
+
+  @Override
+  public int compress(byte[] b, int off, int len) throws IOException {
+    checkStream();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    // Check if there is compressed data
+    int n = compressedDirectBuf.remaining();
+    if (n > 0) {
+      n = Math.min(n, len);
+      compressedDirectBuf.get(b, off, n);
+      return n;
+    }
+
+    // Re-initialize the output direct buffer
+    compressedDirectBuf.rewind();
+    compressedDirectBuf.limit(directBufferSize);
+
+    // Compress data
+    n = deflateBytesDirect(
+        uncompressedDirectBuf,
+        uncompressedDirectBufOff,
+        uncompressedDirectBufLen,
+        compressedDirectBuf,
+        directBufferSize
+    );
+    compressedDirectBuf.limit(n);
+
+    // Check if we have consumed all input buffer
+    if (uncompressedDirectBufLen <= 0) {
+      // consumed all input buffer
+      keepUncompressedBuf = false;
+      uncompressedDirectBuf.clear();
+      uncompressedDirectBufOff = 0;
+      uncompressedDirectBufLen = 0;
+    } else {
+      //  did not consume all input buffer
+      keepUncompressedBuf = true;
+    }
+
+    // Get at most 'len' bytes
+    n = Math.min(n, len);
+    compressedDirectBuf.get(b, off, n);
+    return n;
+  }
+
+  /**
+   * Returns the total number of compressed bytes output so far.
+   *
+   * @return the total (non-negative) number of compressed bytes output so far
+   */
+  @Override
+  public long getBytesWritten() {
+    checkStream();
+    return bytesWritten;
+  }
+
+  /**
+   * <p>Returns the total number of uncompressed bytes input so far.</p>
+   *
+   * @return the total (non-negative) number of uncompressed bytes input so far
+   */
+  @Override
+  public long getBytesRead() {
+    checkStream();
+    return bytesRead;
+  }
+
+  @Override
+  public void reset() {
+    checkStream();
+    init(level, stream);
+    finish = false;
+    finished = false;
+    bytesRead = 0;
+    bytesWritten = 0;
+    uncompressedDirectBuf.rewind();
+    uncompressedDirectBufOff = 0;
+    uncompressedDirectBufLen = 0;
+    keepUncompressedBuf = false;
+    compressedDirectBuf.limit(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+    userBufOff = 0;
+    userBufLen = 0;
+  }
+
+  @Override
+  public void end() {
+    if (stream != 0) {
+      end(stream);
+      stream = 0;
+    }
+  }
+
+  private void checkStream() {
+    if (stream == 0) {
+      throw new NullPointerException();
+    }
+  }
+
+  private native static long create();
+  private native static void init(int level, long stream);
+  private native int deflateBytesDirect(ByteBuffer src, int srcOffset,
+      int srcLen, ByteBuffer dst, int dstLen);
+  private static native int getStreamSize();
+  private native static void end(long strm);
+  private native static void initIDs();
+  public native static String getLibraryName();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java
new file mode 100644
index 0000000..73d73e1
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.zstd;
+
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link Decompressor} based on the zStandard compression algorithm.
+ * https://github.com/facebook/zstd
+ */
+public class ZStandardDecompressor implements Decompressor {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ZStandardDecompressor.class);
+
+  private long stream;
+  private int directBufferSize;
+  private ByteBuffer compressedDirectBuf = null;
+  private int compressedDirectBufOff, bytesInCompressedBuffer;
+  private ByteBuffer uncompressedDirectBuf = null;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufferBytesToConsume = 0;
+  private boolean finished;
+  private int remaining = 0;
+
+  private static boolean nativeZStandardLoaded = false;
+
+  static {
+    if (NativeCodeLoader.isNativeCodeLoaded()) {
+      try {
+        // Initialize the native library
+        initIDs();
+        nativeZStandardLoaded = true;
+      } catch (Throwable t) { // any failure leaves nativeZStandardLoaded false
+        LOG.warn("Error loading zstandard native libraries: " + t);
+      }
+    }
+  }
+
+  public static boolean isNativeCodeLoaded() {
+    return nativeZStandardLoaded;
+  }
+
+  public static int getRecommendedBufferSize() {
+    return getStreamSize();
+  }
+
+  public ZStandardDecompressor() {
+    this(getStreamSize());
+  }
+
+  /**
+   * Creates a new decompressor.
+   *
+   * @param bufferSize size in bytes used to allocate each of the two direct
+   *                   buffers (compressed input and uncompressed output)
+   */
+  public ZStandardDecompressor(int bufferSize) {
+    this.directBufferSize = bufferSize;
+    compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize); // no output to read yet
+    stream = create();
+    reset(); // overridable: a subclass reset() runs before its constructor body
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    final boolean outOfBounds = off < 0 || len < 0 || off > b.length - len;
+    if (outOfBounds) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    // Remember the caller's array, then stage its first chunk.
+    userBuf = b;
+    userBufOff = off;
+    userBufferBytesToConsume = len;
+    setInputFromSavedData();
+
+    // position == limit: no decompressed output is pending for the new input.
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+  }
+
+  private void setInputFromSavedData() {
+    compressedDirectBufOff = 0;
+    bytesInCompressedBuffer = userBufferBytesToConsume;
+    if (bytesInCompressedBuffer > directBufferSize) {
+      bytesInCompressedBuffer = directBufferSize;
+    }
+
+    compressedDirectBuf.rewind();
+    compressedDirectBuf.put(
+        userBuf, userBufOff, bytesInCompressedBuffer);
+
+    userBufOff += bytesInCompressedBuffer;
+    userBufferBytesToConsume -= bytesInCompressedBuffer;
+  }
+
+  // dictionary is not supported
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException(
+        "Dictionary support is not enabled");
+  }
+
+  @Override
+  public boolean needsInput() {
+    // Decompressed output is still waiting to be read by the caller.
+    if (uncompressedDirectBuf.remaining() > 0) {
+      return false;
+    }
+
+    // The direct input buffer still holds unconsumed compressed bytes.
+    final int unconsumed = bytesInCompressedBuffer - compressedDirectBufOff;
+    if (unconsumed > 0) {
+      return false;
+    }
+
+    // Direct buffer drained: ask for input only if the user buffer is too.
+    if (userBufferBytesToConsume <= 0) {
+      return true;
+    }
+    setInputFromSavedData();
+    return false;
+  }
+
+  // dictionary is not supported.
+  @Override
+  public boolean needsDictionary() {
+    return false;
+  }
+
+  @Override
+  public boolean finished() {
+    // 'finished' is true once ZSTD_decompressStream() has returned 0; the
+    // stream only counts as done when no buffered output is left to read.
+    return finished && !uncompressedDirectBuf.hasRemaining();
+  }
+
+  @Override
+  public int decompress(byte[] b, int off, int len)
+      throws IOException {
+    checkStream();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    // Hand out output left over from a previous call before inflating more.
+    final int pending = uncompressedDirectBuf.remaining();
+    if (pending > 0) {
+      return populateUncompressedBuffer(b, off, len, pending);
+    }
+
+    // Make the whole output direct buffer writable again.
+    uncompressedDirectBuf.rewind();
+    uncompressedDirectBuf.limit(directBufferSize);
+
+    // Inflate the staged compressed bytes into the output direct buffer.
+    final int produced = inflateBytesDirect(
+        compressedDirectBuf, compressedDirectBufOff, bytesInCompressedBuffer,
+        uncompressedDirectBuf, 0, directBufferSize);
+    // Expose only the bytes that were actually produced.
+    uncompressedDirectBuf.limit(produced);
+
+    // Copy at most 'len' bytes to the caller.
+    return populateUncompressedBuffer(b, off, len, produced);
+  }
+
+  /**
+   * <p>Returns the number of bytes remaining in the input buffers;
+   * normally called when finished() is true to determine amount of post-stream
+   * data.</p>
+   *
+   * @return the total (non-negative) number of unprocessed bytes in input
+   */
+  @Override
+  public int getRemaining() {
+    checkStream();
+    // bytes left in userBuf + 'remaining' (presumably set natively; TODO confirm)
+    return userBufferBytesToConsume + remaining;
+  }
+
+  /**
+   * Re-initializes the native stream and discards all buffered state: the
+   * pending user input, the staged compressed bytes and any unread output.
+   */
+  @Override
+  public void reset() {
+    checkStream();
+    init(stream);
+
+    finished = false;
+    remaining = 0;
+
+    // No user input pending.
+    userBufferBytesToConsume = 0;
+    userBufOff = 0;
+
+    // No staged compressed bytes.
+    bytesInCompressedBuffer = 0;
+    compressedDirectBufOff = 0;
+
+    // position == limit, so there is no unread decompressed output.
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+  }
+
+  @Override
+  public void end() {
+    if (stream != 0) { // nothing to free when the handle is already 0
+      free(stream);
+      stream = 0; // makes repeated end() calls no-ops
+    }
+  }
+
+  /**
+   * Releases the native stream if end() was never called.
+   * Uses end() rather than reset(): reset() only re-initializes the stream
+   * without freeing it, and it throws from checkStream() once end() has
+   * already set the handle to 0.
+   */
+  @Override
+  protected void finalize() {
+    end();
+  }
+
+  private void checkStream() {
+    if (stream == 0) { // end() stores 0 after freeing the native stream
+      throw new NullPointerException("Stream not initialized");
+    }
+  }
+
+  private int populateUncompressedBuffer(byte[] b, int off, int len, int n) {
+    n = Math.min(n, len);
+    uncompressedDirectBuf.get(b, off, n);
+    return n;
+  }
+
+  private native static void initIDs();
+  private native static long create();
+  private native static void init(long stream);
+  private native int inflateBytesDirect(ByteBuffer src, int srcOffset,
+      int srcLen, ByteBuffer dst, int dstOffset, int dstLen);
+  private native static void free(long strm);
+  private native static int getStreamSize();
+
+  int inflateDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
+    assert
+        (this instanceof ZStandardDecompressor.ZStandardDirectDecompressor);
+
+    int originalPosition = dst.position();
+    int n = inflateBytesDirect(
+        src, src.position(), src.remaining(), dst, dst.position(),
+        dst.remaining()
+    );
+    dst.position(originalPosition + n);
+    if (bytesInCompressedBuffer > 0) {
+      src.position(compressedDirectBufOff);
+    } else {
+      src.position(src.limit());
+    }
+    return n;
+  }
+
+  /**
+   * A {@link DirectDecompressor} for ZStandard
+   * https://github.com/facebook/zstd.
+   *
+   * <p>Only the ByteBuffer based decompress method is supported; the byte[]
+   * based decompress and setDictionary methods throw
+   * {@link UnsupportedOperationException}.</p>
+   */
+  public static class ZStandardDirectDecompressor
+      extends ZStandardDecompressor implements DirectDecompressor {
+
+    // Set to true by reset() (which the superclass constructor also calls)
+    // and recomputed after every ByteBuffer based decompress call.
+    private boolean endOfInput;
+
+    public ZStandardDirectDecompressor(int directBufferSize) {
+      super(directBufferSize);
+    }
+
+    @Override
+    public void reset() {
+      super.reset();
+      endOfInput = true;
+    }
+
+    @Override
+    public boolean finished() {
+      return (endOfInput && super.finished());
+    }
+
+    @Override
+    public void decompress(ByteBuffer src, ByteBuffer dst)
+        throws IOException {
+      assert dst.isDirect() : "dst.isDirect()";
+      assert src.isDirect() : "src.isDirect()";
+      assert dst.remaining() > 0 : "dst.remaining() > 0";
+      inflateDirect(src, dst);
+      // Input is exhausted once src has no bytes left after the call.
+      endOfInput = !src.hasRemaining();
+    }
+
+    @Override
+    public int decompress(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException(
+          "byte[] arrays are not supported for DirectDecompressor");
+    }
+
+    @Override
+    public void setDictionary(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException(
+          "byte[] arrays are not supported for DirectDecompressor");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java
new file mode 100644
index 0000000..9069070
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.io.compress.zstd;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
index dd04a19..c381336 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
@@ -85,6 +85,11 @@ public final class NativeCodeLoader {
   public static native boolean buildSupportsIsal();
 
   /**
+   * Returns true only if this build was compiled with support for ZStandard.
+   */
+  public static native boolean buildSupportsZstd();
+
+  /**
    * Returns true only if this build was compiled with support for openssl.
    */
   public static native boolean buildSupportsOpenssl();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
index e166bec..776839c 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.util;
 
+import org.apache.hadoop.io.compress.ZStandardCodec;
 import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.OpensslCipher;
@@ -67,6 +68,7 @@ public class NativeLibraryChecker {
     boolean zlibLoaded = false;
     boolean snappyLoaded = false;
     boolean isalLoaded = false;
+    boolean zStdLoaded = false;
     // lz4 is linked within libhadoop
     boolean lz4Loaded = nativeHadoopLoaded;
     boolean bzip2Loaded = Bzip2Factory.isNativeBzip2Loaded(conf);
@@ -78,6 +80,7 @@ public class NativeLibraryChecker {
     String zlibLibraryName = "";
     String snappyLibraryName = "";
     String isalDetail = "";
+    String zstdLibraryName = "";
     String lz4LibraryName = "";
     String bzip2LibraryName = "";
     String winutilsPath = null;
@@ -88,7 +91,11 @@ public class NativeLibraryChecker {
       if (zlibLoaded) {
         zlibLibraryName = ZlibFactory.getLibraryName();
       }
-
+      zStdLoaded = NativeCodeLoader.buildSupportsZstd() &&
+        ZStandardCodec.isNativeCodeLoaded();
+      if (zStdLoaded && NativeCodeLoader.buildSupportsZstd()) {
+        zstdLibraryName = ZStandardCodec.getLibraryName();
+      }
       snappyLoaded = NativeCodeLoader.buildSupportsSnappy() &&
           SnappyCodec.isNativeCodeLoaded();
       if (snappyLoaded && NativeCodeLoader.buildSupportsSnappy()) {
@@ -135,6 +142,7 @@ public class NativeLibraryChecker {
     System.out.println("Native library checking:");
     System.out.printf("hadoop:  %b %s%n", nativeHadoopLoaded, 
hadoopLibraryName);
     System.out.printf("zlib:    %b %s%n", zlibLoaded, zlibLibraryName);
+    System.out.printf("zstd  :  %b %s%n", zStdLoaded, zstdLibraryName);
     System.out.printf("snappy:  %b %s%n", snappyLoaded, snappyLibraryName);
     System.out.printf("lz4:     %b %s%n", lz4Loaded, lz4LibraryName);
     System.out.printf("bzip2:   %b %s%n", bzip2Loaded, bzip2LibraryName);
@@ -146,7 +154,8 @@ public class NativeLibraryChecker {
     }
 
     if ((!nativeHadoopLoaded) || (Shell.WINDOWS && (!winutilsExists)) ||
-        (checkAll && !(zlibLoaded && snappyLoaded && lz4Loaded && bzip2Loaded 
&& isalLoaded))) {
+        (checkAll && !(zlibLoaded && snappyLoaded && lz4Loaded
+            && bzip2Loaded && isalLoaded && zStdLoaded))) {
       // return 1 to indicated check failed
       ExitUtil.terminate(1);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c
 
b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c
new file mode 100644
index 0000000..04f2a3e
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "org_apache_hadoop_io_compress_zstd.h"
+
+#if defined HADOOP_ZSTD_LIBRARY
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#ifdef UNIX
+#include <dlfcn.h>
+#include "config.h"
+#endif
+
+#include "org_apache_hadoop_io_compress_zstd_ZStandardCompressor.h"
+
+static jfieldID ZStandardCompressor_stream;
+static jfieldID ZStandardCompressor_uncompressedDirectBufOff;
+static jfieldID ZStandardCompressor_uncompressedDirectBufLen;
+static jfieldID ZStandardCompressor_directBufferSize;
+static jfieldID ZStandardCompressor_finish;
+static jfieldID ZStandardCompressor_finished;
+static jfieldID ZStandardCompressor_bytesWritten;
+static jfieldID ZStandardCompressor_bytesRead;
+
+#ifdef UNIX
+static size_t (*dlsym_ZSTD_CStreamInSize)(void);
+static size_t (*dlsym_ZSTD_CStreamOutSize)(void);
+static ZSTD_CStream* (*dlsym_ZSTD_createCStream)(void);
+static size_t (*dlsym_ZSTD_initCStream)(ZSTD_CStream*, int);
+static size_t (*dlsym_ZSTD_freeCStream)(ZSTD_CStream*);
+static size_t (*dlsym_ZSTD_compressStream)(ZSTD_CStream*, ZSTD_outBuffer*, 
ZSTD_inBuffer*);
+static size_t (*dlsym_ZSTD_endStream)(ZSTD_CStream*, ZSTD_outBuffer*);
+static size_t (*dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*);
+static unsigned (*dlsym_ZSTD_isError)(size_t);
+static const char * (*dlsym_ZSTD_getErrorName)(size_t);
+#endif
+
+#ifdef WINDOWS
+typedef size_t (__cdecl *__dlsym_ZSTD_CStreamInSize)(void);
+typedef size_t (__cdecl *__dlsym_ZSTD_CStreamOutSize)(void);
+typedef ZSTD_CStream* (__cdecl *__dlsym_ZSTD_createCStream)(void);
+typedef size_t (__cdecl *__dlsym_ZSTD_initCStream)(ZSTD_CStream*, int);
+typedef size_t (__cdecl *__dlsym_ZSTD_freeCStream)(ZSTD_CStream*);
+typedef size_t (__cdecl *__dlsym_ZSTD_compressStream)(ZSTD_CStream*, 
ZSTD_outBuffer*, ZSTD_inBuffer*);
+typedef size_t (__cdecl *__dlsym_ZSTD_endStream)(ZSTD_CStream*, 
ZSTD_outBuffer*);
+typedef size_t (__cdecl *__dlsym_ZSTD_flushStream)(ZSTD_CStream*, 
ZSTD_outBuffer*);
+typedef unsigned (__cdecl *__dlsym_ZSTD_isError)(size_t);
+typedef const char * (__cdecl *__dlsym_ZSTD_getErrorName)(size_t);
+
+static __dlsym_ZSTD_CStreamInSize dlsym_ZSTD_CStreamInSize;
+static __dlsym_ZSTD_CStreamOutSize dlsym_ZSTD_CStreamOutSize;
+static __dlsym_ZSTD_createCStream dlsym_ZSTD_createCStream;
+static __dlsym_ZSTD_initCStream dlsym_ZSTD_initCStream;
+static __dlsym_ZSTD_freeCStream dlsym_ZSTD_freeCStream;
+static __dlsym_ZSTD_compressStream dlsym_ZSTD_compressStream;
+static __dlsym_ZSTD_endStream dlsym_ZSTD_endStream;
+static __dlsym_ZSTD_flushStream dlsym_ZSTD_flushStream;
+static __dlsym_ZSTD_isError dlsym_ZSTD_isError;
+static __dlsym_ZSTD_getErrorName dlsym_ZSTD_getErrorName;
+#endif
+
+// Load the libzstd.so from disk
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_initIDs (JNIEnv *env, jclass clazz) {
+#ifdef UNIX
+    // Load libzstd.so
+    void *libzstd = dlopen(HADOOP_ZSTD_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
+    if (!libzstd) {
+        // Stack buffer: the message needs no cleanup on this early return
+        // and there is no allocation that could fail before snprintf.
+        char msg[1000];
+        snprintf(msg, sizeof(msg), "%s (%s)!", "Cannot load " HADOOP_ZSTD_LIBRARY, dlerror());
+        THROW(env, "java/lang/InternalError", msg);
+        return;
+    }
+#endif
+
+#ifdef WINDOWS
+    HMODULE libzstd = LoadLibrary(HADOOP_ZSTD_LIBRARY);
+    if (!libzstd) {
+        THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zstd.dll");
+        return;
+    }
+#endif
+
+#ifdef UNIX
+    // load dynamic symbols
+    dlerror(); // clear any stale error before the symbol lookups
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_CStreamInSize, env, libzstd, "ZSTD_CStreamInSize");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_CStreamOutSize, env, libzstd, "ZSTD_CStreamOutSize");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_createCStream, env, libzstd, "ZSTD_createCStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_initCStream, env, libzstd, "ZSTD_initCStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_freeCStream, env, libzstd, "ZSTD_freeCStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_compressStream, env, libzstd, "ZSTD_compressStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_endStream, env, libzstd, "ZSTD_endStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName");
+#endif
+
+#ifdef WINDOWS
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_CStreamInSize, dlsym_ZSTD_CStreamInSize, env, libzstd, "ZSTD_CStreamInSize");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_CStreamOutSize, dlsym_ZSTD_CStreamOutSize, env, libzstd, "ZSTD_CStreamOutSize");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_createCStream, dlsym_ZSTD_createCStream, env, libzstd, "ZSTD_createCStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_initCStream, dlsym_ZSTD_initCStream, env, libzstd, "ZSTD_initCStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_freeCStream, dlsym_ZSTD_freeCStream, env, libzstd, "ZSTD_freeCStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_compressStream, dlsym_ZSTD_compressStream, env, libzstd, "ZSTD_compressStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_endStream, dlsym_ZSTD_endStream, env, libzstd, "ZSTD_endStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_flushStream, dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_isError, dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_getErrorName, dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName");
+#endif
+
+    // load fields
+    ZStandardCompressor_stream = (*env)->GetFieldID(env, clazz, "stream", "J");
+    ZStandardCompressor_finish = (*env)->GetFieldID(env, clazz, "finish", "Z");
+    ZStandardCompressor_finished = (*env)->GetFieldID(env, clazz, "finished", "Z");
+    ZStandardCompressor_uncompressedDirectBufOff = (*env)->GetFieldID(env, clazz, "uncompressedDirectBufOff", "I");
+    ZStandardCompressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, clazz, "uncompressedDirectBufLen", "I");
+    ZStandardCompressor_directBufferSize = (*env)->GetFieldID(env, clazz, "directBufferSize", "I");
+    ZStandardCompressor_bytesRead = (*env)->GetFieldID(env, clazz, "bytesRead", "J");
+    ZStandardCompressor_bytesWritten = (*env)->GetFieldID(env, clazz, "bytesWritten", "J");
+}
+
+// Create the compression stream and return its address as the Java handle
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_create (JNIEnv *env, jobject this) {
+    ZSTD_CStream* const cstream = dlsym_ZSTD_createCStream();
+    if (!cstream) {
+        // 0 is what the Java side treats as "no stream"
+        THROW(env, "java/lang/InternalError", "Error creating the stream");
+        return (jlong)0;
+    }
+    return (jlong) cstream;
+}
+
+// (Re)initialize the compression stream at the given level
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_init (JNIEnv *env, jobject this, jint level, jlong stream) {
+    const size_t rc = dlsym_ZSTD_initCStream((ZSTD_CStream *) stream, level);
+    if (dlsym_ZSTD_isError(rc)) {
+        // surface the zstd error name to Java
+        THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(rc));
+    }
+}
+
+// Free the compression stream behind the given handle
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_end (JNIEnv *env, jobject this, jlong stream) {
+    const size_t rc = dlsym_ZSTD_freeCStream((ZSTD_CStream *) stream);
+    if (dlsym_ZSTD_isError(rc)) {
+        // surface the zstd error name to Java
+        THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(rc));
+    }
+}
+
+// Compress the pending bytes of the uncompressed direct buffer into the
+// compressed direct buffer. Returns the number of compressed bytes written,
+// or 0 with a pending Java exception on error. Updates the bytesRead,
+// bytesWritten, uncompressedDirectBufOff/Len and finished fields of 'this'.
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_deflateBytesDirect
+(JNIEnv *env, jobject this, jobject uncompressed_direct_buf, jint uncompressed_direct_buf_off, jint uncompressed_direct_buf_len, jobject compressed_direct_buf, jint compressed_direct_buf_len ) {
+    ZSTD_CStream* const stream = (ZSTD_CStream*) (*env)->GetLongField(env, this, ZStandardCompressor_stream);
+    if (!stream) {
+        THROW(env, "java/lang/NullPointerException", NULL);
+        return (jint)0;
+    }
+
+    jlong bytes_read = (*env)->GetLongField(env, this, ZStandardCompressor_bytesRead);
+    jlong bytes_written = (*env)->GetLongField(env, this, ZStandardCompressor_bytesWritten);
+    jboolean finish = (*env)->GetBooleanField(env, this, ZStandardCompressor_finish);
+
+    // Get the input direct buffer
+    void * uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
+    if (!uncompressed_bytes) {
+        THROW(env, "java/lang/InternalError", "Undefined memory address for uncompressedDirectBuf");
+        return (jint) 0;
+    }
+
+    // Get the output direct buffer
+    void * compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf);
+    if (!compressed_bytes) {
+        THROW(env, "java/lang/InternalError", "Undefined memory address for compressedDirectBuf");
+        return (jint) 0;
+    }
+
+    // The offset/length pair written back at the end of this function is
+    // (consumed position, bytes still unconsumed), so on re-entry the end of
+    // the valid input is off + len. ZSTD_inBuffer.size is measured from the
+    // start of the buffer, not from pos.
+    const size_t input_start = (size_t) uncompressed_direct_buf_off;
+    ZSTD_inBuffer input = { uncompressed_bytes, input_start + (size_t) uncompressed_direct_buf_len, input_start };
+    ZSTD_outBuffer output = { compressed_bytes, compressed_direct_buf_len, 0 };
+
+    size_t size = dlsym_ZSTD_compressStream(stream, &output, &input);
+    if (dlsym_ZSTD_isError(size)) {
+        THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size));
+        return (jint) 0;
+    }
+    if (finish && input.pos == input.size) {
+        // end the stream, flush and  write the frame epilogue
+        size = dlsym_ZSTD_endStream(stream, &output);
+        if (!size) {
+            // 0 means the epilogue was written completely
+            (*env)->SetBooleanField(env, this, ZStandardCompressor_finished, JNI_TRUE);
+        }
+    } else {
+        // need to flush the output buffer
+        // this also updates the output buffer position.
+        size = dlsym_ZSTD_flushStream(stream, &output);
+    }
+    if (dlsym_ZSTD_isError(size)) {
+        THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size));
+        return (jint) 0;
+    }
+
+    // input.pos is absolute; only the bytes consumed by this call count.
+    bytes_read += (jlong) (input.pos - input_start);
+    bytes_written += (jlong) output.pos;
+    (*env)->SetLongField(env, this, ZStandardCompressor_bytesRead, bytes_read);
+    (*env)->SetLongField(env, this, ZStandardCompressor_bytesWritten, bytes_written);
+
+    (*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufOff, (jint) input.pos);
+    (*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufLen, (jint) (input.size - input.pos));
+    return (jint) output.pos;
+}
+
+JNIEXPORT jstring JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_getLibraryName
+(JNIEnv *env, jclass class) {
+#ifdef UNIX
+    // Prefer the path of the shared object that provides ZSTD_isError.
+    Dl_info info;
+    if (dlsym_ZSTD_isError && dladdr( dlsym_ZSTD_isError, &info)) {
+        return (*env)->NewStringUTF(env, info.dli_fname);
+    }
+    // Otherwise fall back to the HADOOP_ZSTD_LIBRARY name.
+    return (*env)->NewStringUTF(env, HADOOP_ZSTD_LIBRARY);
+#endif
+#ifdef WINDOWS
+    LPWSTR filename = NULL;
+    GetLibraryName(dlsym_ZSTD_isError, &filename);
+    if (filename == NULL) {
+        return (*env)->NewStringUTF(env, "Unavailable");
+    }
+    return (*env)->NewString(env, filename, (jsize) wcslen(filename));
+#endif
+}
+
+// Returns the larger of zstd's recommended input and output buffer sizes
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_getStreamSize
+(JNIEnv *env, jobject this) {
+    const int in_size = (int) dlsym_ZSTD_CStreamInSize();
+    const int out_size = (int) dlsym_ZSTD_CStreamOutSize();
+    return (in_size < out_size) ? out_size : in_size;
+}
+
+#endif //define HADOOP_ZSTD_LIBRARY
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c
 
b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c
new file mode 100644
index 0000000..1236756
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "org_apache_hadoop_io_compress_zstd.h"
+
+#if defined HADOOP_ZSTD_LIBRARY
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#ifdef UNIX
+#include <dlfcn.h>
+#include "config.h"
+#endif
+
+#include "org_apache_hadoop_io_compress_zstd_ZStandardDecompressor.h"
+
+static jfieldID ZStandardDecompressor_stream; // JNI field IDs of the Java object; all six are looked up once in initIDs
+static jfieldID ZStandardDecompressor_compressedDirectBufOff;
+static jfieldID ZStandardDecompressor_bytesInCompressedBuffer;
+static jfieldID ZStandardDecompressor_directBufferSize;
+static jfieldID ZStandardDecompressor_finished;
+static jfieldID ZStandardDecompressor_remaining; // looked up with signature "I" (Java int) in initIDs
+
+#ifdef UNIX // function pointers filled in by LOAD_DYNAMIC_SYMBOL in initIDs
+static size_t (*dlsym_ZSTD_DStreamOutSize)(void);
+static size_t (*dlsym_ZSTD_DStreamInSize)(void);
+static ZSTD_DStream* (*dlsym_ZSTD_createDStream)(void);
+static size_t (*dlsym_ZSTD_initDStream)(ZSTD_DStream*);
+static size_t (*dlsym_ZSTD_freeDStream)(ZSTD_DStream*);
+static size_t (*dlsym_ZSTD_resetDStream)(ZSTD_DStream*); // NOTE(review): resolved in initIDs but never called in this file
+static size_t (*dlsym_ZSTD_decompressStream)(ZSTD_DStream*, ZSTD_outBuffer*, 
ZSTD_inBuffer*);
+static size_t (*dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*); // NOTE(review): takes a ZSTD_CStream and is never called in this file - confirm it is needed here
+static unsigned (*dlsym_ZSTD_isError)(size_t);
+static const char * (*dlsym_ZSTD_getErrorName)(size_t);
+#endif
+
+#ifdef WINDOWS // same entry points as the UNIX branch, declared through __cdecl function-pointer typedefs
+typedef size_t (__cdecl *__dlsym_ZSTD_DStreamOutSize)(void);
+typedef size_t (__cdecl *__dlsym_ZSTD_DStreamInSize)(void);
+typedef ZSTD_DStream* (__cdecl *__dlsym_ZSTD_createDStream)(void);
+typedef size_t (__cdecl *__dlsym_ZSTD_initDStream)(ZSTD_DStream*);
+typedef size_t (__cdecl *__dlsym_ZSTD_freeDStream)(ZSTD_DStream*);
+typedef size_t (__cdecl *__dlsym_ZSTD_resetDStream)(ZSTD_DStream*);
+typedef size_t (__cdecl *__dlsym_ZSTD_decompressStream)(ZSTD_DStream*, 
ZSTD_outBuffer*, ZSTD_inBuffer*);
+typedef size_t (__cdecl *__dlsym_ZSTD_flushStream)(ZSTD_CStream*, 
ZSTD_outBuffer*);
+typedef unsigned (__cdecl *__dlsym_ZSTD_isError)(size_t);
+typedef const char * (__cdecl *__dlsym_ZSTD_getErrorName)(size_t);
+
+static __dlsym_ZSTD_DStreamOutSize dlsym_ZSTD_DStreamOutSize; // pointer variables of the typedef'd types above
+static __dlsym_ZSTD_DStreamInSize dlsym_ZSTD_DStreamInSize;
+static __dlsym_ZSTD_createDStream dlsym_ZSTD_createDStream;
+static __dlsym_ZSTD_initDStream dlsym_ZSTD_initDStream;
+static __dlsym_ZSTD_freeDStream dlsym_ZSTD_freeDStream;
+static __dlsym_ZSTD_resetDStream dlsym_ZSTD_resetDStream;
+static __dlsym_ZSTD_decompressStream dlsym_ZSTD_decompressStream;
+static __dlsym_ZSTD_isError dlsym_ZSTD_isError;
+static __dlsym_ZSTD_getErrorName dlsym_ZSTD_getErrorName;
+static __dlsym_ZSTD_flushStream dlsym_ZSTD_flushStream;
+#endif
+
+/*
+ * Native static initializer for ZStandardDecompressor.
+ *
+ * Loads the zstd shared library named by HADOOP_ZSTD_LIBRARY, resolves the
+ * decompression entry points into the file-level dlsym_ZSTD_* pointers, and
+ * caches the JNI field IDs that the other native methods read and write.
+ * Throws java.lang.UnsatisfiedLinkError when the library cannot be loaded.
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_initIDs (JNIEnv *env, jclass clazz) {
+    // Load libzstd.so
+#ifdef UNIX
+    void *libzstd = dlopen(HADOOP_ZSTD_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
+    if (!libzstd) {
+        // Build the message in a stack buffer: the previous malloc(1000) was
+        // never checked for NULL and never freed, leaking on every failed load.
+        char msg[1000];
+        snprintf(msg, sizeof(msg), "%s (%s)!", "Cannot load " HADOOP_ZSTD_LIBRARY, dlerror());
+        THROW(env, "java/lang/UnsatisfiedLinkError", msg);
+        return;
+    }
+#endif
+
+#ifdef WINDOWS
+    HMODULE libzstd = LoadLibrary(HADOOP_ZSTD_LIBRARY);
+    if (!libzstd) {
+        THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zstd.dll");
+        return;
+    }
+#endif
+
+#ifdef UNIX
+    // reset the dl error state before the symbol lookups
+    dlerror();
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_DStreamOutSize, env, libzstd, "ZSTD_DStreamOutSize");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_DStreamInSize, env, libzstd, "ZSTD_DStreamInSize");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_createDStream, env, libzstd, "ZSTD_createDStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_initDStream, env, libzstd, "ZSTD_initDStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_freeDStream, env, libzstd, "ZSTD_freeDStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_resetDStream, env, libzstd, "ZSTD_resetDStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_decompressStream, env, libzstd, "ZSTD_decompressStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream");
+#endif
+
+#ifdef WINDOWS
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_DStreamOutSize, dlsym_ZSTD_DStreamOutSize, env, libzstd, "ZSTD_DStreamOutSize");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_DStreamInSize, dlsym_ZSTD_DStreamInSize, env, libzstd, "ZSTD_DStreamInSize");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_createDStream, dlsym_ZSTD_createDStream, env, libzstd, "ZSTD_createDStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_initDStream, dlsym_ZSTD_initDStream, env, libzstd, "ZSTD_initDStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_freeDStream, dlsym_ZSTD_freeDStream, env, libzstd, "ZSTD_freeDStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_resetDStream, dlsym_ZSTD_resetDStream, env, libzstd, "ZSTD_resetDStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_decompressStream, dlsym_ZSTD_decompressStream, env, libzstd, "ZSTD_decompressStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_isError, dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_getErrorName, dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_flushStream, dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream");
+#endif
+
+    // Cache the field IDs; the signatures are "J" = long, "Z" = boolean, "I" = int.
+    ZStandardDecompressor_stream = (*env)->GetFieldID(env, clazz, "stream", "J");
+    ZStandardDecompressor_finished = (*env)->GetFieldID(env, clazz, "finished", "Z");
+    ZStandardDecompressor_compressedDirectBufOff = (*env)->GetFieldID(env, clazz, "compressedDirectBufOff", "I");
+    ZStandardDecompressor_bytesInCompressedBuffer = (*env)->GetFieldID(env, clazz, "bytesInCompressedBuffer", "I");
+    ZStandardDecompressor_directBufferSize = (*env)->GetFieldID(env, clazz, "directBufferSize", "I");
+    ZStandardDecompressor_remaining = (*env)->GetFieldID(env, clazz, "remaining", "I");
+}
+
+// Creates a native ZSTD_DStream and returns its address as a jlong handle.
+// Raises java.lang.InternalError and returns 0 when zstd returns NULL.
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_create(JNIEnv *env, jobject this) {
+    ZSTD_DStream * const dstream = dlsym_ZSTD_createDStream();
+    if (dstream != NULL) {
+        return (jlong) dstream;
+    }
+    THROW(env, "java/lang/InternalError", "Error creating stream");
+    return (jlong) 0;
+}
+
+/*
+ * Initialises the native ZSTD_DStream behind the given handle and resets the
+ * Java object's "remaining" field to 0.
+ * Throws java.lang.InternalError carrying zstd's error name if
+ * ZSTD_initDStream reports an error; "remaining" is left untouched then.
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_init(JNIEnv *env, jobject this, jlong stream) {
+    size_t result = dlsym_ZSTD_initDStream((ZSTD_DStream *) stream);
+    if (dlsym_ZSTD_isError(result)) {
+        THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
+        return;
+    }
+    // "remaining" is looked up with signature "I" (a Java int) in initIDs, so it
+    // must be written with SetIntField; SetLongField on an int field is invalid.
+    (*env)->SetIntField(env, this, ZStandardDecompressor_remaining, 0);
+}
+
+
+// Releases the native ZSTD_DStream behind the given handle; raises
+// java.lang.InternalError carrying zstd's error name if zstd reports a failure.
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_free(JNIEnv *env, jclass obj, jlong stream) {
+    size_t const rc = dlsym_ZSTD_freeDStream((ZSTD_DStream *) stream);
+    if (!dlsym_ZSTD_isError(rc)) {
+        return;
+    }
+    THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(rc));
+}
+
+JNIEXPORT jint JNICALL // makes one ZSTD_decompressStream call between two direct buffers; returns the bytes written to the output (0 after throwing)
Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_inflateBytesDirect
+(JNIEnv *env, jobject this, jobject compressed_direct_buf, jint 
compressed_direct_buf_off, jint compressed_direct_buf_len, jobject 
uncompressed_direct_buf, jint uncompressed_direct_buf_off, jint 
uncompressed_direct_buf_len) {
+    ZSTD_DStream *stream = (ZSTD_DStream *) (*env)->GetLongField(env, this, 
ZStandardDecompressor_stream);
+    if (!stream) { // the Java object's "stream" field holds 0: no native stream to use
+        THROW(env, "java/lang/NullPointerException", NULL);
+        return (jint)0;
+    }
+
+    // Get the input direct buffer (base address only; the offset is handed to zstd as input.pos below)
+    void * compressed_bytes = (*env)->GetDirectBufferAddress(env, 
compressed_direct_buf);
+    if (!compressed_bytes) {
+        THROW(env, "java/lang/InternalError", "Undefined memory address for 
compressedDirectBuf");
+        return (jint) 0;
+    }
+
+    // Get the output direct buffer
+    void * uncompressed_bytes = (*env)->GetDirectBufferAddress(env, 
uncompressed_direct_buf);
+    if (!uncompressed_bytes) {
+        THROW(env, "java/lang/InternalError", "Undefined memory address for 
uncompressedDirectBuf");
+        return (jint) 0;
+    }
+    uncompressed_bytes = ((char*) uncompressed_bytes) + 
uncompressed_direct_buf_off; // output pointer is pre-advanced by the offset, so output.pos starts at 0
+
+    ZSTD_inBuffer input = { compressed_bytes, compressed_direct_buf_len, 
compressed_direct_buf_off }; // positional initializer: src, size, pos (per zstd.h)
+    ZSTD_outBuffer output = { uncompressed_bytes, uncompressed_direct_buf_len, 
0 };
+
+    size_t const size = dlsym_ZSTD_decompressStream(stream, &output, &input);
+
+    // check for errors reported through the return value
+    if (dlsym_ZSTD_isError(size)) {
+        THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size));
+        return (jint) 0;
+    }
+    int remaining = input.size - input.pos; // input bytes not consumed by this call
+    (*env)->SetIntField(env, this, ZStandardDecompressor_remaining, remaining);
+
+    // the entire frame has been decoded (ZSTD_decompressStream returned 0)
+    if (size == 0) {
+        (*env)->SetBooleanField(env, this, ZStandardDecompressor_finished, 
JNI_TRUE);
+        size_t result = dlsym_ZSTD_initDStream(stream); // re-initialise the stream once a frame is complete
+        if (dlsym_ZSTD_isError(result)) {
+            THROW(env, "java/lang/InternalError", 
dlsym_ZSTD_getErrorName(result));
+            return (jint) 0;
+        }
+    }
+    (*env)->SetIntField(env, this, 
ZStandardDecompressor_compressedDirectBufOff, input.pos); // publish the new read position to the Java object
+    (*env)->SetIntField(env, this, 
ZStandardDecompressor_bytesInCompressedBuffer, input.size); // input.size is the unchanged compressed_direct_buf_len
+    return (jint) output.pos;
+}
+
+// returns the larger of the input and output buffer sizes reported by
+// ZSTD_DStreamInSize() and ZSTD_DStreamOutSize()
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_getStreamSize
+(JNIEnv *env, jclass obj) {
+    int in_size = (int) dlsym_ZSTD_DStreamInSize();
+    int out_size = (int) dlsym_ZSTD_DStreamOutSize();
+    if (in_size < out_size) {
+        return out_size;
+    }
+    return in_size;
+}
+
+#endif //define HADOOP_ZSTD_LIBRARY
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h
 
b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h
new file mode 100644
index 0000000..78fc0a4
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H
+#define ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H
+
+#include "org_apache_hadoop.h"
+
+#ifdef UNIX
+#include <dlfcn.h>
+#endif
+
+#include <jni.h>
+#include <zstd.h>
+#include <stddef.h>
+
+
+#endif //ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c
 
b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c
index ae8263a..1bd7fa1 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c
+++ 
b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c
@@ -39,6 +39,17 @@ JNIEXPORT jboolean JNICALL 
Java_org_apache_hadoop_util_NativeCodeLoader_buildSup
 #endif
 }
 
+JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSupportsZstd
+  (JNIEnv *env, jclass clazz)
+{
+#ifndef HADOOP_ZSTD_LIBRARY  /* macro absent at compile time: report no zstd support */
+  return JNI_FALSE;
+#else                        /* macro present: this build includes zstd support */
+  return JNI_TRUE;
+#endif
+}
+
+
 JNIEXPORT jboolean JNICALL 
Java_org_apache_hadoop_util_NativeCodeLoader_buildSupportsOpenssl
   (JNIEnv *env, jclass clazz)
 {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
 
b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
index df46e32..99b6fb2 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
+++ 
b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
@@ -17,4 +17,5 @@ org.apache.hadoop.io.compress.DeflateCodec
 org.apache.hadoop.io.compress.GzipCodec
 org.apache.hadoop.io.compress.Lz4Codec
 org.apache.hadoop.io.compress.SnappyCodec
+org.apache.hadoop.io.compress.ZStandardCodec
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm 
b/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm
index 04ff426..e4f720c 100644
--- 
a/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm
+++ 
b/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm
@@ -118,6 +118,7 @@ NativeLibraryChecker is a tool to check whether native 
libraries are loaded corr
        hadoop: true /home/ozawa/hadoop/lib/native/libhadoop.so.1.0.0
        zlib:   true /lib/x86_64-linux-gnu/libz.so.1
        snappy: true /usr/lib/libsnappy.so.1
+       zstd:   true /usr/lib/libzstd.so.1
        lz4:    true revision:99
        bzip2:  false
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
index 5443ca7..3955aa2 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
@@ -505,6 +505,18 @@ public class TestCodec {
   }
 
   @Test(timeout=20000)
+  public void testSequenceFileZStandardCodec() throws Exception {
+    // Only meaningful when the native zstd code is loaded.
+    assumeTrue(ZStandardCodec.isNativeCodeLoaded());
+    final Configuration conf = new Configuration();
+    final String codecClass = "org.apache.hadoop.io.compress.ZStandardCodec";
+    // Three parameter pairs; their meaning is set by sequenceFileCodecTest.
+    sequenceFileCodecTest(conf, 0, codecClass, 100);
+    sequenceFileCodecTest(conf, 100, codecClass, 100);
+    sequenceFileCodecTest(conf, 200000, codecClass, 1000000);
+  }
+
+  @Test(timeout=20000)
   public void testSequenceFileBZip2NativeCodec() throws IOException, 
                         ClassNotFoundException, InstantiationException, 
                         IllegalAccessException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
index 2d75a2d..dd7bdd2 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 
 import org.junit.Test;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
 
 public class TestCompressionStreamReuse {
   private static final Log LOG = LogFactory
@@ -59,6 +60,13 @@ public class TestCompressionStreamReuse {
   }
 
   @Test
+  public void testZStandardCompressStreamReuse() throws IOException {
+    assumeTrue(ZStandardCodec.isNativeCodeLoaded());
+    final String codecClass = "org.apache.hadoop.io.compress.ZStandardCodec";
+    resetStateTest(conf, seed, count, codecClass);
+  }
+
+  @Test
   public void testGzipCompressStreamReuseWithParam() throws IOException {
     Configuration conf = new Configuration(this.conf);
     ZlibFactory


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to