[ https://issues.apache.org/jira/browse/PARQUET-2196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17610210#comment-17610210 ]
ASF GitHub Bot commented on PARQUET-2196: ----------------------------------------- shangxinli commented on code in PR #1000: URL: https://github.com/apache/parquet-mr/pull/1000#discussion_r981649538 ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop.codec; + +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.parquet.Preconditions; + +import java.io.IOException; +import java.nio.ByteBuffer; + +abstract public class NonBlockedDecompressor implements Decompressor { + + // Buffer for uncompressed output. This buffer grows as necessary. + private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0); + + // Buffer for compressed input. This buffer grows as necessary. + private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0); + + private boolean finished; + + /** + * Fills specified buffer with uncompressed data. Returns actual number + * of bytes of uncompressed data. A return value of 0 indicates that + * {@link #needsInput()} should be called in order to determine if more + * input data is required. 
+ * + * @param buffer Buffer for the compressed data + * @param off Start offset of the data + * @param len Size of the buffer + * @return The actual number of bytes of uncompressed data. + * @throws IOException if reading or decompression fails + */ + @Override + public synchronized int decompress(byte[] buffer, int off, int len) throws IOException { + SnappyUtil.validateBuffer(buffer, off, len); + if (inputBuffer.position() == 0 && !outputBuffer.hasRemaining()) { + return 0; + } + + if (!outputBuffer.hasRemaining()) { + inputBuffer.rewind(); + Preconditions.checkArgument(inputBuffer.position() == 0, "Invalid position of 0."); + Preconditions.checkArgument(outputBuffer.position() == 0, "Invalid position of 0."); + // There is compressed input, decompress it now. + int decompressedSize = uncompressedLength(inputBuffer, len); + if (decompressedSize > outputBuffer.capacity()) { + ByteBuffer oldBuffer = outputBuffer; + outputBuffer = ByteBuffer.allocateDirect(decompressedSize); + CleanUtil.cleanDirectBuffer(oldBuffer); + } + + // Reset the previous outputBuffer (i.e. set position to 0) + outputBuffer.clear(); + int size = uncompress(inputBuffer, outputBuffer); + outputBuffer.limit(size); + // We've decompressed the entire input, reset the input now + inputBuffer.clear(); + inputBuffer.limit(0); + finished = true; + } + + // Return compressed output up to 'len' + int numBytes = Math.min(len, outputBuffer.remaining()); + outputBuffer.get(buffer, off, numBytes); + return numBytes; + } + + /** + * Sets input data for decompression. + * This should be called if and only if {@link #needsInput()} returns + * <code>true</code> indicating that more input data is required. + * (Both native and non-native versions of various Decompressors require + * that the data passed in via <code>b[]</code> remain unmodified until + * the caller is explicitly notified--via {@link #needsInput()}--that the + * buffer may be safely modified. 
With this requirement, an extra + * buffer-copy can be avoided.) + * + * @param buffer Input data + * @param off Start offset + * @param len Length + */ + @Override + public synchronized void setInput(byte[] buffer, int off, int len) { + SnappyUtil.validateBuffer(buffer, off, len); + + if (inputBuffer.capacity() - inputBuffer.position() < len) { + final ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len); + inputBuffer.rewind(); + newBuffer.put(inputBuffer); + final ByteBuffer oldBuffer = inputBuffer; + inputBuffer = newBuffer; + CleanUtil.cleanDirectBuffer(oldBuffer); + } else { + inputBuffer.limit(inputBuffer.position() + len); + } + inputBuffer.put(buffer, off, len); + } + + @Override + public void end() { + CleanUtil.cleanDirectBuffer(inputBuffer); + CleanUtil.cleanDirectBuffer(outputBuffer); + } + + @Override + public synchronized boolean finished() { + return finished && !outputBuffer.hasRemaining(); + } + + @Override + public int getRemaining() { + return 0; + } + + @Override + public synchronized boolean needsInput() { + return !inputBuffer.hasRemaining() && !outputBuffer.hasRemaining(); + } + + @Override + public synchronized void reset() { + finished = false; + inputBuffer.rewind(); + outputBuffer.rewind(); + inputBuffer.limit(0); + outputBuffer.limit(0); + } + + @Override + public boolean needsDictionary() { + return false; + } + + @Override + public void setDictionary(byte[] b, int off, int len) { + // No-op + } + + /** + * Uncompress the content in the input buffer. The result is dumped to the + * specified output buffer. + * + * @param compressed buffer[pos() ... limit()) containing the input data + * @param uncompressed output of the the uncompressed data. It uses buffer[pos()..] + * @return uncompressed data size + */ + abstract protected int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException; + + /** + * Get the uncompressed byte size of the given compressed input. 
This operation takes O(1) time. + * + * @param compressed input data [pos() ... limit()) + * @param maxUncompressedLength maximum length of the uncompressed data + * @return uncompressed byte length of the given input + */ + abstract protected int uncompressedLength(ByteBuffer compressed, int maxUncompressedLength) throws IOException; + +} //class NonBlockedDecompressor Review Comment: We generally don't need this ending comment > Support LZ4_RAW codec > --------------------- > > Key: PARQUET-2196 > URL: https://issues.apache.org/jira/browse/PARQUET-2196 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr > Reporter: Gang Wu > Priority: Major > > There is a long history of LZ4 interoperability problems with Parquet files > between parquet-mr and parquet-cpp (which is now part of Apache Arrow). > The attached links document this history. In short, a new LZ4_RAW codec type was > introduced in parquet-format v2.9.0. However, only parquet-cpp supports > LZ4_RAW. The parquet-mr library still uses the old Hadoop-provided LZ4 codec > and cannot read parquet files with LZ4_RAW. -- This message was sent by Atlassian Jira (v8.20.10#820010)