[ 
https://issues.apache.org/jira/browse/HDFS-14099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17925098#comment-17925098
 ] 

Chris Nauroth commented on HDFS-14099:
--------------------------------------

[~glapark] , as I understand it, you are not seeing the "Unknown frame 
descriptor" error as originally reported. Instead, the read completes 
successfully, but you're not receiving the expected data. Is that right?

We may want to consider moving this into a new bug report with a complete code 
sample showing the problem and a description of how you are generating the test 
data.

> Unknown frame descriptor when decompressing multiple frames in 
> ZStandardDecompressor
> ------------------------------------------------------------------------------------
>
>                 Key: HDFS-14099
>                 URL: https://issues.apache.org/jira/browse/HDFS-14099
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: compress, io
>    Affects Versions: 3.4.0, 3.2.3, 3.3.2
>         Environment: Hadoop Version: hadoop-3.0.3
> Java Version: 1.8.0_144
>            Reporter: ZanderXu
>            Assignee: ZanderXu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.4.0, 3.2.3, 3.3.2
>
>         Attachments: HDFS-14099-trunk-001.patch, HDFS-14099-trunk-002.patch, 
> HDFS-14099-trunk-003.patch
>
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> We need to use the ZSTD compression algorithm in Hadoop. So I write a simple 
> demo like this for testing.
> {code:java}
> // code placeholder
> while ((size = fsDataInputStream.read(bufferV2)) > 0 ) {
>   countSize += size;
>   if (countSize == 65536 * 8) {
>     if(!isFinished) {
>       // finish a frame in zstd
>       cmpOut.finish();
>       isFinished = true;
>     }
>     fsDataOutputStream.flush();
>     fsDataOutputStream.hflush();
>   }
>   if(isFinished) {
>     LOG.info("Will resetState. N=" + n);
>     // reset the stream and write again
>     cmpOut.resetState();
>     isFinished = false;
>   }
>   cmpOut.write(bufferV2, 0, size);
>   bufferV2 = new byte[5 * 1024 * 1024];
>   n++;
> }
> {code}
>  
> And I use "*hadoop fs -text*"  to read this file and failed. The error as 
> blow.
> {code:java}
> Exception in thread "main" java.lang.InternalError: Unknown frame descriptor
> at 
> org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.inflateBytesDirect(Native
>  Method)
> at 
> org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.decompress(ZStandardDecompressor.java:181)
> at 
> org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
> at 
> org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
> at java.io.InputStream.read(InputStream.java:101)
> at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:98)
> at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:66)
> at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:127)
> at org.apache.hadoop.fs.shell.Display$Cat.printToStdout(Display.java:101)
> at org.apache.hadoop.fs.shell.Display$Cat.processPath(Display.java:96)
> at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:331)
> at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:303)
> at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:285)
> at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:269)
> at 
> org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:119)
> at org.apache.hadoop.fs.shell.Command.run(Command.java:176)
> at org.apache.hadoop.fs.FsShell.run(FsShell.java:328)
> at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
> at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
> at org.apache.hadoop.fs.FsShell.main(FsShell.java:391)
> {code}
>  
> So I had to look the code, include jni, then found this bug.
> *ZSTD_initDStream(stream)* method may by called twice in the same *Frame*.
> The first is  in *ZStandardDecompressor.c.* 
> {code:java}
> if (size == 0) {
>     (*env)->SetBooleanField(env, this, ZStandardDecompressor_finished, 
> JNI_TRUE);
>     size_t result = dlsym_ZSTD_initDStream(stream);
>     if (dlsym_ZSTD_isError(result)) {
>         THROW(env, "java/lang/InternalError", 
> dlsym_ZSTD_getErrorName(result));
>         return (jint) 0;
>     }
> }
> {code}
> This call here is correct, but *Finished* no longer be set to false, even if 
> there is some datas (a new frame) in *CompressedBuffer* or *UserBuffer* need 
> to be decompressed.
> The second is in *org.apache.hadoop.io.compress.DecompressorStream* by 
> *decompressor.reset()*, because *Finished* is always true after decompressed 
> a *Frame*.
> {code:java}
> if (decompressor.finished()) {
>   // First see if there was any leftover buffered input from previous
>   // stream; if not, attempt to refill buffer.  If refill -> EOF, we're
>   // all done; else reset, fix up input buffer, and get ready for next
>   // concatenated substream/"member".
>   int nRemaining = decompressor.getRemaining();
>   if (nRemaining == 0) {
>     int m = getCompressedData();
>     if (m == -1) {
>       // apparently the previous end-of-stream was also end-of-file:
>       // return success, as if we had never called getCompressedData()
>       eof = true;
>       return -1;
>     }
>     decompressor.reset();
>     decompressor.setInput(buffer, 0, m);
>     lastBytesSent = m;
>   } else {
>     // looks like it's a concatenated stream:  reset low-level zlib (or
>     // other engine) and buffers, then "resend" remaining input data
>     decompressor.reset();
>     int leftoverOffset = lastBytesSent - nRemaining;
>     assert (leftoverOffset >= 0);
>     // this recopies userBuf -> direct buffer if using native libraries:
>     decompressor.setInput(buffer, leftoverOffset, nRemaining);
>     // NOTE:  this is the one place we do NOT want to save the number
>     // of bytes sent (nRemaining here) into lastBytesSent:  since we
>     // are resending what we've already sent before, offset is nonzero
>     // in general (only way it could be zero is if it already equals
>     // nRemaining), which would then screw up the offset calculation
>     // _next_ time around.  IOW, getRemaining() is in terms of the
>     // original, zero-offset bufferload, so lastBytesSent must be as
>     // well.  Cheesy ASCII art:
>     //
>     //          <------------ m, lastBytesSent ----------->
>     //          +===============================================+
>     // buffer:  |1111111111|22222222222222222|333333333333|     |
>     //          +===============================================+
>     //     #1:  <-- off -->|<-------- nRemaining --------->
>     //     #2:  <----------- off ----------->|<-- nRem. -->
>     //     #3:  (final substream:  nRemaining == 0; eof = true)
>     //
>     // If lastBytesSent is anything other than m, as shown, then "off"
>     // will be calculated incorrectly.
>   }
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to