This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 634d9b014 ORC-1365: [C++] Use BlockBuffer to replace DataBuffer of rawInputBuffer
634d9b014 is described below

commit 634d9b0147f7b0a59bf7afa82fbee5959cada9b9
Author: luffy-zh <zhn...@outlook.com>
AuthorDate: Wed May 8 12:35:28 2024 +0800

    ORC-1365: [C++] Use BlockBuffer to replace DataBuffer of rawInputBuffer
    
    ### What changes were proposed in this pull request?
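    The purpose of this PR is to replace the DataBuffer that holds the raw
    (uncompressed) input of the streaming CompressionStream with a BlockBuffer.
    This way the ORC compressor can start with a small initial size and grow
    automatically as needed. BlockCompressionStream keeps its DataBuffer.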
    
    ### Why are the changes needed?
    This patch uses a BlockBuffer instead of a DataBuffer for the raw input
    buffer of class CompressionStream, in order to resolve
    [ORC-1365](https://issues.apache.org/jira/browse/ORC-1365).
    
    ### How was this patch tested?
    The existing unit tests in TestBufferedOutputStream.cc and
    TestCompression.cc cover this patch.
    
    Closes #1916 from luffy-zh/ORC-1365.
    
    Authored-by: luffy-zh <zhn...@outlook.com>
    Signed-off-by: Gang Wu <ust...@gmail.com>
---
 c++/src/BlockBuffer.hh |   2 +
 c++/src/Compression.cc | 171 +++++++++++++++++++++++++++++++------------------
 2 files changed, 112 insertions(+), 61 deletions(-)

diff --git a/c++/src/BlockBuffer.hh b/c++/src/BlockBuffer.hh
index 2faf38f7f..6d265b0e3 100644
--- a/c++/src/BlockBuffer.hh
+++ b/c++/src/BlockBuffer.hh
@@ -106,12 +106,14 @@ namespace orc {
     }
 
     void resize(uint64_t size);
+
     /**
      * Requests the BlockBuffer to contain at least newCapacity bytes.
      * Reallocation happens if there is need of more space.
      * @param newCapacity new capacity of BlockBuffer
      */
     void reserve(uint64_t newCapacity);
+
     /**
      * Write the BlockBuffer content into OutputStream
      * @param output the output stream to write to
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index 4002276e1..a1595da49 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -55,11 +55,11 @@ namespace orc {
                           uint64_t blockSize, MemoryPool& pool, WriterMetrics* 
metrics);
 
     virtual bool Next(void** data, int* size) override = 0;
-    virtual void BackUp(int count) override;
+    virtual void BackUp(int count) override = 0;
 
     virtual std::string getName() const override = 0;
-    virtual uint64_t flush() override;
-    virtual void suppress() override;
+    virtual uint64_t flush() override = 0;
+    virtual void suppress() override = 0;
 
     virtual bool isCompressed() const override {
       return true;
@@ -78,9 +78,6 @@ namespace orc {
     // ensure enough room for compression block header
     void ensureHeader();
 
-    // Buffer to hold uncompressed data until user calls Next()
-    DataBuffer<unsigned char> rawInputBuffer;
-
     // Compress level
     int level;
 
@@ -105,7 +102,6 @@ namespace orc {
                                                uint64_t capacity, uint64_t 
blockSize,
                                                MemoryPool& pool, 
WriterMetrics* metrics)
       : BufferedOutputStream(pool, outStream, capacity, blockSize, metrics),
-        rawInputBuffer(pool, blockSize),
         level(compressionLevel),
         outputBuffer(nullptr),
         bufferSize(0),
@@ -115,30 +111,6 @@ namespace orc {
     header.fill(nullptr);
   }
 
-  void CompressionStreamBase::BackUp(int count) {
-    if (count > bufferSize) {
-      throw std::logic_error("Can't backup that much!");
-    }
-    bufferSize -= count;
-  }
-
-  uint64_t CompressionStreamBase::flush() {
-    void* data;
-    int size;
-    if (!Next(&data, &size)) {
-      throw std::runtime_error("Failed to flush compression buffer.");
-    }
-    BufferedOutputStream::BackUp(outputSize - outputPosition);
-    bufferSize = outputSize = outputPosition = 0;
-    return BufferedOutputStream::flush();
-  }
-
-  void CompressionStreamBase::suppress() {
-    outputBuffer = nullptr;
-    bufferSize = outputPosition = outputSize = 0;
-    BufferedOutputStream::suppress();
-  }
-
   uint64_t CompressionStreamBase::getSize() const {
     return BufferedOutputStream::getSize() - static_cast<uint64_t>(outputSize 
- outputPosition);
   }
@@ -187,27 +159,62 @@ namespace orc {
 
     virtual bool Next(void** data, int* size) override;
     virtual std::string getName() const override = 0;
+    virtual void BackUp(int count) override;
+    virtual void suppress() override;
+    virtual uint64_t flush() override;
 
    protected:
     // return total compressed size
     virtual uint64_t doStreamingCompression() = 0;
+
+    // Buffer to hold uncompressed data until user calls Next()
+    BlockBuffer rawInputBuffer;
   };
 
+  void CompressionStream::BackUp(int count) {
+    uint64_t backup = static_cast<uint64_t>(count);
+    uint64_t currSize = rawInputBuffer.size();
+    if (backup > currSize) {
+      throw std::logic_error("Can't backup that much!");
+    }
+    rawInputBuffer.resize(currSize - backup);
+  }
+
+  uint64_t CompressionStream::flush() {
+    void* data;
+    int size;
+    if (!Next(&data, &size)) {
+      throw std::runtime_error("Failed to flush compression buffer.");
+    }
+    BufferedOutputStream::BackUp(outputSize - outputPosition);
+    rawInputBuffer.resize(0);
+    outputSize = outputPosition = 0;
+    return BufferedOutputStream::flush();
+  }
+
+  void CompressionStream::suppress() {
+    outputBuffer = nullptr;
+    outputPosition = outputSize = 0;
+    rawInputBuffer.resize(0);
+    BufferedOutputStream::suppress();
+  }
+
   CompressionStream::CompressionStream(OutputStream* outStream, int 
compressionLevel,
                                        uint64_t capacity, uint64_t blockSize, 
MemoryPool& pool,
                                        WriterMetrics* metrics)
-      : CompressionStreamBase(outStream, compressionLevel, capacity, 
blockSize, pool, metrics) {
+      : CompressionStreamBase(outStream, compressionLevel, capacity, 
blockSize, pool, metrics),
+        rawInputBuffer(pool, blockSize) {
     // PASS
   }
 
   bool CompressionStream::Next(void** data, int* size) {
-    if (bufferSize != 0) {
+    if (rawInputBuffer.size() != 0) {
       ensureHeader();
 
       uint64_t preSize = getSize();
       uint64_t totalCompressedSize = doStreamingCompression();
-      if (totalCompressedSize >= static_cast<unsigned long>(bufferSize)) {
-        writeHeader(static_cast<size_t>(bufferSize), true);
+      if (totalCompressedSize >= static_cast<unsigned 
long>(rawInputBuffer.size())) {
+        writeHeader(static_cast<size_t>(rawInputBuffer.size()), true);
         // reset output buffer
         outputBuffer = nullptr;
         outputPosition = outputSize = 0;
@@ -215,16 +222,20 @@ namespace orc {
         BufferedOutputStream::BackUp(static_cast<int>(backup));
 
         // copy raw input buffer into block buffer
-        writeData(rawInputBuffer.data(), bufferSize);
+        uint64_t blockNumber = rawInputBuffer.getBlockNumber();
+        for (uint64_t i = 0; i < blockNumber; ++i) {
+          auto block = rawInputBuffer.getBlock(i);
+          writeData(reinterpret_cast<const unsigned char*>(block.data), 
block.size);
+        }
       } else {
         writeHeader(totalCompressedSize, false);
       }
+      rawInputBuffer.resize(0);
     }
 
-    *data = rawInputBuffer.data();
-    *size = static_cast<int>(rawInputBuffer.size());
-    bufferSize = *size;
-
+    auto block = rawInputBuffer.getNextBlock();
+    *data = block.data;
+    *size = static_cast<int>(block.size);
     return true;
   }
 
@@ -260,31 +271,43 @@ namespace orc {
       throw std::runtime_error("Failed to reset inflate.");
     }
 
-    strm_.avail_in = static_cast<unsigned int>(bufferSize);
-    strm_.next_in = rawInputBuffer.data();
+    // iterate through all blocks
+    uint64_t blockId = 0;
+    bool finish = false;
 
     do {
-      if (outputPosition >= outputSize) {
-        if 
(!BufferedOutputStream::Next(reinterpret_cast<void**>(&outputBuffer), 
&outputSize)) {
-          throw std::runtime_error("Failed to get next output buffer from 
output stream.");
-        }
-        outputPosition = 0;
+      if (blockId == rawInputBuffer.getBlockNumber()) {
+        finish = true;
+        strm_.avail_in = 0;
+        strm_.next_in = nullptr;
+      } else {
+        auto block = rawInputBuffer.getBlock(blockId++);
+        strm_.avail_in = static_cast<unsigned int>(block.size);
+        strm_.next_in = reinterpret_cast<unsigned char*>(block.data);
       }
-      strm_.next_out = reinterpret_cast<unsigned char*>(outputBuffer + 
outputPosition);
-      strm_.avail_out = static_cast<unsigned int>(outputSize - outputPosition);
 
-      int ret = deflate(&strm_, Z_FINISH);
-      outputPosition = outputSize - static_cast<int>(strm_.avail_out);
+      do {
+        if (outputPosition >= outputSize) {
+          if 
(!BufferedOutputStream::Next(reinterpret_cast<void**>(&outputBuffer), 
&outputSize)) {
+            throw std::runtime_error("Failed to get next output buffer from 
output stream.");
+          }
+          outputPosition = 0;
+        }
+        strm_.next_out = reinterpret_cast<unsigned char*>(outputBuffer + 
outputPosition);
+        strm_.avail_out = static_cast<unsigned int>(outputSize - 
outputPosition);
 
-      if (ret == Z_STREAM_END) {
-        break;
-      } else if (ret == Z_OK) {
-        // needs more buffer so will continue the loop
-      } else {
-        throw std::runtime_error("Failed to deflate input data.");
-      }
-    } while (strm_.avail_out == 0);
+        int ret = deflate(&strm_, finish ? Z_FINISH : Z_NO_FLUSH);
+        outputPosition = outputSize - static_cast<int>(strm_.avail_out);
 
+        if (ret == Z_STREAM_END) {
+          break;
+        } else if (ret == Z_OK) {
+          // needs more buffer so will continue the loop
+        } else {
+          throw std::runtime_error("Failed to deflate input data.");
+        }
+      } while (strm_.avail_out == 0);
+    } while (!finish);
     return strm_.total_out;
   }
 
@@ -882,12 +905,15 @@ namespace orc {
     BlockCompressionStream(OutputStream* outStream, int compressionLevel, 
uint64_t capacity,
                            uint64_t blockSize, MemoryPool& pool, 
WriterMetrics* metrics)
         : CompressionStreamBase(outStream, compressionLevel, capacity, 
blockSize, pool, metrics),
-          compressorBuffer(pool) {
+          compressorBuffer(pool),
+          rawInputBuffer(pool, blockSize) {
       // PASS
     }
 
     virtual bool Next(void** data, int* size) override;
     virtual void suppress() override;
+    virtual void BackUp(int count) override;
+    virtual uint64_t flush() override;
     virtual std::string getName() const override = 0;
 
    protected:
@@ -900,8 +926,29 @@ namespace orc {
 
     // should allocate max possible compressed size
     DataBuffer<unsigned char> compressorBuffer;
+
+    // Buffer to hold uncompressed data until user calls Next()
+    DataBuffer<unsigned char> rawInputBuffer;
   };
 
+  void BlockCompressionStream::BackUp(int count) {
+    if (count > bufferSize) {
+      throw std::logic_error("Can't backup that much!");
+    }
+    bufferSize -= count;
+  }
+
+  uint64_t BlockCompressionStream::flush() {
+    void* data;
+    int size;
+    if (!Next(&data, &size)) {
+      throw std::runtime_error("Failed to flush compression buffer.");
+    }
+    BufferedOutputStream::BackUp(outputSize - outputPosition);
+    bufferSize = outputSize = outputPosition = 0;
+    return BufferedOutputStream::flush();
+  }
+
   bool BlockCompressionStream::Next(void** data, int* size) {
     if (bufferSize != 0) {
       ensureHeader();
@@ -935,7 +982,9 @@ namespace orc {
 
   void BlockCompressionStream::suppress() {
     compressorBuffer.resize(0);
-    CompressionStreamBase::suppress();
+    outputBuffer = nullptr;
+    bufferSize = outputPosition = outputSize = 0;
+    BufferedOutputStream::suppress();
   }
 
   /**

Reply via email to