This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new dc40b59  PARQUET-1866: Replace Hadoop ZSTD with JNI-ZSTD (#793)
dc40b59 is described below

commit dc40b598a003dc8da38cadd576c9a36ece1eea1f
Author: shangxinli <[email protected]>
AuthorDate: Wed Jun 3 00:17:15 2020 -0700

    PARQUET-1866: Replace Hadoop ZSTD with JNI-ZSTD (#793)
---
 .../hadoop/metadata/CompressionCodecName.java      |   2 +-
 parquet-hadoop/README.md                           |  13 +-
 parquet-hadoop/pom.xml                             |   5 +
 .../org/apache/parquet/hadoop/CodecFactory.java    |   4 +-
 .../parquet/hadoop/codec/ZstandardCodec.java       | 112 ++++++++++++++
 .../parquet/hadoop/codec/ZstdCompressorStream.java |  62 ++++++++
 .../hadoop/codec/ZstdDecompressorStream.java       |  47 ++++++
 .../apache/parquet/hadoop/TestZstandardCodec.java  | 167 +++++++++++++++++++++
 .../hadoop/example/TestInputOutputFormat.java      |   1 +
 9 files changed, 410 insertions(+), 3 deletions(-)

diff --git 
a/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
 
b/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
index 8cdede0..05abd15 100644
--- 
a/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
+++ 
b/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
@@ -30,7 +30,7 @@ public enum CompressionCodecName {
   LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"),
   BROTLI("org.apache.hadoop.io.compress.BrotliCodec", CompressionCodec.BROTLI, 
".br"),
   LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"),
-  ZSTD("org.apache.hadoop.io.compress.ZStandardCodec", CompressionCodec.ZSTD, 
".zstd");
+  ZSTD("org.apache.parquet.hadoop.codec.ZstandardCodec", 
CompressionCodec.ZSTD, ".zstd");
 
   public static CompressionCodecName fromConf(String name) {
      if (name == null) {
diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md
index 74e64d0..939a79a 100644
--- a/parquet-hadoop/README.md
+++ b/parquet-hadoop/README.md
@@ -324,9 +324,20 @@ ParquetInputFormat to materialize records. It should be a 
the descendant class o
 **Property:** `parquet.read.schema`  
 **Description:** The read projection schema.
 
-
 ## Class: UnmaterializableRecordCounter
 
 **Property:** `parquet.read.bad.record.threshold`  
 **Description:** The percentage of bad records to tolerate.  
 **Default value:** `0`
+
+## Class: ZstandardCodec
+
+**Property:** `parquet.compression.codec.zstd.level`  
+**Description:** The compression level of ZSTD. The valid range is 1 to 22. 
Generally, the higher the compression level, the higher the compression ratio, 
but the longer the writing time.  
+**Default value:** `3`
+
+---
+
+**Property:** `parquet.compression.codec.zstd.workers`  
+**Description:** The number of threads that will be spawned to compress in 
parallel. More workers improve speed, but also increase memory usage. When it 
is 0, compression runs in single-threaded mode.  
+**Default value:** `0`
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index e047f03..af2ac17 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -103,6 +103,11 @@
       <version>${brotli-codec.version}</version>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+      <version>${zstd-jni.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>com.google.guava</groupId>
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index a03d045..f0e7af3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -105,7 +105,9 @@ public class CodecFactory implements 
CompressionCodecFactory {
     public BytesInput decompress(BytesInput bytes, int uncompressedSize) 
throws IOException {
       final BytesInput decompressed;
       if (codec != null) {
-        decompressor.reset();
+        if (decompressor != null) {
+          decompressor.reset();
+        }
         InputStream is = codec.createInputStream(bytes.toInputStream(), 
decompressor);
         decompressed = BytesInput.from(is, uncompressedSize);
       } else {
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstandardCodec.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstandardCodec.java
new file mode 100644
index 0000000..0409cf2
--- /dev/null
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstandardCodec.java
@@ -0,0 +1,112 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * ZSTD compression codec for Parquet.  We do not use the default Hadoop
+ * one because it requires 1) setting up Hadoop on the local development
+ * machine; 2) upgrading Hadoop to a newer version that has ZSTD support,
+ * which is more cumbersome than upgrading the Parquet version.
+ *
+ * This implementation relies on ZSTD JNI (https://github.com/luben/zstd-jni),
+ * which is already a dependency of Parquet. The ZSTD JNI ZstdOutputStream and
+ * ZstdInputStream use Zstd internally, so there is no need to create a
+ * compressor or decompressor in ZstandardCodec.
+ */
+public class ZstandardCodec implements Configurable, CompressionCodec {
+
+  public final static String PARQUET_COMPRESS_ZSTD_LEVEL = 
"parquet.compression.codec.zstd.level";
+  public final static int DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL = 3;
+  public final static String PARQUET_COMPRESS_ZSTD_WORKERS = 
"parquet.compression.codec.zstd.workers";
+  public final static int DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS = 0;
+
+  private Configuration conf;
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public Compressor createCompressor() {
+    // ZstdOutputStream calls static Zstd compressor, so no compressor is 
created
+    return null;
+  }
+
+  @Override
+  public Decompressor createDecompressor() {
+    // ZstdInputStream calls static Zstd decompressor, so no decompressor is 
created
+    return null;
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream stream, 
Decompressor decompressor) throws IOException {
+    // Ignore decompressor because ZstdInputStream calls static Zstd 
decompressor
+    return createInputStream(stream);
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream stream) throws 
IOException {
+    return new ZstdDecompressorStream(stream);
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream stream, 
Compressor compressor) throws IOException {
+    // Ignore compressor because ZstdOutputStream calls static Zstd compressor
+    return createOutputStream(stream);
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream stream) 
throws IOException {
+    return new ZstdCompressorStream(stream, 
conf.getInt(PARQUET_COMPRESS_ZSTD_LEVEL, DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL),
+      conf.getInt(PARQUET_COMPRESS_ZSTD_WORKERS, 
DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS));
+  }
+
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    return null;
+  }
+
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    return null;
+  }
+
+  @Override
+  public String getDefaultExtension() {
+    return ".zstd";
+  }  
+}
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java
new file mode 100644
index 0000000..f9b9210
--- /dev/null
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java
@@ -0,0 +1,62 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import com.github.luben.zstd.ZstdOutputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class ZstdCompressorStream extends CompressionOutputStream {
+
+  private ZstdOutputStream zstdOutputStream;
+
+  public ZstdCompressorStream(OutputStream stream, int level, int workers) 
throws IOException {
+           super(stream);
+           zstdOutputStream = new ZstdOutputStream(stream, level);
+           zstdOutputStream.setWorkers(workers);
+  }
+
+  public void write(byte[] b, int off, int len) throws IOException {
+    zstdOutputStream.write(b, off, len);
+  }
+
+  public void write(int b) throws IOException {
+    zstdOutputStream.write(b);
+  }
+
+  public void finish() throws IOException {
+     // no-op, doesn't apply to ZSTD
+  }
+
+  public void resetState() throws IOException {
+    // no-op, doesn't apply to ZSTD
+  }
+
+  @Override
+  public void flush() throws IOException {
+    zstdOutputStream.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    zstdOutputStream.close();
+  }
+}
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java
new file mode 100644
index 0000000..34d4849
--- /dev/null
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java
@@ -0,0 +1,47 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import com.github.luben.zstd.ZstdInputStream;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class ZstdDecompressorStream extends CompressionInputStream {
+
+  private ZstdInputStream zstdInputStream;
+
+  public ZstdDecompressorStream(InputStream stream) throws IOException {
+         super(stream);
+    zstdInputStream = new ZstdInputStream(stream);
+  }
+
+  public int read(byte[] b, int off, int len) throws IOException {
+    return zstdInputStream.read(b, off, len);
+  }
+
+  public int read() throws IOException {
+    return zstdInputStream.read();
+  }
+
+  public void resetState() throws IOException {
+    // no-op, doesn't apply to ZSTD
+  }
+}
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java
new file mode 100644
index 0000000..289a6aa
--- /dev/null
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java
@@ -0,0 +1,167 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.codec.ZstandardCodec;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Random;
+
+public class TestZstandardCodec {
+
+  private final Path inputPath = new 
Path("src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java");
+
+  @Test
+  public void testZstdCodec() throws IOException {
+    ZstandardCodec codec = new ZstandardCodec();
+    Configuration conf = new Configuration();
+    int[] levels = {1, 4, 7, 10, 13, 16, 19, 22};
+    int[] dataSizes = {0, 1, 10, 1024, 1024 * 1024};
+
+    for (int i = 0; i < levels.length; i++) {
+      conf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, levels[i]);
+      codec.setConf(conf);
+      for (int j = 0; j < dataSizes.length; j++) {
+        testZstd(codec, dataSizes[j]);
+      }
+    }
+  }
+
+  private void testZstd(ZstandardCodec codec, int dataSize) throws IOException 
{
+    byte[] data = new byte[dataSize];
+    (new Random()).nextBytes(data);
+    BytesInput compressedData = compress(codec,  BytesInput.from(data));
+    BytesInput decompressedData = decompress(codec, compressedData, 
data.length);
+    Assert.assertArrayEquals(data, decompressedData.toByteArray());
+  }
+
+  private BytesInput compress(ZstandardCodec codec, BytesInput bytes) throws 
IOException {
+    ByteArrayOutputStream compressedOutBuffer = new 
ByteArrayOutputStream((int)bytes.size());
+    CompressionOutputStream cos = 
codec.createOutputStream(compressedOutBuffer, null);
+    bytes.writeAllTo(cos);
+    cos.close();
+    return BytesInput.from(compressedOutBuffer);
+  }
+
+  private BytesInput decompress(ZstandardCodec codec, BytesInput bytes, int 
uncompressedSize) throws IOException {
+    BytesInput decompressed;
+    InputStream is = codec.createInputStream(bytes.toInputStream(), null);
+    decompressed = BytesInput.from(is, uncompressedSize);
+    is.close();
+    return decompressed;
+  }
+
+  /**
+   *  This test is to verify that the properties are passed through from the 
config to the codec. 
+   */
+  @Test
+  public void testZstdConfWithMr() throws Exception {
+    long fileSizeLowLevel = runMrWithConf(1);
+    // Clear the cache so that a new codec can be created with new 
configuration
+    CodecFactory.CODEC_BY_NAME.clear();
+    long fileSizeHighLevel = runMrWithConf(22);
+    Assert.assertTrue(fileSizeLowLevel > fileSizeHighLevel);
+  }
+
+  private long runMrWithConf(int level) throws Exception {
+    JobConf jobConf = new JobConf();
+    Configuration conf = new Configuration();
+    jobConf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, level);
+    jobConf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, 4);
+    Path path = new Path(Files.createTempDirectory("zstd" + 
level).toAbsolutePath().toString());
+    RunningJob mapRedJob = runMapReduceJob(CompressionCodecName.ZSTD, jobConf, 
conf, path);
+    Assert.assertTrue(mapRedJob.isSuccessful());
+    return getFileSize(path, conf);
+  }
+
+  private RunningJob runMapReduceJob(CompressionCodecName codec, JobConf 
jobConf, Configuration conf, Path parquetPath) throws IOException, 
ClassNotFoundException, InterruptedException {
+    String writeSchema = "message example {\n" +
+      "required int32 line;\n" +
+      "required binary content;\n" +
+      "}";
+
+    FileSystem fileSystem = parquetPath.getFileSystem(conf);
+    fileSystem.delete(parquetPath, true);
+    jobConf.setInputFormat(TextInputFormat.class);
+    TextInputFormat.addInputPath(jobConf, inputPath);
+    jobConf.setNumReduceTasks(0);
+    jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+    DeprecatedParquetOutputFormat.setCompression(jobConf, codec);
+    DeprecatedParquetOutputFormat.setOutputPath(jobConf, parquetPath);
+    DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, 
GroupWriteSupport.class);
+    
GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(writeSchema), 
jobConf);
+
+    jobConf.setMapperClass(TestZstandardCodec.DumpMapper.class);
+    return JobClient.runJob(jobConf);
+  }
+
+  private long getFileSize(Path parquetPath, Configuration conf) throws 
IOException {
+    for (FileStatus file : 
parquetPath.getFileSystem(conf).listStatus(parquetPath)) {
+      if (file.getPath().getName().endsWith(".parquet")) {
+        return file.getLen();
+      }
+    }
+    return -1;
+  }
+
+  public static class DumpMapper implements 
org.apache.hadoop.mapred.Mapper<LongWritable, Text, Void, Group> {
+    private SimpleGroupFactory factory;
+
+    public void configure(JobConf job) {
+      factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(job));
+    }
+
+    @Override
+    public void map(LongWritable key, Text value, OutputCollector<Void, Group> 
outputCollector, Reporter reporter) throws IOException {
+      Group group = factory.newGroup()
+        .append("line", (int) key.get())
+        .append("content", value.toString());
+      outputCollector.collect(null, group);
+    }
+
+    @Override
+    public void close() {
+    }
+  }
+}
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
index 47645ac..188a796 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
@@ -236,6 +236,7 @@ public class TestInputOutputFormat {
     testReadWrite(CompressionCodecName.GZIP);
     testReadWrite(CompressionCodecName.UNCOMPRESSED);
     testReadWrite(CompressionCodecName.SNAPPY);
+    testReadWrite(CompressionCodecName.ZSTD);
   }
 
   @Test

Reply via email to