arouel opened a new issue, #3478:
URL: https://github.com/apache/parquet-java/issues/3478

   ### Describe the bug, including details regarding any error messages, version, and platform.
   
   When reading `LZ4_RAW`-compressed data through the heap codec path, decompression can fail if the decompressed page is larger than the chunk size used by stream materialization (typically ~8 KiB via `Channels.newChannel(...)`).
   This appears in paths that materialize `BytesInput` lazily (for example via `BytesInput.copy(...)` / `toByteBuffer(...)`, including dictionary-filter related reads).
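   
   For context, here is a standalone JDK-only sketch (not Parquet code) of where the ~8 KiB chunk size comes from: `Channels.newChannel(InputStream)` copies through an internal 8 KiB transfer buffer, so every read it issues against the underlying stream (including a decompressor stream) is capped at 8192 bytes:
   
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.nio.ByteBuffer;
   import java.nio.channels.Channels;
   import java.nio.channels.ReadableByteChannel;
   
   public class ChunkedChannelRead {
     public static void main(String[] args) throws IOException {
       byte[] data = new byte[16 * 1024];
       // Log the length of each read() issued against the underlying stream.
       InputStream counting = new ByteArrayInputStream(data) {
         @Override
         public int read(byte[] b, int off, int len) {
           System.out.println("underlying stream read, len=" + len);
           return super.read(b, off, len);
         }
       };
       ReadableByteChannel channel = Channels.newChannel(counting);
       ByteBuffer target = ByteBuffer.allocate(data.length);
       // Prints len=8192 twice: the channel never requests more than its
       // internal 8 KiB transfer buffer per call, regardless of target size.
       while (target.hasRemaining() && channel.read(target) > 0) {
         // keep filling the 16 KiB target
       }
     }
   }
   ```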
   
   ### Error
   
   ```
   io.airlift.compress.MalformedInputException: all input must be consumed: offset=2532
        at io.airlift.compress.lz4.Lz4RawDecompressor.decompress(Lz4RawDecompressor.java:89)
        at io.airlift.compress.lz4.Lz4Decompressor.decompress(Lz4Decompressor.java:98)
        at org.apache.parquet.hadoop.codec.Lz4RawDecompressor.uncompress(Lz4RawDecompressor.java:39)
        at org.apache.parquet.hadoop.codec.NonBlockedDecompressor.decompress(NonBlockedDecompressor.java:81)
        at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
        at java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:318)
        at org.apache.parquet.bytes.BytesInput$StreamBytesInput.writeInto(BytesInput.java:384)
        at org.apache.parquet.bytes.BytesInput.copy(BytesInput.java:270)
        at org.apache.parquet.bytes.BytesInput.copy(BytesInput.java:280)
        at org.apache.parquet.hadoop.DictionaryPageReader.reusableCopy(DictionaryPageReader.java:113)
        at org.apache.parquet.hadoop.DictionaryPageReader.lambda$readDictionaryPage$0(DictionaryPageReader.java:104)
        at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1735)
        at org.apache.parquet.hadoop.DictionaryPageReader.readDictionaryPage(DictionaryPageReader.java:97)
        at org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.expandDictionary(DictionaryFilter.java:93)
        at org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.visit(DictionaryFilter.java:160)
        at org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.visit(DictionaryFilter.java:62)
        at org.apache.parquet.filter2.predicate.Operators$Eq.accept(Operators.java:189)
        at org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.canDrop(DictionaryFilter.java:72)
        at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:107)
        at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:43)
        at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:157)
        at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:71)
        at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:1090)
   ```
   
   ### Reproducer
   
   ```java
     // Assumes JUnit 4 and a static import of CompressionCodecName.LZ4_RAW.
     @Test
     public void lz4RawHeapDecompressorCanCopyLargePage() throws IOException {
       // Use a page larger than the ~8 KiB chunk size of Channels.newChannel(...).
       final int size = 16 * 1024;
       final byte[] raw = new byte[size];
       new Random(42).nextBytes(raw);
   
       try (TrackingByteBufferAllocator allocator = TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator());
           ByteBufferReleaser releaser = new ByteBufferReleaser(allocator)) {
         CodecFactory heapCodecFactory = new CodecFactory(new Configuration(), size);
         BytesInputCompressor compressor = heapCodecFactory.getCompressor(LZ4_RAW);
         BytesInputDecompressor decompressor = heapCodecFactory.getDecompressor(LZ4_RAW);
   
         BytesInput compressed = compressor.compress(BytesInput.from(raw));
         BytesInput decompressed = decompressor.decompress(compressed, size);
   
         // Regression coverage: previously this copy path hit StreamBytesInput.writeInto(...),
         // which reads via Channels.newChannel(...) in 8 KiB chunks and failed for LZ4_RAW.
         BytesInput copied = decompressed.copy(releaser);
         Assert.assertArrayEquals(raw, copied.toByteArray());
   
         compressor.release();
         decompressor.release();
         heapCodecFactory.release();
       }
     }
   ```
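   
   The test is a sketch against the parquet-java test scaffolding: it assumes JUnit 4 (`@Test`, `Assert`), Hadoop's `Configuration`, and the `TrackingByteBufferAllocator`/`ByteBufferReleaser` helpers from `org.apache.parquet.bytes`. With the decompressor fixed, `decompressed.copy(releaser)` should round-trip the full 16 KiB page.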
   
   ### Suspected root cause
   - `Lz4RawDecompressor.maxUncompressedLength(...)` returns the caller-provided `len` (the requested read size), not the true uncompressed page size.
   - `NonBlockedDecompressor.decompress(...)` uses that estimate to size the output buffer and performs a one-shot decompression.
   - In chunked stream reads, the first call may request only ~8 KiB even when the page is larger, leaving the decompressor in an inconsistent state and producing the subsequent zero-byte read error (see the sketch below).
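   
   A minimal standalone sketch of the one-shot constraint (assuming airlift's aircompressor on the classpath; this paraphrases the failure mode, not parquet-java's exact code): airlift's LZ4 decompressor must consume all compressed input in a single call, so an output buffer sized from an 8 KiB read request cannot partially decompress a 16 KiB page and instead throws `MalformedInputException`:
   
   ```java
   import io.airlift.compress.lz4.Lz4Compressor;
   import io.airlift.compress.lz4.Lz4Decompressor;
   import java.util.Random;
   
   public class Lz4OneShotSketch {
     public static void main(String[] args) {
       // A 16 KiB "page", mirroring the reproducer above.
       byte[] raw = new byte[16 * 1024];
       new Random(42).nextBytes(raw);
   
       Lz4Compressor compressor = new Lz4Compressor();
       byte[] compressed = new byte[compressor.maxCompressedLength(raw.length)];
       int compressedLen = compressor.compress(raw, 0, raw.length, compressed, 0, compressed.length);
   
       // An output buffer sized like the first ~8 KiB chunked read request.
       byte[] tooSmall = new byte[8 * 1024];
       // One-shot API: there is no way to decompress "the first 8 KiB" and
       // resume later, so this throws io.airlift.compress.MalformedInputException
       // instead of returning a partial result.
       new Lz4Decompressor().decompress(compressed, 0, compressedLen, tooSmall, 0, tooSmall.length);
     }
   }
   ```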
   
   ### Impact
   - Correctness bug for `LZ4_RAW` in the heap decompression path: affected data cannot be read.
   - It breaks real reads (not just tests), especially when decompressed pages exceed the ~8 KiB stream chunk size.
   
   
   ### Component(s)
   
   Core

