This is an automated email from the ASF dual-hosted git repository. gangwu pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push: new 634d9b014 ORC-1365: [C++] Use BlockBuffer to replace dataBuffer of rawInputBuffer 634d9b014 is described below commit 634d9b0147f7b0a59bf7afa82fbee5959cada9b9 Author: luffy-zh <zhn...@outlook.com> AuthorDate: Wed May 8 12:35:28 2024 +0800 ORC-1365: [C++] Use BlockBuffer to replace dataBuffer of rawInputBuffer ### What changes were proposed in this pull request? The purpose of this PR is to replace the DataBuffer that backs rawInputBuffer in CompressionStream with a BlockBuffer. This way, the ORC compressor can start with a small initial buffer and grow it automatically as needed. ### Why are the changes needed? This patch uses BlockBuffer to replace the DataBuffer of class CompressionStream to resolve [ORC-1365](https://issues.apache.org/jira/browse/ORC-1365). ### How was this patch tested? The existing unit tests in TestBufferedOutputStream.cc and TestCompression.cc cover this patch. Closes #1916 from luffy-zh/ORC-1365. Authored-by: luffy-zh <zhn...@outlook.com> Signed-off-by: Gang Wu <ust...@gmail.com> --- c++/src/BlockBuffer.hh | 2 + c++/src/Compression.cc | 171 +++++++++++++++++++++++++++++++------------------ 2 files changed, 112 insertions(+), 61 deletions(-) diff --git a/c++/src/BlockBuffer.hh b/c++/src/BlockBuffer.hh index 2faf38f7f..6d265b0e3 100644 --- a/c++/src/BlockBuffer.hh +++ b/c++/src/BlockBuffer.hh @@ -106,12 +106,14 @@ namespace orc { } void resize(uint64_t size); + /** * Requests the BlockBuffer to contain at least newCapacity bytes. * Reallocation happens if there is need of more space.
* @param newCapacity new capacity of BlockBuffer */ void reserve(uint64_t newCapacity); + /** * Write the BlockBuffer content into OutputStream * @param output the output stream to write to diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc index 4002276e1..a1595da49 100644 --- a/c++/src/Compression.cc +++ b/c++/src/Compression.cc @@ -55,11 +55,11 @@ namespace orc { uint64_t blockSize, MemoryPool& pool, WriterMetrics* metrics); virtual bool Next(void** data, int* size) override = 0; - virtual void BackUp(int count) override; + virtual void BackUp(int count) override = 0; virtual std::string getName() const override = 0; - virtual uint64_t flush() override; - virtual void suppress() override; + virtual uint64_t flush() override = 0; + virtual void suppress() override = 0; virtual bool isCompressed() const override { return true; @@ -78,9 +78,6 @@ namespace orc { // ensure enough room for compression block header void ensureHeader(); - // Buffer to hold uncompressed data until user calls Next() - DataBuffer<unsigned char> rawInputBuffer; - // Compress level int level; @@ -105,7 +102,6 @@ namespace orc { uint64_t capacity, uint64_t blockSize, MemoryPool& pool, WriterMetrics* metrics) : BufferedOutputStream(pool, outStream, capacity, blockSize, metrics), - rawInputBuffer(pool, blockSize), level(compressionLevel), outputBuffer(nullptr), bufferSize(0), @@ -115,30 +111,6 @@ namespace orc { header.fill(nullptr); } - void CompressionStreamBase::BackUp(int count) { - if (count > bufferSize) { - throw std::logic_error("Can't backup that much!"); - } - bufferSize -= count; - } - - uint64_t CompressionStreamBase::flush() { - void* data; - int size; - if (!Next(&data, &size)) { - throw std::runtime_error("Failed to flush compression buffer."); - } - BufferedOutputStream::BackUp(outputSize - outputPosition); - bufferSize = outputSize = outputPosition = 0; - return BufferedOutputStream::flush(); - } - - void CompressionStreamBase::suppress() { - outputBuffer = nullptr; - 
bufferSize = outputPosition = outputSize = 0; - BufferedOutputStream::suppress(); - } - uint64_t CompressionStreamBase::getSize() const { return BufferedOutputStream::getSize() - static_cast<uint64_t>(outputSize - outputPosition); } @@ -187,27 +159,62 @@ namespace orc { virtual bool Next(void** data, int* size) override; virtual std::string getName() const override = 0; + virtual void BackUp(int count) override; + virtual void suppress() override; + virtual uint64_t flush() override; protected: // return total compressed size virtual uint64_t doStreamingCompression() = 0; + + // Buffer to hold uncompressed data until user calls Next() + BlockBuffer rawInputBuffer; }; + void CompressionStream::BackUp(int count) { + uint64_t backup = static_cast<uint64_t>(count); + uint64_t currSize = rawInputBuffer.size(); + if (backup > currSize) { + throw std::logic_error("Can't backup that much!"); + } + rawInputBuffer.resize(currSize - backup); + } + + uint64_t CompressionStream::flush() { + void* data; + int size; + if (!Next(&data, &size)) { + throw std::runtime_error("Failed to flush compression buffer."); + } + BufferedOutputStream::BackUp(outputSize - outputPosition); + rawInputBuffer.resize(0); + outputSize = outputPosition = 0; + return BufferedOutputStream::flush(); + } + + void CompressionStream::suppress() { + outputBuffer = nullptr; + outputPosition = outputSize = 0; + rawInputBuffer.resize(0); + BufferedOutputStream::suppress(); + } + CompressionStream::CompressionStream(OutputStream* outStream, int compressionLevel, uint64_t capacity, uint64_t blockSize, MemoryPool& pool, WriterMetrics* metrics) - : CompressionStreamBase(outStream, compressionLevel, capacity, blockSize, pool, metrics) { + : CompressionStreamBase(outStream, compressionLevel, capacity, blockSize, pool, metrics), + rawInputBuffer(pool, blockSize) { // PASS } bool CompressionStream::Next(void** data, int* size) { - if (bufferSize != 0) { + if (rawInputBuffer.size() != 0) { ensureHeader(); uint64_t 
preSize = getSize(); uint64_t totalCompressedSize = doStreamingCompression(); - if (totalCompressedSize >= static_cast<unsigned long>(bufferSize)) { - writeHeader(static_cast<size_t>(bufferSize), true); + if (totalCompressedSize >= static_cast<unsigned long>(rawInputBuffer.size())) { + writeHeader(static_cast<size_t>(rawInputBuffer.size()), true); // reset output buffer outputBuffer = nullptr; outputPosition = outputSize = 0; @@ -215,16 +222,20 @@ namespace orc { BufferedOutputStream::BackUp(static_cast<int>(backup)); // copy raw input buffer into block buffer - writeData(rawInputBuffer.data(), bufferSize); + uint64_t blockNumber = rawInputBuffer.getBlockNumber(); + for (uint64_t i = 0; i < blockNumber; ++i) { + auto block = rawInputBuffer.getBlock(i); + writeData(reinterpret_cast<const unsigned char*>(block.data), block.size); + } } else { writeHeader(totalCompressedSize, false); } + rawInputBuffer.resize(0); } - *data = rawInputBuffer.data(); - *size = static_cast<int>(rawInputBuffer.size()); - bufferSize = *size; - + auto block = rawInputBuffer.getNextBlock(); + *data = block.data; + *size = static_cast<int>(block.size); return true; } @@ -260,31 +271,43 @@ namespace orc { throw std::runtime_error("Failed to reset inflate."); } - strm_.avail_in = static_cast<unsigned int>(bufferSize); - strm_.next_in = rawInputBuffer.data(); + // iterate through all blocks + uint64_t blockId = 0; + bool finish = false; do { - if (outputPosition >= outputSize) { - if (!BufferedOutputStream::Next(reinterpret_cast<void**>(&outputBuffer), &outputSize)) { - throw std::runtime_error("Failed to get next output buffer from output stream."); - } - outputPosition = 0; + if (blockId == rawInputBuffer.getBlockNumber()) { + finish = true; + strm_.avail_in = 0; + strm_.next_in = nullptr; + } else { + auto block = rawInputBuffer.getBlock(blockId++); + strm_.avail_in = static_cast<unsigned int>(block.size); + strm_.next_in = reinterpret_cast<unsigned char*>(block.data); } - strm_.next_out = 
reinterpret_cast<unsigned char*>(outputBuffer + outputPosition); - strm_.avail_out = static_cast<unsigned int>(outputSize - outputPosition); - int ret = deflate(&strm_, Z_FINISH); - outputPosition = outputSize - static_cast<int>(strm_.avail_out); + do { + if (outputPosition >= outputSize) { + if (!BufferedOutputStream::Next(reinterpret_cast<void**>(&outputBuffer), &outputSize)) { + throw std::runtime_error("Failed to get next output buffer from output stream."); + } + outputPosition = 0; + } + strm_.next_out = reinterpret_cast<unsigned char*>(outputBuffer + outputPosition); + strm_.avail_out = static_cast<unsigned int>(outputSize - outputPosition); - if (ret == Z_STREAM_END) { - break; - } else if (ret == Z_OK) { - // needs more buffer so will continue the loop - } else { - throw std::runtime_error("Failed to deflate input data."); - } - } while (strm_.avail_out == 0); + int ret = deflate(&strm_, finish ? Z_FINISH : Z_NO_FLUSH); + outputPosition = outputSize - static_cast<int>(strm_.avail_out); + if (ret == Z_STREAM_END) { + break; + } else if (ret == Z_OK) { + // needs more buffer so will continue the loop + } else { + throw std::runtime_error("Failed to deflate input data."); + } + } while (strm_.avail_out == 0); + } while (!finish); return strm_.total_out; } @@ -882,12 +905,15 @@ namespace orc { BlockCompressionStream(OutputStream* outStream, int compressionLevel, uint64_t capacity, uint64_t blockSize, MemoryPool& pool, WriterMetrics* metrics) : CompressionStreamBase(outStream, compressionLevel, capacity, blockSize, pool, metrics), - compressorBuffer(pool) { + compressorBuffer(pool), + rawInputBuffer(pool, blockSize) { // PASS } virtual bool Next(void** data, int* size) override; virtual void suppress() override; + virtual void BackUp(int count) override; + virtual uint64_t flush() override; virtual std::string getName() const override = 0; protected: @@ -900,8 +926,29 @@ namespace orc { // should allocate max possible compressed size DataBuffer<unsigned char> 
compressorBuffer; + + // Buffer to hold uncompressed data until user calls Next() + DataBuffer<unsigned char> rawInputBuffer; }; + void BlockCompressionStream::BackUp(int count) { + if (count > bufferSize) { + throw std::logic_error("Can't backup that much!"); + } + bufferSize -= count; + } + + uint64_t BlockCompressionStream::flush() { + void* data; + int size; + if (!Next(&data, &size)) { + throw std::runtime_error("Failed to flush compression buffer."); + } + BufferedOutputStream::BackUp(outputSize - outputPosition); + bufferSize = outputSize = outputPosition = 0; + return BufferedOutputStream::flush(); + } + bool BlockCompressionStream::Next(void** data, int* size) { if (bufferSize != 0) { ensureHeader(); @@ -935,7 +982,9 @@ namespace orc { void BlockCompressionStream::suppress() { compressorBuffer.resize(0); - CompressionStreamBase::suppress(); + outputBuffer = nullptr; + bufferSize = outputPosition = outputSize = 0; + BufferedOutputStream::suppress(); } /**