[ https://issues.apache.org/jira/browse/HADOOP-13578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15668094#comment-15668094 ]
churro morales commented on HADOOP-13578:
-----------------------------------------

[~jlowe] Thanks for the feedback. As for your concerns:

bq. The problem with ignoring the Codec interface is that code using the Compressor/Decompressor interfaces will break if it happens to get the zstd codec but work for all others. That's not really adding a full Codec for zstd. I agree it's not the most ideal interface for the way zstd wants to be called, but that doesn't change the fact that existing code that should work as-is on a Hadoop-provided codec will not if we don't implement the Compressor/Decompressor interfaces.

As much as it pains me, I agree with your sentiments. I will implement the Compressor / Decompressor interfaces.

bq. It looks like the zstd code already includes a wrapper that translates the zlib API into the zstd API (see https://github.com/facebook/zstd/tree/dev/zlibWrapper) which we can maybe leverage or at least use as a model for the Compressor/Decompressor implementations.

Good point. I did look at using the zlib wrapper originally, but in v1.0.0 (https://github.com/facebook/zstd/tree/v1.0.0/zlibWrapper) it did not support deflateReset and inflateReset, which would have errored out and broken our code. No worries though; I think I have the Compressor / Decompressor logic figured out without having to reference the zlibWrapper.

bq. To be clear, I think it's a good thing that the compressor/decompressor streams are using zstd more natively, but I also think it's important to not ignore the non-stream Codec APIs.

I think the reason we need to implement the Compressor / Decompressor is mostly for reuse through the CodecPool. Each compressor / decompressor / codec is very much tied to whether the compression library was implemented using a stream-based or block-based approach. From what I can see, the API is called as follows:

{code}
Compressor compressor = CodecPool.getCompressor(codec);
CompressionOutputStream out = codec.createOutputStream(outputStream, compressor);
// do work, then return the compressor to the pool
CodecPool.returnCompressor(compressor);
{code}

And when we look at specific implementations of codec.createOutputStream(), codecs like Snappy always return a Block(Compressor|Decompressor)Stream, and it seems to me that the Snappy(Compressor|Decompressor) can only work on streams of that type. It looks like the way it is used everywhere (including HBase) is that we always get a stream and work on it; the compressor / decompressor are passed in so we have a pool and don't have to constantly re-initialize the native libs.

For example, I wrote this test: if you use the Compression(Input|Output)Streams that are native to the codec, the test will pass. But if you create a Compressor/Decompressor and use a Compression(Input|Output)Stream that is not specific to the codec, the test will fail.

{code:java}
@Test
public void testHandlingWithStreams() throws Exception {
  byte[] bytes = BytesGenerator.get(1024 * 64);
  ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
  SnappyCodec codec = new SnappyCodec();
  codec.setConf(new Configuration());
  Compressor compressor = codec.createCompressor();
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  CompressionOutputStream compressorStream = new CompressorStream(baos, compressor);
  // If you replace compressorStream with
  //   CompressionOutputStream compressorStream = codec.createOutputStream(baos, compressor);
  // things work fine.
  byte[] buffer = new byte[100];
  int result;
  while ((result = inputStream.read(buffer, 0, buffer.length)) != -1) {
    compressorStream.write(buffer, 0, result);
  }
  compressorStream.flush();
  compressorStream.finish();

  // Let's make sure the compressed bytes can be decompressed and read back.
  byte[] compressedBytes = baos.toByteArray();
  ByteArrayInputStream bais = new ByteArrayInputStream(compressedBytes);
  baos = new ByteArrayOutputStream();
  Decompressor decompressor = codec.createDecompressor();
  CompressionInputStream decompressorStream = new DecompressorStream(bais, decompressor);
  // If you replace decompressorStream with
  //   CompressionInputStream decompressorStream = codec.createInputStream(bais, decompressor);
  // things work fine.
  byte[] decompressionBuffer = new byte[100];
  while ((result = decompressorStream.read(decompressionBuffer, 0, decompressionBuffer.length)) != -1) {
    baos.write(decompressionBuffer, 0, result);
  }
  decompressorStream.close();
  byte[] decompressedBytes = baos.toByteArray();
  assertArrayEquals(bytes, decompressedBytes);
}
{code}

bq. I'm also not so sure about the minimum buffer assertion. I saw in the zstd unit tests there is a byte-by-byte streaming decompression test where it tries to decompress a buffer with a 1-byte input buffer and a 1-byte output buffer.

You are absolutely correct. I will allow the user to specify the buffer size and default to the one ZStandard recommends.

I almost have a patch ready for you implementing a Compressor / Decompressor. Unfortunately this API makes things a bit clunky, but it is needed because of the pooling, so we don't have to reinitialize the shared libraries.
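To make the "clunky" part concrete, here is a rough sketch of the shape the Compressor side could take. This is illustrative only: the class name, the config key, the default buffer size, and the native zstd hooks (elided as comments) are all placeholders, not necessarily what will land in the patch.

{code:java}
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;

// Hypothetical sketch only; the native zstd calls are elided.
public class ZStandardCompressor implements Compressor {
  // Assumed config key; the real key will be settled in the patch.
  public static final String BUFFER_SIZE_KEY = "io.compression.codec.zstd.buffersize";
  public static final int DEFAULT_BUFFER_SIZE = 64 * 1024; // stand-in for zstd's recommended size

  private byte[] userBuf;
  private int userBufOff;
  private int userBufLen;
  private boolean finish;
  private boolean finished;
  private long bytesRead;
  private long bytesWritten;

  public ZStandardCompressor(Configuration conf) {
    int bufferSize = conf.getInt(BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
    // allocate buffers of bufferSize and initialize the native zstd stream here
  }

  @Override
  public void setInput(byte[] b, int off, int len) {
    userBuf = b;
    userBufOff = off;
    userBufLen = len;
    bytesRead += len;
  }

  @Override
  public boolean needsInput() {
    return userBufLen <= 0;
  }

  @Override
  public void setDictionary(byte[] b, int off, int len) {
    throw new UnsupportedOperationException("Dictionary support TBD");
  }

  @Override
  public void finish() {
    finish = true;
  }

  @Override
  public boolean finished() {
    return finished;
  }

  @Override
  public int compress(byte[] b, int off, int len) throws IOException {
    // Feed userBuf[userBufOff..] to the native zstd stream, copy compressed
    // output into b, and mark finished once finish is set and the native
    // stream reports the frame is complete. Elided here.
    int produced = 0;
    bytesWritten += produced;
    return produced;
  }

  @Override
  public void reset() {
    finish = false;
    finished = false;
    userBufLen = 0;
    // reset the native zstd stream so a pooled instance can be reused
  }

  @Override
  public void reinit(Configuration conf) {
    reset();
  }

  @Override
  public void end() {
    // release the native zstd stream
  }

  @Override
  public long getBytesRead() {
    return bytesRead;
  }

  @Override
  public long getBytesWritten() {
    return bytesWritten;
  }
}
{code}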
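And once the codec exposes createCompressor() / createDecompressor(), the pooled pattern quoted earlier should work for zstd exactly as it does for the other codecs. A usage sketch (assuming nothing zstd-specific; any CompressionCodec works here):

{code:java}
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;

public class PooledWrite {
  // Borrow a compressor from the pool, write one compressed stream, return it.
  static void writeCompressed(CompressionCodec codec, OutputStream rawOut, byte[] data)
      throws IOException {
    Compressor compressor = CodecPool.getCompressor(codec);
    try (CompressionOutputStream out = codec.createOutputStream(rawOut, compressor)) {
      out.write(data, 0, data.length);
      out.finish();
    } finally {
      // Returning (rather than discarding) the compressor is what lets us
      // avoid re-initializing the native library on every stream.
      CodecPool.returnCompressor(compressor);
    }
  }
}
{code}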
I will have a new patch up soon. Thank you for taking the time to comment on and review this work; it is much appreciated.

> Add Codec for ZStandard Compression
> -----------------------------------
>
>                 Key: HADOOP-13578
>                 URL: https://issues.apache.org/jira/browse/HADOOP-13578
>             Project: Hadoop Common
>          Issue Type: New Feature
>            Reporter: churro morales
>            Assignee: churro morales
>      Attachments: HADOOP-13578.patch, HADOOP-13578.v1.patch, HADOOP-13578.v2.patch
>
>
> ZStandard (https://github.com/facebook/zstd) has been used in production at Facebook for six months, and v1.0 was recently released. Create a codec for this library.