This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new dc40b59 PARQUET-1866: Replace Hadoop ZSTD with JNI-ZSTD (#793)
dc40b59 is described below
commit dc40b598a003dc8da38cadd576c9a36ece1eea1f
Author: shangxinli <[email protected]>
AuthorDate: Wed Jun 3 00:17:15 2020 -0700
PARQUET-1866: Replace Hadoop ZSTD with JNI-ZSTD (#793)
---
.../hadoop/metadata/CompressionCodecName.java | 2 +-
parquet-hadoop/README.md | 13 +-
parquet-hadoop/pom.xml | 5 +
.../org/apache/parquet/hadoop/CodecFactory.java | 4 +-
.../parquet/hadoop/codec/ZstandardCodec.java | 112 ++++++++++++++
.../parquet/hadoop/codec/ZstdCompressorStream.java | 62 ++++++++
.../hadoop/codec/ZstdDecompressorStream.java | 47 ++++++
.../apache/parquet/hadoop/TestZstandardCodec.java | 167 +++++++++++++++++++++
.../hadoop/example/TestInputOutputFormat.java | 1 +
9 files changed, 410 insertions(+), 3 deletions(-)
diff --git a/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java b/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
index 8cdede0..05abd15 100644
--- a/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
+++ b/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
@@ -30,7 +30,7 @@ public enum CompressionCodecName {
LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"),
BROTLI("org.apache.hadoop.io.compress.BrotliCodec", CompressionCodec.BROTLI, ".br"),
LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"),
- ZSTD("org.apache.hadoop.io.compress.ZStandardCodec", CompressionCodec.ZSTD, ".zstd");
+ ZSTD("org.apache.parquet.hadoop.codec.ZstandardCodec", CompressionCodec.ZSTD, ".zstd");
public static CompressionCodecName fromConf(String name) {
if (name == null) {
diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md
index 74e64d0..939a79a 100644
--- a/parquet-hadoop/README.md
+++ b/parquet-hadoop/README.md
@@ -324,9 +324,20 @@ ParquetInputFormat to materialize records. It should be a the descendant class o
**Property:** `parquet.read.schema`
**Description:** The read projection schema.
-
## Class: UnmaterializableRecordCounter
**Property:** `parquet.read.bad.record.threshold`
**Description:** The percentage of bad records to tolerate.
**Default value:** `0`
+
+## Class: ZstandardCodec
+
+**Property:** `parquet.compression.codec.zstd.level`
+**Description:** The compression level of ZSTD. The valid range is 1~22. Generally, a higher compression level achieves a better compression ratio at the cost of longer write times.
+**Default value:** `3`
+
+---
+
+**Property:** `parquet.compression.codec.zstd.workers`
+**Description:** The number of threads spawned to compress in parallel. More workers improve speed but also increase memory usage. When set to 0, compression runs in single-threaded mode.
+**Default value:** `0`
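A minimal sketch (not part of this commit) of setting the two properties above programmatically on a Hadoop Configuration; the level and worker count below are hypothetical example values.

import org.apache.hadoop.conf.Configuration;

public class ZstdConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical values: level 6 trades write speed for ratio, 2 compression workers
    conf.setInt("parquet.compression.codec.zstd.level", 6);
    conf.setInt("parquet.compression.codec.zstd.workers", 2);
    // Unset properties fall back to the defaults documented above (3 and 0)
    System.out.println(conf.getInt("parquet.compression.codec.zstd.level", 3));
  }
}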
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index e047f03..af2ac17 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -103,6 +103,11 @@
<version>${brotli-codec.version}</version>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>com.github.luben</groupId>
+ <artifactId>zstd-jni</artifactId>
+ <version>${zstd-jni.version}</version>
+ </dependency>
<dependency>
<groupId>com.google.guava</groupId>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index a03d045..f0e7af3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -105,7 +105,9 @@ public class CodecFactory implements CompressionCodecFactory {
public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
final BytesInput decompressed;
if (codec != null) {
- decompressor.reset();
+ if (decompressor != null) {
+ decompressor.reset();
+ }
InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
decompressed = BytesInput.from(is, uncompressedSize);
} else {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstandardCodec.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstandardCodec.java
new file mode 100644
index 0000000..0409cf2
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstandardCodec.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * ZSTD compression codec for Parquet. We do not use the default Hadoop
+ * codec because it requires 1) setting up Hadoop on the local development
+ * machine and 2) upgrading Hadoop to a newer version for ZSTD support,
+ * which is more cumbersome than upgrading the Parquet version.
+ *
+ * This implementation relies on zstd-jni (https://github.com/luben/zstd-jni),
+ * which is already a dependency for Parquet. The zstd-jni ZstdOutputStream and
+ * ZstdInputStream use Zstd internally, so there is no need to create a
+ * compressor or decompressor in ZstandardCodec.
+ */
+public class ZstandardCodec implements Configurable, CompressionCodec {
+
+ public final static String PARQUET_COMPRESS_ZSTD_LEVEL = "parquet.compression.codec.zstd.level";
+ public final static int DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL = 3;
+ public final static String PARQUET_COMPRESS_ZSTD_WORKERS = "parquet.compression.codec.zstd.workers";
+ public final static int DEFAULT_PARQUET_COMPRESS_ZSTD_WORKERS = 0;
+
+ private Configuration conf;
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public Compressor createCompressor() {
+ // ZstdOutputStream calls static Zstd compressor, so no compressor is created
+ return null;
+ }
+
+ @Override
+ public Decompressor createDecompressor() {
+ // ZstdInputStream calls static Zstd decompressor, so no decompressor is created
+ return null;
+ }
+
+ @Override
+ public CompressionInputStream createInputStream(InputStream stream, Decompressor decompressor) throws IOException {
+ // Ignore decompressor because ZstdInputStream calls static Zstd decompressor
+ return createInputStream(stream);
+ }
+
+ @Override
+ public CompressionInputStream createInputStream(InputStream stream) throws IOException {
+ return new ZstdDecompressorStream(stream);
+ }
+
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream stream, Compressor compressor) throws IOException {
+ // Ignore compressor because ZstdOutputStream calls static Zstd compressor
+ return createOutputStream(stream);
+ }
+
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream stream) throws IOException {
+ return new ZstdCompressorStream(stream, conf.getInt(PARQUET_COMPRESS_ZSTD_LEVEL, DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL),
+ conf.getInt(PARQUET_COMPRESS_ZSTD_WORKERS, DEFAULT_PARQUET_COMPRESS_ZSTD_WORKERS));
+ }
+
+ @Override
+ public Class<? extends Compressor> getCompressorType() {
+ return null;
+ }
+
+ @Override
+ public Class<? extends Decompressor> getDecompressorType() {
+ return null;
+ }
+
+ @Override
+ public String getDefaultExtension() {
+ return ".zstd";
+ }
+}
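A minimal sketch (not part of this commit) of a byte-level round trip through ZstandardCodec, mirroring what the unit test further below does; the sample data and level are hypothetical, and hadoop-common plus zstd-jni are assumed to be on the classpath.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.parquet.hadoop.codec.ZstandardCodec;

public class ZstdRoundTripExample {
  public static void main(String[] args) throws IOException {
    ZstandardCodec codec = new ZstandardCodec();
    Configuration conf = new Configuration();
    conf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, 6); // hypothetical level
    codec.setConf(conf);

    byte[] input = "hello zstd".getBytes(StandardCharsets.UTF_8);

    // Compress: the null Compressor is ignored, ZstdOutputStream does the work
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    try (CompressionOutputStream cos = codec.createOutputStream(compressed, null)) {
      cos.write(input);
    }

    // Decompress: likewise the null Decompressor is ignored
    ByteArrayOutputStream restored = new ByteArrayOutputStream();
    try (InputStream is = codec.createInputStream(new ByteArrayInputStream(compressed.toByteArray()), null)) {
      byte[] buf = new byte[4096];
      int n;
      while ((n = is.read(buf)) != -1) {
        restored.write(buf, 0, n);
      }
    }
    System.out.println(new String(restored.toByteArray(), StandardCharsets.UTF_8)); // "hello zstd"
  }
}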
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java
new file mode 100644
index 0000000..f9b9210
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import com.github.luben.zstd.ZstdOutputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class ZstdCompressorStream extends CompressionOutputStream {
+
+ private ZstdOutputStream zstdOutputStream;
+
+ public ZstdCompressorStream(OutputStream stream, int level, int workers) throws IOException {
+ super(stream);
+ zstdOutputStream = new ZstdOutputStream(stream, level);
+ zstdOutputStream.setWorkers(workers);
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException {
+ zstdOutputStream.write(b, off, len);
+ }
+
+ public void write(int b) throws IOException {
+ zstdOutputStream.write(b);
+ }
+
+ public void finish() throws IOException {
+ // no-op, doesn't apply to ZSTD
+ }
+
+ public void resetState() throws IOException {
+ // no-op, doesn't apply to ZSTD
+ }
+
+ @Override
+ public void flush() throws IOException {
+ zstdOutputStream.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ zstdOutputStream.close();
+ }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java
new file mode 100644
index 0000000..34d4849
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import com.github.luben.zstd.ZstdInputStream;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class ZstdDecompressorStream extends CompressionInputStream {
+
+ private ZstdInputStream zstdInputStream;
+
+ public ZstdDecompressorStream(InputStream stream) throws IOException {
+ super(stream);
+ zstdInputStream = new ZstdInputStream(stream);
+ }
+
+ public int read(byte[] b, int off, int len) throws IOException {
+ return zstdInputStream.read(b, off, len);
+ }
+
+ public int read() throws IOException {
+ return zstdInputStream.read();
+ }
+
+ public void resetState() throws IOException {
+ // no-op, doesn't apply to ZSTD
+ }
+}
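For context, a minimal sketch (not part of this commit) of the underlying zstd-jni streams that ZstdCompressorStream and ZstdDecompressorStream delegate to; the data, level, and worker count are hypothetical example values.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;

public class ZstdJniStreamsExample {
  public static void main(String[] args) throws IOException {
    byte[] data = "zstd-jni round trip".getBytes(StandardCharsets.UTF_8);

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    try (ZstdOutputStream zos = new ZstdOutputStream(buf, 3)) { // level 3, the codec's default
      zos.setWorkers(0); // 0 = single-threaded, the codec's default
      zos.write(data);
    }

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (ZstdInputStream zis = new ZstdInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
      byte[] chunk = new byte[4096];
      int n;
      while ((n = zis.read(chunk)) != -1) {
        out.write(chunk, 0, n);
      }
    }
    System.out.println(new String(out.toByteArray(), StandardCharsets.UTF_8));
  }
}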
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java
new file mode 100644
index 0000000..289a6aa
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.codec.ZstandardCodec;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Random;
+
+public class TestZstandardCodec {
+
+ private final Path inputPath = new Path("src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java");
+
+ @Test
+ public void testZstdCodec() throws IOException {
+ ZstandardCodec codec = new ZstandardCodec();
+ Configuration conf = new Configuration();
+ int[] levels = {1, 4, 7, 10, 13, 16, 19, 22};
+ int[] dataSizes = {0, 1, 10, 1024, 1024 * 1024};
+
+ for (int i = 0; i < levels.length; i++) {
+ conf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, levels[i]);
+ codec.setConf(conf);
+ for (int j = 0; j < dataSizes.length; j++) {
+ testZstd(codec, dataSizes[j]);
+ }
+ }
+ }
+
+ private void testZstd(ZstandardCodec codec, int dataSize) throws IOException {
+ byte[] data = new byte[dataSize];
+ (new Random()).nextBytes(data);
+ BytesInput compressedData = compress(codec, BytesInput.from(data));
+ BytesInput decompressedData = decompress(codec, compressedData, data.length);
+ Assert.assertArrayEquals(data, decompressedData.toByteArray());
+ }
+
+ private BytesInput compress(ZstandardCodec codec, BytesInput bytes) throws IOException {
+ ByteArrayOutputStream compressedOutBuffer = new ByteArrayOutputStream((int)bytes.size());
+ CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, null);
+ bytes.writeAllTo(cos);
+ cos.close();
+ return BytesInput.from(compressedOutBuffer);
+ }
+
+ private BytesInput decompress(ZstandardCodec codec, BytesInput bytes, int uncompressedSize) throws IOException {
+ BytesInput decompressed;
+ InputStream is = codec.createInputStream(bytes.toInputStream(), null);
+ decompressed = BytesInput.from(is, uncompressedSize);
+ is.close();
+ return decompressed;
+ }
+
+ /**
+ * This test verifies that the properties are passed through from the config to the codec.
+ */
+ @Test
+ public void testZstdConfWithMr() throws Exception {
+ long fileSizeLowLevel = runMrWithConf(1);
+ // Clear the cache so that a new codec can be created with new configuration
+ CodecFactory.CODEC_BY_NAME.clear();
+ long fileSizeHighLevel = runMrWithConf(22);
+ Assert.assertTrue(fileSizeLowLevel > fileSizeHighLevel);
+ }
+
+ private long runMrWithConf(int level) throws Exception {
+ JobConf jobConf = new JobConf();
+ Configuration conf = new Configuration();
+ jobConf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, level);
+ jobConf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, 4);
+ Path path = new Path(Files.createTempDirectory("zstd" + level).toAbsolutePath().toString());
+ RunningJob mapRedJob = runMapReduceJob(CompressionCodecName.ZSTD, jobConf, conf, path);
+ Assert.assertTrue(mapRedJob.isSuccessful());
+ return getFileSize(path, conf);
+ }
+
+ private RunningJob runMapReduceJob(CompressionCodecName codec, JobConf jobConf, Configuration conf, Path parquetPath) throws IOException, ClassNotFoundException, InterruptedException {
+ String writeSchema = "message example {\n" +
+ "required int32 line;\n" +
+ "required binary content;\n" +
+ "}";
+
+ FileSystem fileSystem = parquetPath.getFileSystem(conf);
+ fileSystem.delete(parquetPath, true);
+ jobConf.setInputFormat(TextInputFormat.class);
+ TextInputFormat.addInputPath(jobConf, inputPath);
+ jobConf.setNumReduceTasks(0);
+ jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+ DeprecatedParquetOutputFormat.setCompression(jobConf, codec);
+ DeprecatedParquetOutputFormat.setOutputPath(jobConf, parquetPath);
+ DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, GroupWriteSupport.class);
+ GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(writeSchema), jobConf);
+
+ jobConf.setMapperClass(TestZstandardCodec.DumpMapper.class);
+ return JobClient.runJob(jobConf);
+ }
+
+ private long getFileSize(Path parquetPath, Configuration conf) throws IOException {
+ for (FileStatus file : parquetPath.getFileSystem(conf).listStatus(parquetPath)) {
+ if (file.getPath().getName().endsWith(".parquet")) {
+ return file.getLen();
+ }
+ }
+ return -1;
+ }
+
+ public static class DumpMapper implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, Void, Group> {
+ private SimpleGroupFactory factory;
+
+ public void configure(JobConf job) {
+ factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(job));
+ }
+
+ @Override
+ public void map(LongWritable key, Text value, OutputCollector<Void, Group> outputCollector, Reporter reporter) throws IOException {
+ Group group = factory.newGroup()
+ .append("line", (int) key.get())
+ .append("content", value.toString());
+ outputCollector.collect(null, group);
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+}
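A minimal sketch (not part of this commit) of picking up ZSTD from plain writer code rather than a MapReduce job, assuming ExampleParquetWriter's builder API from parquet-hadoop; the output path, level, and record content are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.codec.ZstandardCodec;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ZstdParquetWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, 6); // hypothetical level

    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required int32 line; required binary content; }");

    // Hypothetical output path
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path("/tmp/zstd-example.parquet"))
        .withConf(conf)
        .withType(schema)
        .withCompressionCodec(CompressionCodecName.ZSTD)
        .build()) {
      writer.write(new SimpleGroupFactory(schema).newGroup()
          .append("line", 1)
          .append("content", "hello zstd"));
    }
  }
}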
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
index 47645ac..188a796 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
@@ -236,6 +236,7 @@ public class TestInputOutputFormat {
testReadWrite(CompressionCodecName.GZIP);
testReadWrite(CompressionCodecName.UNCOMPRESSED);
testReadWrite(CompressionCodecName.SNAPPY);
+ testReadWrite(CompressionCodecName.ZSTD);
}
@Test