HADOOP-13578. Add Codec for ZStandard Compression. Contributed by churro morales
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a0a27616 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a0a27616 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a0a27616 Branch: refs/heads/trunk Commit: a0a276162147e843a5a4e028abdca5b66f5118da Parents: e49e0a6 Author: Jason Lowe <jl...@apache.org> Authored: Wed Jan 4 14:46:25 2017 +0000 Committer: Jason Lowe <jl...@apache.org> Committed: Wed Jan 4 14:46:25 2017 +0000 ---------------------------------------------------------------------- BUILDING.txt | 25 + dev-support/bin/dist-copynativelibs | 13 + hadoop-common-project/hadoop-common/pom.xml | 21 + .../hadoop-common/src/CMakeLists.txt | 29 ++ .../hadoop-common/src/config.h.cmake | 1 + .../hadoop/fs/CommonConfigurationKeys.java | 16 + .../apache/hadoop/io/compress/Decompressor.java | 2 +- .../hadoop/io/compress/ZStandardCodec.java | 242 +++++++++ .../io/compress/zstd/ZStandardCompressor.java | 305 ++++++++++++ .../io/compress/zstd/ZStandardDecompressor.java | 323 ++++++++++++ .../hadoop/io/compress/zstd/package-info.java | 22 + .../apache/hadoop/util/NativeCodeLoader.java | 5 + .../hadoop/util/NativeLibraryChecker.java | 13 +- .../io/compress/zstd/ZStandardCompressor.c | 259 ++++++++++ .../io/compress/zstd/ZStandardDecompressor.c | 218 +++++++++ .../zstd/org_apache_hadoop_io_compress_zstd.h | 34 ++ .../org/apache/hadoop/util/NativeCodeLoader.c | 11 + ...g.apache.hadoop.io.compress.CompressionCodec | 1 + .../src/site/markdown/NativeLibraries.md.vm | 1 + .../apache/hadoop/io/compress/TestCodec.java | 12 + .../io/compress/TestCompressionStreamReuse.java | 8 + .../TestZStandardCompressorDecompressor.java | 485 +++++++++++++++++++ .../src/test/resources/zstd/test_file.txt | 71 +++ .../src/test/resources/zstd/test_file.txt.zst | Bin 0 -> 3690 bytes .../hadoop-mapreduce-client-nativetask/pom.xml | 8 + hadoop-project-dist/pom.xml | 6 + hadoop-project/pom.xml | 2 + 27 files changed, 
2130 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/BUILDING.txt ---------------------------------------------------------------------- diff --git a/BUILDING.txt b/BUILDING.txt index 7afc3f0..a1721ba 100644 --- a/BUILDING.txt +++ b/BUILDING.txt @@ -83,6 +83,8 @@ Optional packages: $ sudo apt-get install libjansson-dev * Linux FUSE $ sudo apt-get install fuse libfuse-dev +* ZStandard compression + $ sudo apt-get install zstd ---------------------------------------------------------------------------------- Maven main modules: @@ -155,6 +157,29 @@ Maven build goals: and it ignores the -Dsnappy.prefix option. If -Dsnappy.lib isn't given, the bundling and building will fail. + + ZStandard build options: + + ZStandard is a compression library that can be utilized by the native code. + It is currently an optional component, meaning that Hadoop can be built with + or without this dependency. + + * Use -Drequire.zstd to fail the build if libzstd.so is not found. + If this option is not specified and the zstd library is missing, + we silently build a version of libhadoop.so that cannot make use of zstd. + + * Use -Dzstd.prefix to specify a nonstandard location for the libzstd + header files and library files. You do not need this option if you have + installed zstandard using a package manager. + + * Use -Dzstd.lib to specify a nonstandard location for the libzstd library + files. Similarly to zstd.prefix, you do not need this option if you have + installed using a package manager. + + * Use -Dbundle.zstd to copy the contents of the zstd.lib directory into + the final tar file. This option requires that -Dzstd.lib is also given, + and it ignores the -Dzstd.prefix option. If -Dzstd.lib isn't given, the + bundling and building will fail. + OpenSSL build options: OpenSSL includes a crypto library that can be utilized by the native code. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/dev-support/bin/dist-copynativelibs ---------------------------------------------------------------------- diff --git a/dev-support/bin/dist-copynativelibs b/dev-support/bin/dist-copynativelibs index efe250b..67d2edf 100755 --- a/dev-support/bin/dist-copynativelibs +++ b/dev-support/bin/dist-copynativelibs @@ -114,6 +114,15 @@ for i in "$@"; do --snappylibbundle=*) SNAPPYLIBBUNDLE=${i#*=} ;; + --zstdbinbundle=*) + ZSTDBINBUNDLE=${i#*=} + ;; + --zstdlib=*) + ZSTDLIB=${i#*=} + ;; + --zstdlibbundle=*) + ZSTDLIBBUNDLE=${i#*=} + ;; esac done @@ -139,6 +148,8 @@ if [[ -d "${LIB_DIR}" ]]; then bundle_native_lib "${SNAPPYLIBBUNDLE}" "snappy.lib" "snappy" "${SNAPPYLIB}" + bundle_native_lib "${ZSTDLIBBUNDLE}" "zstd.lib" "zstd" "${ZSTDLIB}" + bundle_native_lib "${OPENSSLLIBBUNDLE}" "openssl.lib" "crypto" "${OPENSSLLIB}" bundle_native_lib "${ISALBUNDLE}" "isal.lib" "isa" "${ISALLIB}" @@ -159,6 +170,8 @@ if [[ -d "${BIN_DIR}" ]] ; then bundle_native_bin "${SNAPPYBINBUNDLE}" "${SNAPPYLIBBUNDLE}" "snappy.lib" "snappy" "${SNAPPYLIB}" + bundle_native_bin "${ZSTDBINBUNDLE}" "${ZSTDLIBBUNDLE}" "zstd.lib" "zstd" "${ZSTDLIB}" + bundle_native_bin "${OPENSSLBINBUNDLE}" "${OPENSSLLIBBUNDLE}" "openssl.lib" "crypto" "${OPENSSLLIB}" fi http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml index c9b282f..b616074 100644 --- a/hadoop-common-project/hadoop-common/pom.xml +++ b/hadoop-common-project/hadoop-common/pom.xml @@ -595,6 +595,10 @@ <snappy.lib></snappy.lib> <snappy.include></snappy.include> <require.snappy>false</require.snappy> + <zstd.prefix></zstd.prefix> + <zstd.lib></zstd.lib> + <zstd.include></zstd.include> + <require.zstd>false</require.zstd> <openssl.prefix></openssl.prefix> 
<openssl.lib></openssl.lib> <openssl.include></openssl.include> @@ -652,6 +656,8 @@ <javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName> <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName> <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName> + <javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardCompressor</javahClassName> + <javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardDecompressor</javahClassName> <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Compressor</javahClassName> <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName> <javahClassName>org.apache.hadoop.io.erasurecode.ErasureCodeNative</javahClassName> @@ -685,9 +691,13 @@ <JVM_ARCH_DATA_MODEL>${sun.arch.data.model}</JVM_ARCH_DATA_MODEL> <REQUIRE_BZIP2>${require.bzip2}</REQUIRE_BZIP2> <REQUIRE_SNAPPY>${require.snappy}</REQUIRE_SNAPPY> + <REQUIRE_ZSTD>${require.zstd}</REQUIRE_ZSTD> <CUSTOM_SNAPPY_PREFIX>${snappy.prefix}</CUSTOM_SNAPPY_PREFIX> <CUSTOM_SNAPPY_LIB>${snappy.lib} </CUSTOM_SNAPPY_LIB> <CUSTOM_SNAPPY_INCLUDE>${snappy.include} </CUSTOM_SNAPPY_INCLUDE> + <CUSTOM_ZSTD_PREFIX>${zstd.prefix}</CUSTOM_ZSTD_PREFIX> + <CUSTOM_ZSTD_LIB>${zstd.lib} </CUSTOM_ZSTD_LIB> + <CUSTOM_ZSTD_INCLUDE>${zstd.include} </CUSTOM_ZSTD_INCLUDE> <REQUIRE_ISAL>${require.isal} </REQUIRE_ISAL> <CUSTOM_ISAL_PREFIX>${isal.prefix} </CUSTOM_ISAL_PREFIX> <CUSTOM_ISAL_LIB>${isal.lib} </CUSTOM_ISAL_LIB> @@ -745,6 +755,11 @@ <isal.lib></isal.lib> <require.snappy>false</require.snappy> <bundle.snappy.in.bin>true</bundle.snappy.in.bin> + <zstd.prefix></zstd.prefix> + <zstd.lib></zstd.lib> + <zstd.include></zstd.include> + <require.zstd>false</require.zstd> + <bundle.zstd.in.bin>true</bundle.zstd.in.bin> <openssl.prefix></openssl.prefix> <openssl.lib></openssl.lib> <openssl.include></openssl.include> @@ -794,6 +809,8 @@ 
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName> <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName> <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName> + <javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardCompressor</javahClassName> + <javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardDecompressor</javahClassName> <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Compressor</javahClassName> <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName> <javahClassName>org.apache.hadoop.io.erasurecode.ErasureCodeNative</javahClassName> @@ -850,6 +867,10 @@ <argument>/p:CustomSnappyLib=${snappy.lib}</argument> <argument>/p:CustomSnappyInclude=${snappy.include}</argument> <argument>/p:RequireSnappy=${require.snappy}</argument> + <argument>/p:CustomZstdPrefix=${zstd.prefix}</argument> + <argument>/p:CustomZstdLib=${zstd.lib}</argument> + <argument>/p:CustomZstdInclude=${zstd.include}</argument> + <argument>/p:RequireZstd=${require.zstd}</argument> <argument>/p:CustomOpensslPrefix=${openssl.prefix}</argument> <argument>/p:CustomOpensslLib=${openssl.lib}</argument> <argument>/p:CustomOpensslInclude=${openssl.include}</argument> http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt b/hadoop-common-project/hadoop-common/src/CMakeLists.txt index 8317a46..10b0f23 100644 --- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt +++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt @@ -94,6 +94,33 @@ else() endif() endif() +# Require zstandard +SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES}) +hadoop_set_find_shared_library_version("1") +find_library(ZSTD_LIBRARY + NAMES zstd + PATHS 
${CUSTOM_ZSTD_PREFIX} ${CUSTOM_ZSTD_PREFIX}/lib + ${CUSTOM_ZSTD_PREFIX}/lib64 ${CUSTOM_ZSTD_LIB}) +SET(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES}) +find_path(ZSTD_INCLUDE_DIR + NAMES zstd.h + PATHS ${CUSTOM_ZSTD_PREFIX} ${CUSTOM_ZSTD_PREFIX}/include + ${CUSTOM_ZSTD_INCLUDE}) +if (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR) + GET_FILENAME_COMPONENT(HADOOP_ZSTD_LIBRARY ${ZSTD_LIBRARY} NAME) + set(ZSTD_SOURCE_FILES + "${SRC}/io/compress/zstd/ZStandardCompressor.c" + "${SRC}/io/compress/zstd/ZStandardDecompressor.c") + set(REQUIRE_ZSTD ${REQUIRE_ZSTD}) # Stop warning about unused variable. + message(STATUS "Found ZStandard: ${ZSTD_LIBRARY}") +else () + set(ZSTD_INCLUDE_DIR "") + set(ZSTD_SOURCE_FILES "") + IF(REQUIRE_ZSTD) + MESSAGE(FATAL_ERROR "Required zstandard library could not be found. ZSTD_LIBRARY=${ZSTD_LIBRARY}, ZSTD_INCLUDE_DIR=${ZSTD_INCLUDE_DIR}, CUSTOM_ZSTD_INCLUDE_DIR=${CUSTOM_ZSTD_INCLUDE_DIR}, CUSTOM_ZSTD_PREFIX=${CUSTOM_ZSTD_PREFIX}, CUSTOM_ZSTD_INCLUDE=${CUSTOM_ZSTD_INCLUDE}") + ENDIF(REQUIRE_ZSTD) +endif () + set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES}) hadoop_set_find_shared_library_version("2") find_library(ISAL_LIBRARY @@ -208,6 +235,7 @@ include_directories( ${BZIP2_INCLUDE_DIR} ${SNAPPY_INCLUDE_DIR} ${ISAL_INCLUDE_DIR} + ${ZSTD_INCLUDE_DIR} ${OPENSSL_INCLUDE_DIR} ${SRC}/util ) @@ -222,6 +250,7 @@ hadoop_add_dual_library(hadoop ${SRC}/io/compress/lz4/lz4hc.c ${ISAL_SOURCE_FILES} ${SNAPPY_SOURCE_FILES} + ${ZSTD_SOURCE_FILES} ${OPENSSL_SOURCE_FILES} ${SRC}/io/compress/zlib/ZlibCompressor.c ${SRC}/io/compress/zlib/ZlibDecompressor.c http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/config.h.cmake ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/config.h.cmake b/hadoop-common-project/hadoop-common/src/config.h.cmake index 445cc33..40aa467 100644 --- 
a/hadoop-common-project/hadoop-common/src/config.h.cmake +++ b/hadoop-common-project/hadoop-common/src/config.h.cmake @@ -21,6 +21,7 @@ #cmakedefine HADOOP_ZLIB_LIBRARY "@HADOOP_ZLIB_LIBRARY@" #cmakedefine HADOOP_BZIP2_LIBRARY "@HADOOP_BZIP2_LIBRARY@" #cmakedefine HADOOP_SNAPPY_LIBRARY "@HADOOP_SNAPPY_LIBRARY@" +#cmakedefine HADOOP_ZSTD_LIBRARY "@HADOOP_ZSTD_LIBRARY@" #cmakedefine HADOOP_OPENSSL_LIBRARY "@HADOOP_OPENSSL_LIBRARY@" #cmakedefine HADOOP_ISAL_LIBRARY "@HADOOP_ISAL_LIBRARY@" #cmakedefine HAVE_SYNC_FILE_RANGE http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index fe522b3..b8a60d6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -141,6 +141,22 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final int IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT = 256 * 1024; + /** ZStandard compression level. */ + public static final String IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY = + "io.compression.codec.zstd.level"; + + /** Default value for IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY. */ + public static final int IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT = 3; + + /** ZStandard buffer size. */ + public static final String IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY = + "io.compression.codec.zstd.buffersize"; + + /** ZStandard buffer size a value of 0 means use the recommended zstd + * buffer size that the library recommends. 
*/ + public static final int + IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT = 0; + /** Internal buffer size for Lz4 compressor/decompressors */ public static final String IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY = "io.compression.codec.lz4.buffersize"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java index 8cb0b2a..3808003 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java @@ -95,7 +95,7 @@ public interface Decompressor { * @param b Buffer for the compressed data * @param off Start offset of the data * @param len Size of the buffer - * @return The actual number of bytes of compressed data. + * @return The actual number of bytes of uncompressed data. 
* @throws IOException */ public int decompress(byte[] b, int off, int len) throws IOException; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java new file mode 100644 index 0000000..c56bbba --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.io.compress; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.compress.zstd.ZStandardCompressor; +import org.apache.hadoop.io.compress.zstd.ZStandardDecompressor; +import org.apache.hadoop.util.NativeCodeLoader; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY; + +/** + * This class creates zstd compressors/decompressors. + */ +public class ZStandardCodec implements + Configurable, CompressionCodec, DirectDecompressionCodec { + private Configuration conf; + + /** + * Set the configuration to be used by this object. + * + * @param conf the configuration object. + */ + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + /** + * Return the configuration used by this object. + * + * @return the configuration object used by this object. 
+ */ + @Override + public Configuration getConf() { + return conf; + } + + public static void checkNativeCodeLoaded() { + if (!NativeCodeLoader.isNativeCodeLoaded() || + !NativeCodeLoader.buildSupportsZstd()) { + throw new RuntimeException("native zStandard library " + + "not available: this version of libhadoop was built " + + "without zstd support."); + } + if (!ZStandardCompressor.isNativeCodeLoaded()) { + throw new RuntimeException("native zStandard library not " + + "available: ZStandardCompressor has not been loaded."); + } + if (!ZStandardDecompressor.isNativeCodeLoaded()) { + throw new RuntimeException("native zStandard library not " + + "available: ZStandardDecompressor has not been loaded."); + } + } + + public static boolean isNativeCodeLoaded() { + return ZStandardCompressor.isNativeCodeLoaded() + && ZStandardDecompressor.isNativeCodeLoaded(); + } + + public static String getLibraryName() { + return ZStandardCompressor.getLibraryName(); + } + + public static int getCompressionLevel(Configuration conf) { + return conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT); + } + + public static int getCompressionBufferSize(Configuration conf) { + int bufferSize = getBufferSize(conf); + return bufferSize == 0 ? + ZStandardCompressor.getRecommendedBufferSize() : + bufferSize; + } + + public static int getDecompressionBufferSize(Configuration conf) { + int bufferSize = getBufferSize(conf); + return bufferSize == 0 ? + ZStandardDecompressor.getRecommendedBufferSize() : + bufferSize; + } + + private static int getBufferSize(Configuration conf) { + return conf.getInt(IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY, + IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT); + } + + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream}. 
+ * + * @param out the location for the final output stream + * @return a stream the user can write uncompressed data to have compressed + * @throws IOException + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out) + throws IOException { + return Util. + createOutputStreamWithCodecPool(this, conf, out); + } + + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream} with the given {@link Compressor}. + * + * @param out the location for the final output stream + * @param compressor compressor to use + * @return a stream the user can write uncompressed data to have compressed + * @throws IOException + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) + throws IOException { + checkNativeCodeLoaded(); + return new CompressorStream(out, compressor, + getCompressionBufferSize(conf)); + } + + /** + * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. + * + * @return the type of compressor needed by this codec. + */ + @Override + public Class<? extends Compressor> getCompressorType() { + checkNativeCodeLoaded(); + return ZStandardCompressor.class; + } + + /** + * Create a new {@link Compressor} for use by this {@link CompressionCodec}. + * + * @return a new compressor for use by this codec + */ + @Override + public Compressor createCompressor() { + checkNativeCodeLoaded(); + return new ZStandardCompressor( + getCompressionLevel(conf), getCompressionBufferSize(conf)); + } + + + /** + * Create a {@link CompressionInputStream} that will read from the given + * input stream. + * + * @param in the stream to read compressed bytes from + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + @Override + public CompressionInputStream createInputStream(InputStream in) + throws IOException { + return Util. 
+ createInputStreamWithCodecPool(this, conf, in); + } + + /** + * Create a {@link CompressionInputStream} that will read from the given + * {@link InputStream} with the given {@link Decompressor}. + * + * @param in the stream to read compressed bytes from + * @param decompressor decompressor to use + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + @Override + public CompressionInputStream createInputStream(InputStream in, + Decompressor decompressor) + throws IOException { + checkNativeCodeLoaded(); + return new DecompressorStream(in, decompressor, + getDecompressionBufferSize(conf)); + } + + /** + * Get the type of {@link Decompressor} needed by + * this {@link CompressionCodec}. + * + * @return the type of decompressor needed by this codec. + */ + @Override + public Class<? extends Decompressor> getDecompressorType() { + checkNativeCodeLoaded(); + return ZStandardDecompressor.class; + } + + /** + * Create a new {@link Decompressor} for use by this {@link CompressionCodec}. + * + * @return a new decompressor for use by this codec + */ + @Override + public Decompressor createDecompressor() { + checkNativeCodeLoaded(); + return new ZStandardDecompressor(getDecompressionBufferSize(conf)); + } + + /** + * Get the default filename extension for this kind of compression. + * + * @return <code>.zst</code>. 
+ */ + @Override + public String getDefaultExtension() { + return ".zst"; + } + + @Override + public DirectDecompressor createDirectDecompressor() { + return new ZStandardDecompressor.ZStandardDirectDecompressor( + getDecompressionBufferSize(conf) + ); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java new file mode 100644 index 0000000..eb2121a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.io.compress.zstd; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.ZStandardCodec; +import org.apache.hadoop.util.NativeCodeLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * A {@link Compressor} based on the zStandard compression algorithm. + * https://github.com/facebook/zstd + */ +public class ZStandardCompressor implements Compressor { + + private static final Logger LOG = + LoggerFactory.getLogger(ZStandardCompressor.class); + + private long stream; + private int level; + private int directBufferSize; + private byte[] userBuf = null; + private int userBufOff = 0, userBufLen = 0; + private ByteBuffer uncompressedDirectBuf = null; + private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0; + private boolean keepUncompressedBuf = false; + private ByteBuffer compressedDirectBuf = null; + private boolean finish, finished; + private long bytesRead = 0; + private long bytesWritten = 0; + + private static boolean nativeZStandardLoaded = false; + + static { + if (NativeCodeLoader.isNativeCodeLoaded()) { + try { + // Initialize the native library + initIDs(); + nativeZStandardLoaded = true; + } catch (Throwable t) { + LOG.warn("Error loading zstandard native libraries: " + t); + } + } + } + + public static boolean isNativeCodeLoaded() { + return nativeZStandardLoaded; + } + + public static int getRecommendedBufferSize() { + return getStreamSize(); + } + + @VisibleForTesting + ZStandardCompressor() { + this(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT, + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); + } + + /** + * Creates a new compressor with the 
default compression level. + * Compressed data will be generated in ZStandard format. + */ + public ZStandardCompressor(int level, int bufferSize) { + this(level, bufferSize, bufferSize); + } + + @VisibleForTesting + ZStandardCompressor(int level, int inputBufferSize, int outputBufferSize) { + this.level = level; + stream = create(); + this.directBufferSize = outputBufferSize; + uncompressedDirectBuf = ByteBuffer.allocateDirect(inputBufferSize); + compressedDirectBuf = ByteBuffer.allocateDirect(outputBufferSize); + compressedDirectBuf.position(outputBufferSize); + reset(); + } + + /** + * Prepare the compressor to be used in a new stream with settings defined in + * the given Configuration. It will reset the compressor's compression level + * and compression strategy. + * + * @param conf Configuration storing new settings + */ + @Override + public void reinit(Configuration conf) { + if (conf == null) { + return; + } + level = ZStandardCodec.getCompressionLevel(conf); + reset(); + LOG.debug("Reinit compressor with new compression configuration"); + } + + @Override + public void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + this.userBuf = b; + this.userBufOff = off; + this.userBufLen = len; + uncompressedDirectBufOff = 0; + setInputFromSavedData(); + + compressedDirectBuf.limit(directBufferSize); + compressedDirectBuf.position(directBufferSize); + } + + //copy enough data from userBuf to uncompressedDirectBuf + private void setInputFromSavedData() { + int len = Math.min(userBufLen, uncompressedDirectBuf.remaining()); + uncompressedDirectBuf.put(userBuf, userBufOff, len); + userBufLen -= len; + userBufOff += len; + uncompressedDirectBufLen = uncompressedDirectBuf.position(); + } + + @Override + public void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException( + "Dictionary support 
is not enabled"); + } + + @Override + public boolean needsInput() { + // Consume remaining compressed data? + if (compressedDirectBuf.remaining() > 0) { + return false; + } + + // have we consumed all input + if (keepUncompressedBuf && uncompressedDirectBufLen > 0) { + return false; + } + + if (uncompressedDirectBuf.remaining() > 0) { + // Check if we have consumed all user-input + if (userBufLen <= 0) { + return true; + } else { + // copy enough data from userBuf to uncompressedDirectBuf + setInputFromSavedData(); + // uncompressedDirectBuf is not full + return uncompressedDirectBuf.remaining() > 0; + } + } + + return false; + } + + @Override + public void finish() { + finish = true; + } + + @Override + public boolean finished() { + // Check if 'zstd' says its 'finished' and all compressed + // data has been consumed + return (finished && compressedDirectBuf.remaining() == 0); + } + + @Override + public int compress(byte[] b, int off, int len) throws IOException { + checkStream(); + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + // Check if there is compressed data + int n = compressedDirectBuf.remaining(); + if (n > 0) { + n = Math.min(n, len); + compressedDirectBuf.get(b, off, n); + return n; + } + + // Re-initialize the output direct buffer + compressedDirectBuf.rewind(); + compressedDirectBuf.limit(directBufferSize); + + // Compress data + n = deflateBytesDirect( + uncompressedDirectBuf, + uncompressedDirectBufOff, + uncompressedDirectBufLen, + compressedDirectBuf, + directBufferSize + ); + compressedDirectBuf.limit(n); + + // Check if we have consumed all input buffer + if (uncompressedDirectBufLen <= 0) { + // consumed all input buffer + keepUncompressedBuf = false; + uncompressedDirectBuf.clear(); + uncompressedDirectBufOff = 0; + uncompressedDirectBufLen = 0; + } else { + // did not consume all input buffer + keepUncompressedBuf = true; + } + 
+ // Get at most 'len' bytes + n = Math.min(n, len); + compressedDirectBuf.get(b, off, n); + return n; + } + + /** + * Returns the total number of compressed bytes output so far. + * + * @return the total (non-negative) number of compressed bytes output so far + */ + @Override + public long getBytesWritten() { + checkStream(); + return bytesWritten; + } + + /** + * <p>Returns the total number of uncompressed bytes input so far.</p> + * + * @return the total (non-negative) number of uncompressed bytes input so far + */ + @Override + public long getBytesRead() { + checkStream(); + return bytesRead; + } + + @Override + public void reset() { + checkStream(); + init(level, stream); + finish = false; + finished = false; + bytesRead = 0; + bytesWritten = 0; + uncompressedDirectBuf.rewind(); + uncompressedDirectBufOff = 0; + uncompressedDirectBufLen = 0; + keepUncompressedBuf = false; + compressedDirectBuf.limit(directBufferSize); + compressedDirectBuf.position(directBufferSize); + userBufOff = 0; + userBufLen = 0; + } + + @Override + public void end() { + if (stream != 0) { + end(stream); + stream = 0; + } + } + + private void checkStream() { + if (stream == 0) { + throw new NullPointerException(); + } + } + + private native static long create(); + private native static void init(int level, long stream); + private native int deflateBytesDirect(ByteBuffer src, int srcOffset, + int srcLen, ByteBuffer dst, int dstLen); + private static native int getStreamSize(); + private native static void end(long strm); + private native static void initIDs(); + public native static String getLibraryName(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java new file mode 100644 index 0000000..73d73e1 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress.zstd; + +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DirectDecompressor; +import org.apache.hadoop.util.NativeCodeLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * A {@link Decompressor} based on the zStandard compression algorithm. 
+ * https://github.com/facebook/zstd + */ +public class ZStandardDecompressor implements Decompressor { + private static final Logger LOG = + LoggerFactory.getLogger(ZStandardDecompressor.class); + + private long stream; + private int directBufferSize; + private ByteBuffer compressedDirectBuf = null; + private int compressedDirectBufOff, bytesInCompressedBuffer; + private ByteBuffer uncompressedDirectBuf = null; + private byte[] userBuf = null; + private int userBufOff = 0, userBufferBytesToConsume = 0; + private boolean finished; + private int remaining = 0; + + private static boolean nativeZStandardLoaded = false; + + static { + if (NativeCodeLoader.isNativeCodeLoaded()) { + try { + // Initialize the native library + initIDs(); + nativeZStandardLoaded = true; + } catch (Throwable t) { + LOG.warn("Error loading zstandard native libraries: " + t); + } + } + } + + public static boolean isNativeCodeLoaded() { + return nativeZStandardLoaded; + } + + public static int getRecommendedBufferSize() { + return getStreamSize(); + } + + public ZStandardDecompressor() { + this(getStreamSize()); + } + + /** + * Creates a new decompressor. 
+ */ + public ZStandardDecompressor(int bufferSize) { + this.directBufferSize = bufferSize; + compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + stream = create(); + reset(); + } + + @Override + public void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + this.userBuf = b; + this.userBufOff = off; + this.userBufferBytesToConsume = len; + + setInputFromSavedData(); + + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + } + + private void setInputFromSavedData() { + compressedDirectBufOff = 0; + bytesInCompressedBuffer = userBufferBytesToConsume; + if (bytesInCompressedBuffer > directBufferSize) { + bytesInCompressedBuffer = directBufferSize; + } + + compressedDirectBuf.rewind(); + compressedDirectBuf.put( + userBuf, userBufOff, bytesInCompressedBuffer); + + userBufOff += bytesInCompressedBuffer; + userBufferBytesToConsume -= bytesInCompressedBuffer; + } + + // dictionary is not supported + @Override + public void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException( + "Dictionary support is not enabled"); + } + + @Override + public boolean needsInput() { + // Consume remaining compressed data? + if (uncompressedDirectBuf.remaining() > 0) { + return false; + } + + // Check if we have consumed all input + if (bytesInCompressedBuffer - compressedDirectBufOff <= 0) { + // Check if we have consumed all user-input + if (userBufferBytesToConsume <= 0) { + return true; + } else { + setInputFromSavedData(); + } + } + return false; + } + + // dictionary is not supported. 
+ @Override + public boolean needsDictionary() { + return false; + } + + @Override + public boolean finished() { + // finished == true if ZSTD_decompressStream() returns 0 + // also check we have nothing left in our buffer + return (finished && uncompressedDirectBuf.remaining() == 0); + } + + @Override + public int decompress(byte[] b, int off, int len) + throws IOException { + checkStream(); + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + // Check if there is uncompressed data + int n = uncompressedDirectBuf.remaining(); + if (n > 0) { + return populateUncompressedBuffer(b, off, len, n); + } + + // Re-initialize the output direct buffer + uncompressedDirectBuf.rewind(); + uncompressedDirectBuf.limit(directBufferSize); + + // Decompress data + n = inflateBytesDirect( + compressedDirectBuf, + compressedDirectBufOff, + bytesInCompressedBuffer, + uncompressedDirectBuf, + 0, + directBufferSize + ); + uncompressedDirectBuf.limit(n); + + // Get at most 'len' bytes + return populateUncompressedBuffer(b, off, len, n); + } + + /** + * <p>Returns the number of bytes remaining in the input buffers; + * normally called when finished() is true to determine amount of post-stream + * data.</p> + * + * @return the total (non-negative) number of unprocessed bytes in input + */ + @Override + public int getRemaining() { + checkStream(); + // userBuf + compressedDirectBuf + return userBufferBytesToConsume + remaining; + } + + /** + * Resets everything including the input buffers (user and direct). 
+ */ + @Override + public void reset() { + checkStream(); + init(stream); + remaining = 0; + finished = false; + compressedDirectBufOff = 0; + bytesInCompressedBuffer = 0; + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + userBufOff = 0; + userBufferBytesToConsume = 0; + } + + @Override + public void end() { + if (stream != 0) { + free(stream); + stream = 0; + } + } + + @Override + protected void finalize() { + reset(); + } + + private void checkStream() { + if (stream == 0) { + throw new NullPointerException("Stream not initialized"); + } + } + + private int populateUncompressedBuffer(byte[] b, int off, int len, int n) { + n = Math.min(n, len); + uncompressedDirectBuf.get(b, off, n); + return n; + } + + private native static void initIDs(); + private native static long create(); + private native static void init(long stream); + private native int inflateBytesDirect(ByteBuffer src, int srcOffset, + int srcLen, ByteBuffer dst, int dstOffset, int dstLen); + private native static void free(long strm); + private native static int getStreamSize(); + + int inflateDirect(ByteBuffer src, ByteBuffer dst) throws IOException { + assert + (this instanceof ZStandardDecompressor.ZStandardDirectDecompressor); + + int originalPosition = dst.position(); + int n = inflateBytesDirect( + src, src.position(), src.remaining(), dst, dst.position(), + dst.remaining() + ); + dst.position(originalPosition + n); + if (bytesInCompressedBuffer > 0) { + src.position(compressedDirectBufOff); + } else { + src.position(src.limit()); + } + return n; + } + + /** + * A {@link DirectDecompressor} for ZStandard + * https://github.com/facebook/zstd. 
+ */ + public static class ZStandardDirectDecompressor + extends ZStandardDecompressor implements DirectDecompressor { + + public ZStandardDirectDecompressor(int directBufferSize) { + super(directBufferSize); + } + + @Override + public boolean finished() { + return (endOfInput && super.finished()); + } + + @Override + public void reset() { + super.reset(); + endOfInput = true; + } + + private boolean endOfInput; + + @Override + public void decompress(ByteBuffer src, ByteBuffer dst) + throws IOException { + assert dst.isDirect() : "dst.isDirect()"; + assert src.isDirect() : "src.isDirect()"; + assert dst.remaining() > 0 : "dst.remaining() > 0"; + this.inflateDirect(src, dst); + endOfInput = !src.hasRemaining(); + } + + @Override + public void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException( + "byte[] arrays are not supported for DirectDecompressor"); + } + + @Override + public int decompress(byte[] b, int off, int len) { + throw new UnsupportedOperationException( + "byte[] arrays are not supported for DirectDecompressor"); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java new file mode 100644 index 0000000..9069070 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.io.compress.zstd; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java index dd04a19..c381336 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java @@ -85,6 +85,11 @@ public final class NativeCodeLoader { public static native boolean buildSupportsIsal(); /** + * Returns true only if this build was compiled with support for ZStandard. + */ + public static native boolean buildSupportsZstd(); + + /** * Returns true only if this build was compiled with support for openssl. 
*/ public static native boolean buildSupportsOpenssl(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java index e166bec..776839c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.util; +import org.apache.hadoop.io.compress.ZStandardCodec; import org.apache.hadoop.io.erasurecode.ErasureCodeNative; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.OpensslCipher; @@ -67,6 +68,7 @@ public class NativeLibraryChecker { boolean zlibLoaded = false; boolean snappyLoaded = false; boolean isalLoaded = false; + boolean zStdLoaded = false; // lz4 is linked within libhadoop boolean lz4Loaded = nativeHadoopLoaded; boolean bzip2Loaded = Bzip2Factory.isNativeBzip2Loaded(conf); @@ -78,6 +80,7 @@ public class NativeLibraryChecker { String zlibLibraryName = ""; String snappyLibraryName = ""; String isalDetail = ""; + String zstdLibraryName = ""; String lz4LibraryName = ""; String bzip2LibraryName = ""; String winutilsPath = null; @@ -88,7 +91,11 @@ public class NativeLibraryChecker { if (zlibLoaded) { zlibLibraryName = ZlibFactory.getLibraryName(); } - + zStdLoaded = NativeCodeLoader.buildSupportsZstd() && + ZStandardCodec.isNativeCodeLoaded(); + if (zStdLoaded && NativeCodeLoader.buildSupportsZstd()) { + zstdLibraryName = ZStandardCodec.getLibraryName(); + } snappyLoaded = NativeCodeLoader.buildSupportsSnappy() && SnappyCodec.isNativeCodeLoaded(); if (snappyLoaded 
&& NativeCodeLoader.buildSupportsSnappy()) { @@ -135,6 +142,7 @@ public class NativeLibraryChecker { System.out.println("Native library checking:"); System.out.printf("hadoop: %b %s%n", nativeHadoopLoaded, hadoopLibraryName); System.out.printf("zlib: %b %s%n", zlibLoaded, zlibLibraryName); + System.out.printf("zstd : %b %s%n", zStdLoaded, zstdLibraryName); System.out.printf("snappy: %b %s%n", snappyLoaded, snappyLibraryName); System.out.printf("lz4: %b %s%n", lz4Loaded, lz4LibraryName); System.out.printf("bzip2: %b %s%n", bzip2Loaded, bzip2LibraryName); @@ -146,7 +154,8 @@ public class NativeLibraryChecker { } if ((!nativeHadoopLoaded) || (Shell.WINDOWS && (!winutilsExists)) || - (checkAll && !(zlibLoaded && snappyLoaded && lz4Loaded && bzip2Loaded && isalLoaded))) { + (checkAll && !(zlibLoaded && snappyLoaded && lz4Loaded + && bzip2Loaded && isalLoaded && zStdLoaded))) { // return 1 to indicated check failed ExitUtil.terminate(1); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c new file mode 100644 index 0000000..04f2a3e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "org_apache_hadoop_io_compress_zstd.h" + +#if defined HADOOP_ZSTD_LIBRARY + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#ifdef UNIX +#include <dlfcn.h> +#include "config.h" +#endif + +#include "org_apache_hadoop_io_compress_zstd_ZStandardCompressor.h" + +static jfieldID ZStandardCompressor_stream; +static jfieldID ZStandardCompressor_uncompressedDirectBufOff; +static jfieldID ZStandardCompressor_uncompressedDirectBufLen; +static jfieldID ZStandardCompressor_directBufferSize; +static jfieldID ZStandardCompressor_finish; +static jfieldID ZStandardCompressor_finished; +static jfieldID ZStandardCompressor_bytesWritten; +static jfieldID ZStandardCompressor_bytesRead; + +#ifdef UNIX +static size_t (*dlsym_ZSTD_CStreamInSize)(void); +static size_t (*dlsym_ZSTD_CStreamOutSize)(void); +static ZSTD_CStream* (*dlsym_ZSTD_createCStream)(void); +static size_t (*dlsym_ZSTD_initCStream)(ZSTD_CStream*, int); +static size_t (*dlsym_ZSTD_freeCStream)(ZSTD_CStream*); +static size_t (*dlsym_ZSTD_compressStream)(ZSTD_CStream*, ZSTD_outBuffer*, ZSTD_inBuffer*); +static size_t (*dlsym_ZSTD_endStream)(ZSTD_CStream*, ZSTD_outBuffer*); +static size_t (*dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*); +static unsigned (*dlsym_ZSTD_isError)(size_t); +static const char * (*dlsym_ZSTD_getErrorName)(size_t); +#endif + +#ifdef WINDOWS +typedef size_t (__cdecl 
*__dlsym_ZSTD_CStreamInSize)(void); +typedef size_t (__cdecl *__dlsym_ZSTD_CStreamOutSize)(void); +typedef ZSTD_CStream* (__cdecl *__dlsym_ZSTD_createCStream)(void); +typedef size_t (__cdecl *__dlsym_ZSTD_initCStream)(ZSTD_CStream*, int); +typedef size_t (__cdecl *__dlsym_ZSTD_freeCStream)(ZSTD_CStream*); +typedef size_t (__cdecl *__dlsym_ZSTD_compressStream)(ZSTD_CStream*, ZSTD_outBuffer*, ZSTD_inBuffer*); +typedef size_t (__cdecl *__dlsym_ZSTD_endStream)(ZSTD_CStream*, ZSTD_outBuffer*); +typedef size_t (__cdecl *__dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*); +typedef unsigned (__cdecl *__dlsym_ZSTD_isError)(size_t); +typedef const char * (__cdecl *__dlsym_ZSTD_getErrorName)(size_t); + +static __dlsym_ZSTD_CStreamInSize dlsym_ZSTD_CStreamInSize; +static __dlsym_ZSTD_CStreamOutSize dlsym_ZSTD_CStreamOutSize; +static __dlsym_ZSTD_createCStream dlsym_ZSTD_createCStream; +static __dlsym_ZSTD_initCStream dlsym_ZSTD_initCStream; +static __dlsym_ZSTD_freeCStream dlsym_ZSTD_freeCStream; +static __dlsym_ZSTD_compressStream dlsym_ZSTD_compressStream; +static __dlsym_ZSTD_endStream dlsym_ZSTD_endStream; +static __dlsym_ZSTD_flushStream dlsym_ZSTD_flushStream; +static __dlsym_ZSTD_isError dlsym_ZSTD_isError; +static __dlsym_ZSTD_getErrorName dlsym_ZSTD_getErrorName; +#endif + +// Load the libztsd.so from disk +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_initIDs (JNIEnv *env, jclass clazz) { +#ifdef UNIX + // Load libzstd.so + void *libzstd = dlopen(HADOOP_ZSTD_LIBRARY, RTLD_LAZY | RTLD_GLOBAL); + if (!libzstd) { + char* msg = (char*)malloc(10000); + snprintf(msg, 10000, "%s (%s)!", "Cannot load " HADOOP_ZSTD_LIBRARY, dlerror()); + THROW(env, "java/lang/InternalError", msg); + return; + } +#endif + +#ifdef WINDOWS + HMODULE libzstd = LoadLibrary(HADOOP_ZSTD_LIBRARY); + if (!libzstd) { + THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zstd.dll"); + return; + } +#endif + +#ifdef UNIX + // load dynamic symbols + 
dlerror(); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_CStreamInSize, env, libzstd, "ZSTD_CStreamInSize"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_CStreamOutSize, env, libzstd, "ZSTD_CStreamOutSize"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_createCStream, env, libzstd, "ZSTD_createCStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_initCStream, env, libzstd, "ZSTD_initCStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_freeCStream, env, libzstd, "ZSTD_freeCStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_compressStream, env, libzstd, "ZSTD_compressStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_endStream, env, libzstd, "ZSTD_endStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName"); +#endif + +#ifdef WINDOWS + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_CStreamInSize, dlsym_ZSTD_CStreamInSize, env, libzstd, "ZSTD_CStreamInSize"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_CStreamOutSize, dlsym_ZSTD_CStreamOutSize, env, libzstd, "ZSTD_CStreamOutSize"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_createCStream, dlsym_ZSTD_createCStream, env, libzstd, "ZSTD_createCStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_initCStream, dlsym_ZSTD_initCStream, env, libzstd, "ZSTD_initCStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_freeCStream, dlsym_ZSTD_freeCStream, env, libzstd, "ZSTD_freeCStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_compressStream, dlsym_ZSTD_compressStream, env, libzstd, "ZSTD_compressStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_endStream, dlsym_ZSTD_endStream, env, libzstd, "ZSTD_endStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_flushStream, dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_isError, dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_getErrorName, dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName"); +#endif + + // load fields + ZStandardCompressor_stream = 
(*env)->GetFieldID(env, clazz, "stream", "J"); + ZStandardCompressor_finish = (*env)->GetFieldID(env, clazz, "finish", "Z"); + ZStandardCompressor_finished = (*env)->GetFieldID(env, clazz, "finished", "Z"); + ZStandardCompressor_uncompressedDirectBufOff = (*env)->GetFieldID(env, clazz, "uncompressedDirectBufOff", "I"); + ZStandardCompressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, clazz, "uncompressedDirectBufLen", "I"); + ZStandardCompressor_directBufferSize = (*env)->GetFieldID(env, clazz, "directBufferSize", "I"); + ZStandardCompressor_bytesRead = (*env)->GetFieldID(env, clazz, "bytesRead", "J"); + ZStandardCompressor_bytesWritten = (*env)->GetFieldID(env, clazz, "bytesWritten", "J"); +} + +// Create the compression stream +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_create (JNIEnv *env, jobject this) { + ZSTD_CStream* const stream = dlsym_ZSTD_createCStream(); + if (stream == NULL) { + THROW(env, "java/lang/InternalError", "Error creating the stream"); + return (jlong)0; + } + return (jlong) stream; +} + +// Initialize the compression stream +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_init (JNIEnv *env, jobject this, jint level, jlong stream) { + size_t result = dlsym_ZSTD_initCStream((ZSTD_CStream *) stream, level); + if (dlsym_ZSTD_isError(result)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); + return; + } +} + +// free the compression stream +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_end (JNIEnv *env, jobject this, jlong stream) { + size_t result = dlsym_ZSTD_freeCStream((ZSTD_CStream *) stream); + if (dlsym_ZSTD_isError(result)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); + return; + } +} + +JNIEXPORT jint Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_deflateBytesDirect +(JNIEnv *env, jobject this, jobject uncompressed_direct_buf, jint 
uncompressed_direct_buf_off, jint uncompressed_direct_buf_len, jobject compressed_direct_buf, jint compressed_direct_buf_len ) { + ZSTD_CStream* const stream = (ZSTD_CStream*) (*env)->GetLongField(env, this, ZStandardCompressor_stream); + if (!stream) { + THROW(env, "java/lang/NullPointerException", NULL); + return (jint)0; + } + + jlong bytes_read = (*env)->GetLongField(env, this, ZStandardCompressor_bytesRead); + jlong bytes_written = (*env)->GetLongField(env, this, ZStandardCompressor_bytesWritten); + jboolean finish = (*env)->GetBooleanField(env, this, ZStandardCompressor_finish); + + // Get the input direct buffer + void * uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); + if (!uncompressed_bytes) { + THROW(env, "java/lang/InternalError", "Undefined memory address for uncompressedDirectBuf"); + return (jint) 0; + } + + // Get the output direct buffer + void * compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf); + if (!compressed_bytes) { + THROW(env, "java/lang/InternalError", "Undefined memory address for compressedDirectBuf"); + return (jint) 0; + } + + ZSTD_inBuffer input = { uncompressed_bytes, uncompressed_direct_buf_len, uncompressed_direct_buf_off }; + ZSTD_outBuffer output = { compressed_bytes, compressed_direct_buf_len, 0 }; + + size_t size = dlsym_ZSTD_compressStream(stream, &output, &input); + if (dlsym_ZSTD_isError(size)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size)); + return (jint) 0; + } + if (finish && input.pos == input.size) { + // end the stream, flush and write the frame epilogue + size = dlsym_ZSTD_endStream(stream, &output); + if (!size) { + (*env)->SetBooleanField(env, this, ZStandardCompressor_finished, JNI_TRUE); + } + } else { + // need to flush the output buffer + // this also updates the output buffer position. 
+ size = dlsym_ZSTD_flushStream(stream, &output); + } + if (dlsym_ZSTD_isError(size)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size)); + return (jint) 0; + } + + bytes_read += input.pos; + bytes_written += output.pos; + (*env)->SetLongField(env, this, ZStandardCompressor_bytesRead, bytes_read); + (*env)->SetLongField(env, this, ZStandardCompressor_bytesWritten, bytes_written); + + (*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufOff, input.pos); + (*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufLen, input.size - input.pos); + return (jint) output.pos; +} + +JNIEXPORT jstring JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_getLibraryName +(JNIEnv *env, jclass class) { +#ifdef UNIX + if (dlsym_ZSTD_isError) { + Dl_info dl_info; + if (dladdr( dlsym_ZSTD_isError, &dl_info)) { + return (*env)->NewStringUTF(env, dl_info.dli_fname); + } + } + return (*env)->NewStringUTF(env, HADOOP_ZSTD_LIBRARY); +#endif +#ifdef WINDOWS + LPWSTR filename = NULL; + GetLibraryName(dlsym_ZSTD_isError, &filename); + if (filename != NULL) { + return (*env)->NewString(env, filename, (jsize) wcslen(filename)); + } else { + return (*env)->NewStringUTF(env, "Unavailable"); + } +#endif +} + +// returns the max size of the recommended input and output buffers +JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_getStreamSize +(JNIEnv *env, jobject this) { + int x = (int) dlsym_ZSTD_CStreamInSize(); + int y = (int) dlsym_ZSTD_CStreamOutSize(); + return (x >= y) ? 
x : y; +} + +#endif //define HADOOP_ZSTD_LIBRARY \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c new file mode 100644 index 0000000..1236756 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "org_apache_hadoop_io_compress_zstd.h" + +#if defined HADOOP_ZSTD_LIBRARY + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#ifdef UNIX +#include <dlfcn.h> +#include "config.h" +#endif + +#include "org_apache_hadoop_io_compress_zstd_ZStandardDecompressor.h" + +static jfieldID ZStandardDecompressor_stream; +static jfieldID ZStandardDecompressor_compressedDirectBufOff; +static jfieldID ZStandardDecompressor_bytesInCompressedBuffer; +static jfieldID ZStandardDecompressor_directBufferSize; +static jfieldID ZStandardDecompressor_finished; +static jfieldID ZStandardDecompressor_remaining; + +#ifdef UNIX +static size_t (*dlsym_ZSTD_DStreamOutSize)(void); +static size_t (*dlsym_ZSTD_DStreamInSize)(void); +static ZSTD_DStream* (*dlsym_ZSTD_createDStream)(void); +static size_t (*dlsym_ZSTD_initDStream)(ZSTD_DStream*); +static size_t (*dlsym_ZSTD_freeDStream)(ZSTD_DStream*); +static size_t (*dlsym_ZSTD_resetDStream)(ZSTD_DStream*); +static size_t (*dlsym_ZSTD_decompressStream)(ZSTD_DStream*, ZSTD_outBuffer*, ZSTD_inBuffer*); +static size_t (*dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*); +static unsigned (*dlsym_ZSTD_isError)(size_t); +static const char * (*dlsym_ZSTD_getErrorName)(size_t); +#endif + +#ifdef WINDOWS +typedef size_t (__cdecl *__dlsym_ZSTD_DStreamOutSize)(void); +typedef size_t (__cdecl *__dlsym_ZSTD_DStreamInSize)(void); +typedef ZSTD_DStream* (__cdecl *__dlsym_ZSTD_createDStream)(void); +typedef size_t (__cdecl *__dlsym_ZSTD_initDStream)(ZSTD_DStream*); +typedef size_t (__cdecl *__dlsym_ZSTD_freeDStream)(ZSTD_DStream*); +typedef size_t (__cdecl *__dlsym_ZSTD_resetDStream)(ZSTD_DStream*); +typedef size_t (__cdecl *__dlsym_ZSTD_decompressStream)(ZSTD_DStream*, ZSTD_outBuffer*, ZSTD_inBuffer*); +typedef size_t (__cdecl *__dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*); +typedef unsigned (__cdecl *__dlsym_ZSTD_isError)(size_t); +typedef const char * (__cdecl *__dlsym_ZSTD_getErrorName)(size_t); + +static 
__dlsym_ZSTD_DStreamOutSize dlsym_ZSTD_DStreamOutSize; +static __dlsym_ZSTD_DStreamInSize dlsym_ZSTD_DStreamInSize; +static __dlsym_ZSTD_createDStream dlsym_ZSTD_createDStream; +static __dlsym_ZSTD_initDStream dlsym_ZSTD_initDStream; +static __dlsym_ZSTD_freeDStream dlsym_ZSTD_freeDStream; +static __dlsym_ZSTD_resetDStream dlsym_ZSTD_resetDStream; +static __dlsym_ZSTD_decompressStream dlsym_ZSTD_decompressStream; +static __dlsym_ZSTD_isError dlsym_ZSTD_isError; +static __dlsym_ZSTD_getErrorName dlsym_ZSTD_getErrorName; +static __dlsym_ZSTD_flushStream dlsym_ZSTD_flushStream; +#endif + +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_initIDs (JNIEnv *env, jclass clazz) { + // Load libzstd.so +#ifdef UNIX + void *libzstd = dlopen(HADOOP_ZSTD_LIBRARY, RTLD_LAZY | RTLD_GLOBAL); + if (!libzstd) { + char* msg = (char*)malloc(1000); + snprintf(msg, 1000, "%s (%s)!", "Cannot load " HADOOP_ZSTD_LIBRARY, dlerror()); + THROW(env, "java/lang/UnsatisfiedLinkError", msg); + return; + } +#endif + +#ifdef WINDOWS + HMODULE libzstd = LoadLibrary(HADOOP_ZSTD_LIBRARY); + if (!libzstd) { + THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zstd.dll"); + return; + } +#endif + +#ifdef UNIX + dlerror(); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_DStreamOutSize, env, libzstd, "ZSTD_DStreamOutSize"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_DStreamInSize, env, libzstd, "ZSTD_DStreamInSize"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_createDStream, env, libzstd, "ZSTD_createDStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_initDStream, env, libzstd, "ZSTD_initDStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_freeDStream, env, libzstd, "ZSTD_freeDStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_resetDStream, env, libzstd, "ZSTD_resetDStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_decompressStream, env, libzstd, "ZSTD_decompressStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName"); + 
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream"); +#endif + +#ifdef WINDOWS + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_DStreamOutSize, dlsym_ZSTD_DStreamOutSize, env, libzstd, "ZSTD_DStreamOutSize"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_DStreamInSize, dlsym_ZSTD_DStreamInSize, env, libzstd, "ZSTD_DStreamInSize"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_createDStream, dlsym_ZSTD_createDStream, env, libzstd, "ZSTD_createDStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_initDStream, dlsym_ZSTD_initDStream, env, libzstd, "ZSTD_initDStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_freeDStream, dlsym_ZSTD_freeDStream, env, libzstd, "ZSTD_freeDStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_resetDStream, dlsym_ZSTD_resetDStream, env, libzstd, "ZSTD_resetDStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_decompressStream, dlsym_ZSTD_decompressStream, env, libzstd, "ZSTD_decompressStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_isError, dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_getErrorName, dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_flushStream, dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream"); +#endif + + ZStandardDecompressor_stream = (*env)->GetFieldID(env, clazz, "stream", "J"); + ZStandardDecompressor_finished = (*env)->GetFieldID(env, clazz, "finished", "Z"); + ZStandardDecompressor_compressedDirectBufOff = (*env)->GetFieldID(env, clazz, "compressedDirectBufOff", "I"); + ZStandardDecompressor_bytesInCompressedBuffer = (*env)->GetFieldID(env, clazz, "bytesInCompressedBuffer", "I"); + ZStandardDecompressor_directBufferSize = (*env)->GetFieldID(env, clazz, "directBufferSize", "I"); + ZStandardDecompressor_remaining = (*env)->GetFieldID(env, clazz, "remaining", "I"); +} + +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_create(JNIEnv *env, jobject this) { + ZSTD_DStream * stream = dlsym_ZSTD_createDStream(); + if (stream == NULL) { + 
THROW(env, "java/lang/InternalError", "Error creating stream"); + return (jlong) 0; + } + return (jlong) stream; +} + +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_init(JNIEnv *env, jobject this, jlong stream) { + size_t result = dlsym_ZSTD_initDStream((ZSTD_DStream *) stream); + if (dlsym_ZSTD_isError(result)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); + return; + } + (*env)->SetLongField(env, this, ZStandardDecompressor_remaining, 0); +} + + +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_free(JNIEnv *env, jclass obj, jlong stream) { + size_t result = dlsym_ZSTD_freeDStream((ZSTD_DStream *) stream); + if (dlsym_ZSTD_isError(result)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); + return; + } +} + +JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_inflateBytesDirect +(JNIEnv *env, jobject this, jobject compressed_direct_buf, jint compressed_direct_buf_off, jint compressed_direct_buf_len, jobject uncompressed_direct_buf, jint uncompressed_direct_buf_off, jint uncompressed_direct_buf_len) { + ZSTD_DStream *stream = (ZSTD_DStream *) (*env)->GetLongField(env, this, ZStandardDecompressor_stream); + if (!stream) { + THROW(env, "java/lang/NullPointerException", NULL); + return (jint)0; + } + + // Get the input direct buffer + void * compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf); + if (!compressed_bytes) { + THROW(env, "java/lang/InternalError", "Undefined memory address for compressedDirectBuf"); + return (jint) 0; + } + + // Get the output direct buffer + void * uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); + if (!uncompressed_bytes) { + THROW(env, "java/lang/InternalError", "Undefined memory address for uncompressedDirectBuf"); + return (jint) 0; + } + uncompressed_bytes = ((char*) uncompressed_bytes) + 
uncompressed_direct_buf_off; + + ZSTD_inBuffer input = { compressed_bytes, compressed_direct_buf_len, compressed_direct_buf_off }; + ZSTD_outBuffer output = { uncompressed_bytes, uncompressed_direct_buf_len, 0 }; + + size_t const size = dlsym_ZSTD_decompressStream(stream, &output, &input); + + // check for errors + if (dlsym_ZSTD_isError(size)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size)); + return (jint) 0; + } + int remaining = input.size - input.pos; + (*env)->SetIntField(env, this, ZStandardDecompressor_remaining, remaining); + + // the entire frame has been decoded + if (size == 0) { + (*env)->SetBooleanField(env, this, ZStandardDecompressor_finished, JNI_TRUE); + size_t result = dlsym_ZSTD_initDStream(stream); + if (dlsym_ZSTD_isError(result)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); + return (jint) 0; + } + } + (*env)->SetIntField(env, this, ZStandardDecompressor_compressedDirectBufOff, input.pos); + (*env)->SetIntField(env, this, ZStandardDecompressor_bytesInCompressedBuffer, input.size); + return (jint) output.pos; +} + +// returns the max size of the recommended input and output buffers +JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_getStreamSize +(JNIEnv *env, jclass obj) { + int x = (int) dlsym_ZSTD_DStreamInSize(); + int y = (int) dlsym_ZSTD_DStreamOutSize(); + return (x >= y) ? 
x : y; +} + +#endif //define HADOOP_ZSTD_LIBRARY \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h new file mode 100644 index 0000000..78fc0a4 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +#ifndef ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H +#define ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H + +#include "org_apache_hadoop.h" + +#ifdef UNIX +#include <dlfcn.h> +#endif + +#include <jni.h> +#include <zstd.h> +#include <stddef.h> + + +#endif //ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c index ae8263a..1bd7fa1 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c @@ -39,6 +39,17 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSup #endif } +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSupportsZstd + (JNIEnv *env, jclass clazz) +{ +#ifdef HADOOP_ZSTD_LIBRARY + return JNI_TRUE; +#else + return JNI_FALSE; +#endif +} + + JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSupportsOpenssl (JNIEnv *env, jclass clazz) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec index df46e32..99b6fb2 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec +++ b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec @@ -17,4 +17,5 @@ org.apache.hadoop.io.compress.DeflateCodec org.apache.hadoop.io.compress.GzipCodec org.apache.hadoop.io.compress.Lz4Codec org.apache.hadoop.io.compress.SnappyCodec +org.apache.hadoop.io.compress.ZStandardCodec http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm b/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm index 04ff426..e4f720c 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm +++ b/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm @@ -118,6 +118,7 @@ NativeLibraryChecker is a tool to check whether native libraries are loaded corr hadoop: true /home/ozawa/hadoop/lib/native/libhadoop.so.1.0.0 zlib: true /lib/x86_64-linux-gnu/libz.so.1 snappy: true /usr/lib/libsnappy.so.1 + zstd: true /usr/lib/libzstd.so.1 lz4: true revision:99 bzip2: false http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java index 5443ca7..3955aa2 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java +++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java @@ -505,6 +505,18 @@ public class TestCodec { } @Test(timeout=20000) + public void testSequenceFileZStandardCodec() throws Exception { + assumeTrue(ZStandardCodec.isNativeCodeLoaded()); + Configuration conf = new Configuration(); + sequenceFileCodecTest(conf, 0, + "org.apache.hadoop.io.compress.ZStandardCodec", 100); + sequenceFileCodecTest(conf, 100, + "org.apache.hadoop.io.compress.ZStandardCodec", 100); + sequenceFileCodecTest(conf, 200000, + "org.apache.hadoop.io.compress.ZStandardCodec", 1000000); + } + + @Test(timeout=20000) public void testSequenceFileBZip2NativeCodec() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0a27616/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java index 2d75a2d..dd7bdd2 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java @@ -37,6 +37,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.junit.Test; import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; public class TestCompressionStreamReuse { private static final Log LOG = LogFactory @@ -59,6 +60,13 @@ public class TestCompressionStreamReuse { } @Test + public void testZStandardCompressStreamReuse() throws IOException { + assumeTrue(ZStandardCodec.isNativeCodeLoaded()); + resetStateTest(conf, seed, 
count, + "org.apache.hadoop.io.compress.ZStandardCodec"); + } + + @Test public void testGzipCompressStreamReuseWithParam() throws IOException { Configuration conf = new Configuration(this.conf); ZlibFactory --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org