[ 
https://issues.apache.org/jira/browse/HADOOP-13578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15668094#comment-15668094
 ] 

churro morales commented on HADOOP-13578:
-----------------------------------------

[~jlowe]

Thanks for the feedback.  As for your concerns:  

bq. The problem with ignoring the Codec interface is that code using the 
Compressor/Decompressor interfaces will break if it happens to get the zstd 
codec but work for all others. That's not really adding a full Codec for zstd. 
I agree it's not the most ideal interface for the way zstd wants to be called, 
but that doesn't change the fact that existing code that should work as-is on a 
Hadoop-provided codec will not if we don't implement the 
Compressor/Decompressor interfaces.

As much as it pains me, I agree with your sentiments.  I will implement the 
Compressor / Decompressor interfaces.
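
To make sure we're talking about the same shape of thing, here is a rough sketch of the Compressor-side state machine I have in mind. This is illustration only, not the patch: the class name is made up and the native zstd call is stubbed out with a plain byte copy so the example compiles on its own.

{code:java}
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;

// Sketch only: ZStandardCompressorSketch is a placeholder name, and compress()
// copies bytes instead of calling into libzstd via JNI.
public class ZStandardCompressorSketch implements Compressor {
  private byte[] input;            // caller-owned buffer handed to us by setInput()
  private int inputOff, inputLen;  // unconsumed region of that buffer
  private boolean finish, finished;
  private long bytesRead, bytesWritten;

  @Override
  public void setInput(byte[] b, int off, int len) {
    input = b;
    inputOff = off;
    inputLen = len;
    bytesRead += len;
  }

  @Override
  public boolean needsInput() {
    // CompressorStream keeps calling compress() until we ask for more input
    return inputLen == 0;
  }

  @Override
  public int compress(byte[] b, int off, int len) throws IOException {
    // A real implementation would feed libzstd's streaming API here via JNI;
    // the identity copy just demonstrates the pull-style contract.
    int n = Math.min(len, inputLen);
    System.arraycopy(input, inputOff, b, off, n);
    inputOff += n;
    inputLen -= n;
    bytesWritten += n;
    if (finish && inputLen == 0) {
      finished = true; // a real codec would also emit the zstd frame epilogue first
    }
    return n;
  }

  @Override public void finish() { finish = true; }
  @Override public boolean finished() { return finished; }
  @Override public void setDictionary(byte[] b, int off, int len) { }
  @Override public long getBytesRead() { return bytesRead; }
  @Override public long getBytesWritten() { return bytesWritten; }
  @Override public void reinit(Configuration conf) { reset(); }
  @Override public void end() { /* release the native compression context here */ }

  @Override
  public void reset() {
    input = null;
    inputOff = inputLen = 0;
    finish = finished = false;
    bytesRead = bytesWritten = 0;
  }
}
{code}

The clunky part is exactly that buffering dance: zstd wants to be driven as a stream, but the Compressor API pulls fixed-size chunks out of us.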

bq. It looks like the zstd code already includes a wrapper that translates the 
zlib API into the zstd API (see 
https://github.com/facebook/zstd/tree/dev/zlibWrapper) which we can maybe 
leverage or at least use as a model for the Compressor/Decompressor 
implementations. 

Good point. I did look at using the zlib wrapper originally, but v1.0.0 
(https://github.com/facebook/zstd/tree/v1.0.0/zlibWrapper) does not support 
deflateReset and inflateReset, so that approach would have errored out and 
broken our code.  No worries though: I think I have it figured out with the 
Compressor / Decompressor logic without having to reference the zlibWrapper.

bq.  To be clear, I think it's a good thing that the compressor/decompressor 
streams are using zstd more natively, but I also think it's important to not 
ignore the non-stream Codec APIs.

I think the main reason we need to implement the Compressor / Decompressor is 
the reuse we get from the CodecPool.  Each compressor / decompressor / codec is 
very much tied to whether the compression library was implemented with a 
stream-based or block-based approach.  From what I can see, the API is called 
as follows:

{code}
Compressor compressor = CodecPool.getCompressor(codec);
CompressionOutputStream out = codec.createOutputStream(outputStream, compressor);
// do work
{code}
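
Spelled out a bit more (just an illustration of the calling pattern, not code 
from the patch; {{conf}}, {{fileOut}}, and {{data}} are assumed to be in scope):

{code:java}
CompressionCodec codec = new CompressionCodecFactory(conf).getCodecByName("snappy");
Compressor compressor = CodecPool.getCompressor(codec);
try (CompressionOutputStream out = codec.createOutputStream(fileOut, compressor)) {
  out.write(data);
  out.finish();
} finally {
  CodecPool.returnCompressor(compressor); // pooled so the native lib isn't re-initialized per stream
}
{code}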

And when we look at specific implementations of codec.createOutputStream(), 
codecs like Snappy always return a Block(Compressor|Decompressor)Stream, and 
it seems to me that the Snappy(Compressor|Decompressor) can only work on that 
type of stream.  The way it is used everywhere (including HBase) is that we 
always get a stream from the codec and work on it; the compressor / 
decompressor are passed in so we have a pool and don't have to constantly 
re-initialize the native libs.

For example, I wrote the test below: if you use the 
Compression(Input|Output)Streams that are native to the codec, the test 
passes, but if you create a Compressor / Decompressor and use a 
Compression(Input|Output)Stream that is not specific to the codec, the test 
fails.

{code:java}
  @Test
  public void testHandlingWithStreams() throws Exception {
    byte[] bytes = BytesGenerator.get(1024 * 64);
    ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
    SnappyCodec codec = new SnappyCodec();
    codec.setConf(new Configuration());
    Compressor compressor = codec.createCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    CompressionOutputStream compressorStream = new CompressorStream(baos, compressor);
    // if you replace compressorStream with
    // CompressionOutputStream compressorStream = codec.createOutputStream(baos, compressor);
    // things work fine.
    byte[] buffer = new byte[100];
    int result;

    while ((result = inputStream.read(buffer, 0, buffer.length)) != -1) {
      compressorStream.write(buffer, 0, result);
    }
    compressorStream.flush();
    compressorStream.finish();

    // let's make sure the compressed bytes can be decompressed and read back
    byte[] compressedBytes = baos.toByteArray();
    ByteArrayInputStream bais = new ByteArrayInputStream(compressedBytes);
    baos = new ByteArrayOutputStream();
    Decompressor decompressor = codec.createDecompressor();
    CompressionInputStream decompressorStream = new DecompressorStream(bais, decompressor);
    // if you replace decompressorStream with
    // CompressionInputStream decompressorStream = codec.createInputStream(bais, decompressor);
    // things work fine.
    byte[] decompressionBuffer = new byte[100];
    while ((result = decompressorStream.read(decompressionBuffer, 0, decompressionBuffer.length)) != -1) {
      baos.write(decompressionBuffer, 0, result);
    }
    decompressorStream.close();
    byte[] decompressBytes = baos.toByteArray();
    assertArrayEquals(bytes, decompressBytes);
  }
{code}

bq. I'm also not so sure about the minimum buffer assertion. I saw in the zstd 
unit tests there is a byte-by-byte streaming decompression test where it tries 
to decompress a buffer with a 1-byte input buffer and a 1-byte output buffer.

You are absolutely correct.  I will allow the user to specify the buffer size 
and default to the one ZStandard recommends.
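
Roughly what I have in mind, sketched below; the config key name and the 
recommended-size lookup are placeholders until the patch is up:

{code:java}
// Sketch only: "io.compression.codec.zstd.buffersize" and getRecommendedBufferSize()
// are placeholder names, not necessarily what the patch will use.
int defaultBufferSize = ZStandardCodec.getRecommendedBufferSize(); // e.g. ZSTD_CStreamOutSize() via JNI
int bufferSize = conf.getInt("io.compression.codec.zstd.buffersize", defaultBufferSize);
{code}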

I almost have a patch ready for you implementing a Compressor / Decompressor.  
Unfortunately this API makes things a bit clunky, but it is needed for the 
pooling so we don't have to reinitialize the shared libraries.  I will have a 
new patch up soon.  Thank you for taking the time to comment on and review 
this work; it is much appreciated.



> Add Codec for ZStandard Compression
> -----------------------------------
>
>                 Key: HADOOP-13578
>                 URL: https://issues.apache.org/jira/browse/HADOOP-13578
>             Project: Hadoop Common
>          Issue Type: New Feature
>            Reporter: churro morales
>            Assignee: churro morales
>         Attachments: HADOOP-13578.patch, HADOOP-13578.v1.patch, 
> HADOOP-13578.v2.patch
>
>
> ZStandard: https://github.com/facebook/zstd has been used in production for 6 
> months by facebook now.  v1.0 was recently released.  Create a codec for this 
> library.  


