This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 65f0316 ARROW-3409: [C++] Streaming compression and decompression
interfaces
65f0316 is described below
commit 65f03168f20c9429768f64b761209365a99cbada
Author: Antoine Pitrou <[email protected]>
AuthorDate: Mon Oct 15 14:16:04 2018 +0200
ARROW-3409: [C++] Streaming compression and decompression interfaces
Author: Antoine Pitrou <[email protected]>
Closes #2696 from pitrou/ARROW-3409-streaming-compression and squashes the
following commits:
891bfaf4 <Antoine Pitrou> Fix conversion warning
a4ace53c <Antoine Pitrou> Address review comments
ce904081 <Antoine Pitrou> ARROW-3409: Streaming compression and
decompression interfaces
---
cpp/src/arrow/symbols.map | 1 +
cpp/src/arrow/util/compression-test.cc | 337 +++++++++++++++++++++++++++++--
cpp/src/arrow/util/compression.cc | 4 +
cpp/src/arrow/util/compression.h | 82 +++++++-
cpp/src/arrow/util/compression_brotli.cc | 174 +++++++++++++++-
cpp/src/arrow/util/compression_brotli.h | 5 +
cpp/src/arrow/util/compression_lz4.cc | 222 +++++++++++++++++++-
cpp/src/arrow/util/compression_lz4.h | 5 +
cpp/src/arrow/util/compression_snappy.cc | 8 +
cpp/src/arrow/util/compression_snappy.h | 5 +
cpp/src/arrow/util/compression_zlib.cc | 296 ++++++++++++++++++++++++++-
cpp/src/arrow/util/compression_zlib.h | 4 +
cpp/src/arrow/util/compression_zstd.cc | 178 +++++++++++++++-
cpp/src/arrow/util/compression_zstd.h | 5 +
14 files changed, 1293 insertions(+), 33 deletions(-)
diff --git a/cpp/src/arrow/symbols.map b/cpp/src/arrow/symbols.map
index 96faf59..18802ff 100644
--- a/cpp/src/arrow/symbols.map
+++ b/cpp/src/arrow/symbols.map
@@ -49,6 +49,7 @@
_tr_*;
# lz4
LZ4_*;
+ LZ4F_*;
# zstandard
ZSTD_*;
ZSTDv*;
diff --git a/cpp/src/arrow/util/compression-test.cc
b/cpp/src/arrow/util/compression-test.cc
index 11f99e3..10447c6 100644
--- a/cpp/src/arrow/util/compression-test.cc
+++ b/cpp/src/arrow/util/compression-test.cc
@@ -15,8 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+#include <cmath>
#include <cstdint>
+#include <cstring>
#include <memory>
+#include <random>
#include <string>
#include <vector>
@@ -31,13 +34,33 @@ using std::vector;
namespace arrow {
namespace util {
-template <Compression::type CODEC>
-void CheckCodecRoundtrip(const vector<uint8_t>& data) {
+// Return a vector of `data_size` bytes filled in by random_bytes().
+// NOTE(review): 1234 is presumably a fixed RNG seed, which would make the
+// generated data reproducible between runs -- confirm against the test utils.
+vector<uint8_t> MakeRandomData(int data_size) {
+  vector<uint8_t> bytes(data_size);
+  random_bytes(data_size, 1234, bytes.data());
+  return bytes;
+}
+
+// Return exactly `data_size` bytes of highly repetitive (hence compressible)
+// data: a fixed sentence is repeated often enough to cover the requested
+// size, and the result is truncated to `data_size`.
+vector<uint8_t> MakeCompressibleData(int data_size) {
+  std::string base_data =
+      "Apache Arrow is a cross-language development platform for in-memory data";
+  // One extra repetition guarantees the buffer is at least data_size long.
+  int nrepeats = static_cast<int>(1 + data_size / base_data.size());
+
+  vector<uint8_t> data(base_data.size() * nrepeats);
+  for (int i = 0; i < nrepeats; ++i) {
+    std::memcpy(data.data() + i * base_data.size(), base_data.data(), base_data.size());
+  }
+  data.resize(data_size);
+  return data;
+}
+
+// Check roundtrip of one-shot compression and decompression functions.
+
+void CheckCodecRoundtrip(Compression::type ctype, const vector<uint8_t>& data)
{
// create multiple compressors to try to break them
std::unique_ptr<Codec> c1, c2;
- ASSERT_OK(Codec::Create(CODEC, &c1));
- ASSERT_OK(Codec::Create(CODEC, &c2));
+ ASSERT_OK(Codec::Create(ctype, &c1));
+ ASSERT_OK(Codec::Create(ctype, &c2));
int max_compressed_len =
static_cast<int>(c1->MaxCompressedLen(data.size(), data.data()));
@@ -69,25 +92,311 @@ void CheckCodecRoundtrip(const vector<uint8_t>& data) {
ASSERT_EQ(data, decompressed);
}
-template <Compression::type CODEC>
-void CheckCodec() {
+// Check the streaming compressor against one-shot decompression
+//
+// The input is fed to Compressor::Compress() in chunks of at most 1111 bytes
+// and Flush() is called on every other iteration.  The output vector starts
+// at 10 bytes and is doubled whenever the compressor asks for more room.
+// Once End() has completed, the stream is decompressed with the one-shot
+// Codec::Decompress() and compared against the original data.
+
+void CheckStreamingCompressor(Codec* codec, const vector<uint8_t>& data) {
+  std::shared_ptr<Compressor> compressor;
+  ASSERT_OK(codec->MakeCompressor(&compressor));
+
+  std::vector<uint8_t> compressed;
+  int64_t compressed_size = 0;
+  const uint8_t* input = data.data();
+  int64_t remaining = data.size();
+
+  compressed.resize(10);
+  bool do_flush = false;
+
+  while (remaining > 0) {
+    // Feed a small amount each time
+    int64_t input_len = std::min(remaining, static_cast<int64_t>(1111));
+    int64_t output_len = compressed.size() - compressed_size;
+    uint8_t* output = compressed.data() + compressed_size;
+    int64_t bytes_read, bytes_written;
+    ASSERT_OK(compressor->Compress(input_len, input, output_len, output, &bytes_read,
+                                   &bytes_written));
+    ASSERT_LE(bytes_read, input_len);
+    ASSERT_LE(bytes_written, output_len);
+    compressed_size += bytes_written;
+    input += bytes_read;
+    remaining -= bytes_read;
+    if (bytes_read == 0) {
+      // Nothing was consumed: the output buffer is too small, so grow it
+      compressed.resize(compressed.capacity() * 2);
+    }
+    // Once every two iterations, do a flush
+    if (do_flush) {
+      bool should_retry = false;
+      do {
+        output_len = compressed.size() - compressed_size;
+        output = compressed.data() + compressed_size;
+        ASSERT_OK(compressor->Flush(output_len, output, &bytes_written, &should_retry));
+        ASSERT_LE(bytes_written, output_len);
+        compressed_size += bytes_written;
+        if (should_retry) {
+          compressed.resize(compressed.capacity() * 2);
+        }
+      } while (should_retry);
+    }
+    do_flush = !do_flush;
+  }
+
+  // End the compressed stream
+  bool should_retry = false;
+  do {
+    int64_t output_len = compressed.size() - compressed_size;
+    uint8_t* output = compressed.data() + compressed_size;
+    int64_t bytes_written;
+    ASSERT_OK(compressor->End(output_len, output, &bytes_written, &should_retry));
+    ASSERT_LE(bytes_written, output_len);
+    compressed_size += bytes_written;
+    if (should_retry) {
+      compressed.resize(compressed.capacity() * 2);
+    }
+  } while (should_retry);
+
+  // Check decompressing the compressed data
+  std::vector<uint8_t> decompressed(data.size());
+  ASSERT_OK(codec->Decompress(compressed_size, compressed.data(), decompressed.size(),
+                              decompressed.data()));
+
+  ASSERT_EQ(data, decompressed);
+}
+
+// Check the streaming decompressor against one-shot compression
+//
+// The data is compressed with the one-shot Codec::Compress(), then fed to
+// Decompressor::Decompress() in chunks of at most 23 bytes.  The output
+// vector starts at 10 bytes and is doubled whenever need_more_output is set.
+
+void CheckStreamingDecompressor(Codec* codec, const vector<uint8_t>& data) {
+  // Create compressed data
+  int64_t max_compressed_len = codec->MaxCompressedLen(data.size(), data.data());
+  std::vector<uint8_t> compressed(max_compressed_len);
+  int64_t compressed_size;
+  ASSERT_OK(codec->Compress(data.size(), data.data(), max_compressed_len,
+                            compressed.data(), &compressed_size));
+  compressed.resize(compressed_size);
+
+  // Run streaming decompression
+  std::shared_ptr<Decompressor> decompressor;
+  ASSERT_OK(codec->MakeDecompressor(&decompressor));
+
+  std::vector<uint8_t> decompressed;
+  int64_t decompressed_size = 0;
+  const uint8_t* input = compressed.data();
+  int64_t remaining = compressed.size();
+
+  decompressed.resize(10);
+  while (!decompressor->IsFinished()) {
+    // Feed a small amount each time
+    int64_t input_len = std::min(remaining, static_cast<int64_t>(23));
+    int64_t output_len = decompressed.size() - decompressed_size;
+    uint8_t* output = decompressed.data() + decompressed_size;
+    int64_t bytes_read, bytes_written;
+    bool need_more_output;
+    ASSERT_OK(decompressor->Decompress(input_len, input, output_len, output, &bytes_read,
+                                       &bytes_written, &need_more_output));
+    ASSERT_LE(bytes_read, input_len);
+    ASSERT_LE(bytes_written, output_len);
+    // Guard against an infinite loop: every call must make some progress
+    ASSERT_TRUE(need_more_output || bytes_written > 0 || bytes_read > 0)
+        << "Decompression not progressing anymore";
+    if (need_more_output) {
+      decompressed.resize(decompressed.capacity() * 2);
+    }
+    decompressed_size += bytes_written;
+    input += bytes_read;
+    remaining -= bytes_read;
+  }
+  ASSERT_TRUE(decompressor->IsFinished());
+  ASSERT_EQ(remaining, 0);
+
+  // Check the decompressed data
+  decompressed.resize(decompressed_size);
+  ASSERT_EQ(data.size(), decompressed_size);
+  ASSERT_EQ(data, decompressed);
+}
+
+// Check the streaming compressor and decompressor together
+//
+// Both directions are driven with input chunks whose size is drawn from
+// [10, 40] by a fixed-seed random engine.  The output vectors start very
+// small (1 and 2 bytes) and are doubled on demand.
+
+void CheckStreamingRoundtrip(Codec* codec, const vector<uint8_t>& data) {
+  std::shared_ptr<Compressor> compressor;
+  std::shared_ptr<Decompressor> decompressor;
+  ASSERT_OK(codec->MakeCompressor(&compressor));
+  ASSERT_OK(codec->MakeDecompressor(&decompressor));
+
+  std::default_random_engine engine(42);
+  std::uniform_int_distribution<int> buf_size_distribution(10, 40);
+
+  auto make_buf_size = [&]() -> int64_t { return buf_size_distribution(engine); };
+
+  // Compress...
+
+  std::vector<uint8_t> compressed(1);
+  int64_t compressed_size = 0;
+  {
+    const uint8_t* input = data.data();
+    int64_t remaining = data.size();
+
+    while (remaining > 0) {
+      // Feed a varying amount each time
+      int64_t input_len = std::min(remaining, make_buf_size());
+      int64_t output_len = compressed.size() - compressed_size;
+      uint8_t* output = compressed.data() + compressed_size;
+      int64_t bytes_read, bytes_written;
+      ASSERT_OK(compressor->Compress(input_len, input, output_len, output, &bytes_read,
+                                     &bytes_written));
+      ASSERT_LE(bytes_read, input_len);
+      ASSERT_LE(bytes_written, output_len);
+      compressed_size += bytes_written;
+      input += bytes_read;
+      remaining -= bytes_read;
+      if (bytes_read == 0) {
+        // Nothing was consumed: the output buffer is too small, so grow it
+        compressed.resize(compressed.capacity() * 2);
+      }
+    }
+    // End the compressed stream
+    bool should_retry = false;
+    do {
+      int64_t output_len = compressed.size() - compressed_size;
+      uint8_t* output = compressed.data() + compressed_size;
+      int64_t bytes_written;
+      ASSERT_OK(compressor->End(output_len, output, &bytes_written, &should_retry));
+      ASSERT_LE(bytes_written, output_len);
+      compressed_size += bytes_written;
+      if (should_retry) {
+        compressed.resize(compressed.capacity() * 2);
+      }
+    } while (should_retry);
+
+    compressed.resize(compressed_size);
+  }
+
+  // Then decompress...
+
+  std::vector<uint8_t> decompressed(2);
+  int64_t decompressed_size = 0;
+  {
+    const uint8_t* input = compressed.data();
+    int64_t remaining = compressed.size();
+
+    while (!decompressor->IsFinished()) {
+      // Feed a varying amount each time
+      int64_t input_len = std::min(remaining, make_buf_size());
+      int64_t output_len = decompressed.size() - decompressed_size;
+      uint8_t* output = decompressed.data() + decompressed_size;
+      int64_t bytes_read, bytes_written;
+      bool need_more_output;
+      ASSERT_OK(decompressor->Decompress(input_len, input, output_len, output,
+                                         &bytes_read, &bytes_written, &need_more_output));
+      ASSERT_LE(bytes_read, input_len);
+      ASSERT_LE(bytes_written, output_len);
+      // Guard against an infinite loop: every call must make some progress
+      ASSERT_TRUE(need_more_output || bytes_written > 0 || bytes_read > 0)
+          << "Decompression not progressing anymore";
+      if (need_more_output) {
+        decompressed.resize(decompressed.capacity() * 2);
+      }
+      decompressed_size += bytes_written;
+      input += bytes_read;
+      remaining -= bytes_read;
+    }
+    ASSERT_EQ(remaining, 0);
+    decompressed.resize(decompressed_size);
+  }
+
+  ASSERT_EQ(data.size(), decompressed.size());
+  ASSERT_EQ(data, decompressed);
+}
+
+// Test fixture parametrized on the compression type being exercised.
+class CodecTest : public ::testing::TestWithParam<Compression::type> {
+ protected:
+  // Compression type this test instance was instantiated with.
+  Compression::type GetCompression() { return GetParam(); }
+
+  // Build a new codec for the current compression type; the Codec::Create()
+  // status is checked with ABORT_NOT_OK.
+  std::unique_ptr<Codec> MakeCodec() {
+    std::unique_ptr<Codec> result;
+    ABORT_NOT_OK(Codec::Create(GetCompression(), &result));
+    return result;
+  }
+};
+
+// Run the one-shot compression/decompression roundtrip check on both random
+// and compressible inputs of several sizes.
+TEST_P(CodecTest, CodecRoundtrip) {
int sizes[] = {0, 10000, 100000};
for (int data_size : sizes) {
- vector<uint8_t> data(data_size);
- random_bytes(data_size, 1234, data.data());
- CheckCodecRoundtrip<CODEC>(data);
+ vector<uint8_t> data = MakeRandomData(data_size);
+ CheckCodecRoundtrip(GetCompression(), data);
+
+ data = MakeCompressibleData(data_size);
+ CheckCodecRoundtrip(GetCompression(), data);
+ }
+}
+
+// Streaming compression validated against one-shot decompression, for the
+// codecs where the two are compatible.
+TEST_P(CodecTest, StreamingCompressor) {
+  const Compression::type compression = GetCompression();
+  // SKIP: snappy doesn't support streaming compression
+  if (compression == Compression::SNAPPY) return;
+  // SKIP: LZ4 streaming compression uses the LZ4 framing format,
+  // which must be tested against a streaming decompressor
+  if (compression == Compression::LZ4) return;
+
+  for (int data_size : {0, 10, 100000}) {
+    std::unique_ptr<Codec> codec = MakeCodec();
+
+    CheckStreamingCompressor(codec.get(), MakeRandomData(data_size));
+    CheckStreamingCompressor(codec.get(), MakeCompressibleData(data_size));
+  }
+}
+
+// Streaming decompression validated against one-shot compression, for the
+// codecs where the two are compatible.
+TEST_P(CodecTest, StreamingDecompressor) {
+  const Compression::type compression = GetCompression();
+  // SKIP: snappy doesn't support streaming decompression
+  if (compression == Compression::SNAPPY) return;
+  // SKIP: LZ4 streaming decompression uses the LZ4 framing format,
+  // which must be tested against a streaming compressor
+  if (compression == Compression::LZ4) return;
+
+  for (int data_size : {0, 10, 100000}) {
+    std::unique_ptr<Codec> codec = MakeCodec();
+
+    CheckStreamingDecompressor(codec.get(), MakeRandomData(data_size));
+    CheckStreamingDecompressor(codec.get(), MakeCompressibleData(data_size));
+  }
+}
+
+// Run the streaming compressor and decompressor back to back on random and
+// compressible inputs; only Snappy is skipped here.
+TEST_P(CodecTest, StreamingRoundtrip) {
+ if (GetCompression() == Compression::SNAPPY) {
+ // SKIP: snappy doesn't support streaming decompression
+ return;
+ }
+
+ int sizes[] = {0, 10, 100000};
+ for (int data_size : sizes) {
+ auto codec = MakeCodec();
+
+ vector<uint8_t> data = MakeRandomData(data_size);
+ CheckStreamingRoundtrip(codec.get(), data);
+
+ data = MakeCompressibleData(data_size);
+ CheckStreamingRoundtrip(codec.get(), data);
}
}
-TEST(TestCompressors, Snappy) { CheckCodec<Compression::SNAPPY>(); }
+// Instantiate the CodecTest suite once for each of GZIP, ZSTD, Snappy, LZ4
+// and Brotli.
+INSTANTIATE_TEST_CASE_P(TestGZip, CodecTest, ::testing::Values(Compression::GZIP));
-TEST(TestCompressors, Brotli) { CheckCodec<Compression::BROTLI>(); }
+INSTANTIATE_TEST_CASE_P(TestZSTD, CodecTest, ::testing::Values(Compression::ZSTD));
-TEST(TestCompressors, GZip) { CheckCodec<Compression::GZIP>(); }
+INSTANTIATE_TEST_CASE_P(TestSnappy, CodecTest, ::testing::Values(Compression::SNAPPY));
-TEST(TestCompressors, ZSTD) { CheckCodec<Compression::ZSTD>(); }
+INSTANTIATE_TEST_CASE_P(TestLZ4, CodecTest, ::testing::Values(Compression::LZ4));
-TEST(TestCompressors, Lz4) { CheckCodec<Compression::LZ4>(); }
+INSTANTIATE_TEST_CASE_P(TestBrotli, CodecTest, ::testing::Values(Compression::BROTLI));
} // namespace util
} // namespace arrow
diff --git a/cpp/src/arrow/util/compression.cc
b/cpp/src/arrow/util/compression.cc
index 459034a..8d4d838 100644
--- a/cpp/src/arrow/util/compression.cc
+++ b/cpp/src/arrow/util/compression.cc
@@ -44,6 +44,10 @@
namespace arrow {
namespace util {
+// Out-of-line definitions of the virtual destructors of the streaming
+// interfaces declared in compression.h.
+Compressor::~Compressor() {}
+
+Decompressor::~Decompressor() {}
+
Codec::~Codec() {}
Status Codec::Create(Compression::type codec_type, std::unique_ptr<Codec>*
result) {
diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h
index 8c6d7b7..f34ae43 100644
--- a/cpp/src/arrow/util/compression.h
+++ b/cpp/src/arrow/util/compression.h
@@ -21,32 +21,112 @@
#include <cstdint>
#include <memory>
-#include "arrow/status.h"
#include "arrow/util/visibility.h"
namespace arrow {
+class Status;
+
struct Compression {
enum type { UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO };
};
namespace util {
+/// \brief Streaming compressor interface
+///
+/// Input is supplied incrementally through Compress(); Flush() and End()
+/// emit pending output.  Every method writes at most output_len bytes into
+/// output and reports the number actually written in bytes_written.
+class ARROW_EXPORT Compressor {
+ public:
+  virtual ~Compressor();
+
+  /// \brief Compress some input.
+  ///
+  /// On return, bytes_read holds the number of input bytes consumed
+  /// (at most input_len).
+  /// If bytes_read is 0 on return, then a larger output buffer should be supplied.
+  virtual Status Compress(int64_t input_len, const uint8_t* input, int64_t output_len,
+                          uint8_t* output, int64_t* bytes_read,
+                          int64_t* bytes_written) = 0;
+
+  /// \brief Flush part of the compressed output.
+  ///
+  /// If should_retry is true on return, Flush() should be called again
+  /// with a larger buffer.
+  virtual Status Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+                       bool* should_retry) = 0;
+
+  /// \brief End compressing, doing whatever is necessary to end the stream.
+  ///
+  /// If should_retry is true on return, End() should be called again
+  /// with a larger buffer. Otherwise, the Compressor should not be used anymore.
+  ///
+  /// End() implies Flush().
+  virtual Status End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+                     bool* should_retry) = 0;
+
+  // XXX add methods for buffer size heuristics?
+};
+
+/// \brief Streaming decompressor interface
+///
+/// Compressed input is supplied incrementally through Decompress(), which
+/// writes at most output_len bytes into output on each call.
+class ARROW_EXPORT Decompressor {
+ public:
+  virtual ~Decompressor();
+
+  /// \brief Decompress some input.
+  ///
+  /// On return, bytes_read holds the number of input bytes consumed and
+  /// bytes_written the number of bytes written to output.
+  /// If need_more_output is true on return, a larger output buffer needs
+  /// to be supplied.
+  /// XXX is need_more_output necessary? (Brotli?)
+  virtual Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
+                            uint8_t* output, int64_t* bytes_read, int64_t* bytes_written,
+                            bool* need_more_output) = 0;
+
+  /// \brief Return whether the compressed stream is finished.
+  ///
+  /// This is a heuristic.  If true is returned, then it is guaranteed
+  /// that the stream is finished.  If false is returned, however, it may
+  /// simply be that the underlying library isn't able to provide the information.
+  virtual bool IsFinished() = 0;
+
+  // XXX add methods for buffer size heuristics?
+};
+
+/// \brief Compression codec interface: one-shot compression and decompression
+/// functions plus factories for the streaming Compressor and Decompressor.
class ARROW_EXPORT Codec {
public:
virtual ~Codec();
+ /// \brief Create a codec instance for the given compression type
static Status Create(Compression::type codec, std::unique_ptr<Codec>* out);
+ /// \brief One-shot decompression function
+ ///
+ /// output_len must be correct and therefore be obtained in advance.
+ ///
+ /// \note One-shot decompression is not always compatible with streaming
+ /// compression. Depending on the codec (e.g. LZ4), different formats may
+ /// be used.
virtual Status Decompress(int64_t input_len, const uint8_t* input, int64_t
output_len,
uint8_t* output_buffer) = 0;
+ /// \brief One-shot compression function
+ ///
+ /// output_buffer_len must first have been computed using MaxCompressedLen().
+ ///
+ /// \note One-shot compression is not always compatible with streaming
+ /// decompression. Depending on the codec (e.g. LZ4), different formats may
+ /// be used.
virtual Status Compress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer,
int64_t* output_length) = 0;
+ /// \brief Return the output buffer size to pass to Compress() for this input
virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) =
0;
+ // XXX Should be able to choose compression level, or presets? ("fast", etc.)
+
+ /// \brief Create a streaming compressor instance
+ virtual Status MakeCompressor(std::shared_ptr<Compressor>* out) = 0;
+
+ /// \brief Create a streaming decompressor instance
+ virtual Status MakeDecompressor(std::shared_ptr<Decompressor>* out) = 0;
+
+ /// \brief Return the codec's name (e.g. "brotli")
virtual const char* name() const = 0;
};
diff --git a/cpp/src/arrow/util/compression_brotli.cc
b/cpp/src/arrow/util/compression_brotli.cc
index 3dcaf99..1b8ab85 100644
--- a/cpp/src/arrow/util/compression_brotli.cc
+++ b/cpp/src/arrow/util/compression_brotli.cc
@@ -19,19 +19,184 @@
#include <cstddef>
#include <cstdint>
+#include <sstream>
#include <brotli/decode.h>
#include <brotli/encode.h>
#include <brotli/types.h>
#include "arrow/status.h"
+#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
namespace arrow {
namespace util {
+// Brotli compression quality is max (11) by default, which is slow.
+// We use 8 as a default as it is the best trade-off for Parquet workloads.
+constexpr int kBrotliDefaultCompressionLevel = 8;
+
// ----------------------------------------------------------------------
-// Brotli implementation
+// Brotli decompressor implementation
+
+// Streaming decompressor wrapping a BrotliDecoderState.  Init() must be
+// called (and succeed) before Decompress() or IsFinished() are used; the
+// decoder instance is destroyed with the object.
+class BrotliDecompressor : public Decompressor {
+ public:
+  BrotliDecompressor() {}
+
+  ~BrotliDecompressor() override {
+    if (state_ != nullptr) {
+      BrotliDecoderDestroyInstance(state_);
+    }
+  }
+
+  // Create the decoder instance; returns an IOError if creation fails.
+  Status Init() {
+    state_ = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
+    if (state_ == nullptr) {
+      return BrotliError("Brotli init failed");
+    }
+    return Status::OK();
+  }
+
+  Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
+                    uint8_t* output, int64_t* bytes_read, int64_t* bytes_written,
+                    bool* need_more_output) override {
+    auto avail_in = static_cast<size_t>(input_len);
+    auto avail_out = static_cast<size_t>(output_len);
+    BrotliDecoderResult ret;
+
+    ret = BrotliDecoderDecompressStream(state_, &avail_in, &input, &avail_out, &output,
+                                        nullptr /* total_out */);
+    if (ret == BROTLI_DECODER_RESULT_ERROR) {
+      return BrotliError(BrotliDecoderGetErrorCode(state_), "Brotli decompress failed: ");
+    }
+    *need_more_output = (ret == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT);
+    // avail_in / avail_out now hold what is left, so the differences are the
+    // amounts consumed and produced by this call.
+    *bytes_read = static_cast<int64_t>(input_len - avail_in);
+    *bytes_written = static_cast<int64_t>(output_len - avail_out);
+    return Status::OK();
+  }
+
+  bool IsFinished() override { return BrotliDecoderIsFinished(state_); }
+
+ protected:
+  Status BrotliError(const char* msg) { return Status::IOError(msg); }
+
+  // Build an IOError whose message is prefix_msg followed by Brotli's own
+  // description of the error code.
+  Status BrotliError(BrotliDecoderErrorCode code, const char* prefix_msg) {
+    std::stringstream ss;
+    ss << prefix_msg << BrotliDecoderErrorString(code);
+    return Status::IOError(ss.str());
+  }
+
+  BrotliDecoderState* state_ = nullptr;
+};
+
+// ----------------------------------------------------------------------
+// Brotli compressor implementation
+
+// Streaming compressor wrapping a BrotliEncoderState.  Init() must be called
+// (and succeed) before any other method; the encoder instance is destroyed
+// with the object.
+class BrotliCompressor : public Compressor {
+ public:
+  BrotliCompressor() {}
+
+  ~BrotliCompressor() override {
+    if (state_ != nullptr) {
+      BrotliEncoderDestroyInstance(state_);
+    }
+  }
+
+  // Create the encoder instance and set its quality to
+  // kBrotliDefaultCompressionLevel; returns an IOError if either step fails.
+  Status Init() {
+    state_ = BrotliEncoderCreateInstance(nullptr, nullptr, nullptr);
+    if (state_ == nullptr) {
+      return BrotliError("Brotli init failed");
+    }
+    if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_QUALITY,
+                                   kBrotliDefaultCompressionLevel)) {
+      return BrotliError("Brotli set compression level failed");
+    }
+    return Status::OK();
+  }
+
+  Status Compress(int64_t input_len, const uint8_t* input, int64_t output_len,
+                  uint8_t* output, int64_t* bytes_read, int64_t* bytes_written) override;
+
+  Status Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+               bool* should_retry) override;
+
+  Status End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+             bool* should_retry) override;
+
+ protected:
+  Status BrotliError(const char* msg) { return Status::IOError(msg); }
+
+  BrotliEncoderState* state_ = nullptr;
+};
+
+// Feed input to the encoder with BROTLI_OPERATION_PROCESS and report how many
+// bytes were consumed from input and written to output.
+Status BrotliCompressor::Compress(int64_t input_len, const uint8_t* input,
+                                  int64_t output_len, uint8_t* output,
+                                  int64_t* bytes_read, int64_t* bytes_written) {
+  auto avail_in = static_cast<size_t>(input_len);
+  auto avail_out = static_cast<size_t>(output_len);
+  BROTLI_BOOL ret;
+
+  ret = BrotliEncoderCompressStream(state_, BROTLI_OPERATION_PROCESS, &avail_in, &input,
+                                    &avail_out, &output, nullptr /* total_out */);
+  if (!ret) {
+    return BrotliError("Brotli compress failed");
+  }
+  // avail_in / avail_out now hold what is left over after the call.
+  *bytes_read = static_cast<int64_t>(input_len - avail_in);
+  *bytes_written = static_cast<int64_t>(output_len - avail_out);
+  return Status::OK();
+}
+
+// Run BROTLI_OPERATION_FLUSH with no new input.  should_retry is set when the
+// encoder still has output pending, i.e. the caller must call again with more
+// output space.
+Status BrotliCompressor::Flush(int64_t output_len, uint8_t* output,
+                               int64_t* bytes_written, bool* should_retry) {
+  size_t avail_in = 0;
+  const uint8_t* next_in = nullptr;
+  auto avail_out = static_cast<size_t>(output_len);
+  BROTLI_BOOL ret;
+
+  ret = BrotliEncoderCompressStream(state_, BROTLI_OPERATION_FLUSH, &avail_in, &next_in,
+                                    &avail_out, &output, nullptr /* total_out */);
+  if (!ret) {
+    return BrotliError("Brotli flush failed");
+  }
+  *bytes_written = static_cast<int64_t>(output_len - avail_out);
+  *should_retry = !!BrotliEncoderHasMoreOutput(state_);
+  return Status::OK();
+}
+
+// Run BROTLI_OPERATION_FINISH with no new input.  should_retry is set when
+// the encoder still has output pending; debug builds additionally check that
+// this agrees with BrotliEncoderIsFinished().
+Status BrotliCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+                             bool* should_retry) {
+  size_t avail_in = 0;
+  const uint8_t* next_in = nullptr;
+  auto avail_out = static_cast<size_t>(output_len);
+  BROTLI_BOOL ret;
+
+  ret = BrotliEncoderCompressStream(state_, BROTLI_OPERATION_FINISH, &avail_in, &next_in,
+                                    &avail_out, &output, nullptr /* total_out */);
+  if (!ret) {
+    return BrotliError("Brotli end failed");
+  }
+  *bytes_written = static_cast<int64_t>(output_len - avail_out);
+  *should_retry = !!BrotliEncoderHasMoreOutput(state_);
+  DCHECK_EQ(*should_retry, !BrotliEncoderIsFinished(state_));
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// Brotli codec implementation
+
+// Create and initialize a streaming Brotli compressor.  The Init() status is
+// checked with RETURN_NOT_OK before *out is assigned.
+Status BrotliCodec::MakeCompressor(std::shared_ptr<Compressor>* out) {
+  auto compressor = std::make_shared<BrotliCompressor>();
+  RETURN_NOT_OK(compressor->Init());
+  *out = compressor;
+  return Status::OK();
+}
+
+// Create and initialize a streaming Brotli decompressor.  The Init() status
+// is checked with RETURN_NOT_OK before *out is assigned.
+Status BrotliCodec::MakeDecompressor(std::shared_ptr<Decompressor>* out) {
+  auto decompressor = std::make_shared<BrotliDecompressor>();
+  RETURN_NOT_OK(decompressor->Init());
+  *out = decompressor;
+  return Status::OK();
+}
Status BrotliCodec::Decompress(int64_t input_len, const uint8_t* input,
int64_t output_len, uint8_t* output_buffer) {
@@ -52,10 +217,9 @@ Status BrotliCodec::Compress(int64_t input_len, const
uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer,
int64_t* output_length) {
std::size_t output_len = output_buffer_len;
- // TODO: Make quality configurable. We use 8 as a default as it is the best
- // trade-off for Parquet workload
- if (BrotliEncoderCompress(8, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE,
input_len,
- input, &output_len, output_buffer) ==
BROTLI_FALSE) {
+ if (BrotliEncoderCompress(kBrotliDefaultCompressionLevel,
BROTLI_DEFAULT_WINDOW,
+ BROTLI_DEFAULT_MODE, input_len, input, &output_len,
+ output_buffer) == BROTLI_FALSE) {
return Status::IOError("Brotli compression failure.");
}
*output_length = output_len;
diff --git a/cpp/src/arrow/util/compression_brotli.h
b/cpp/src/arrow/util/compression_brotli.h
index 23fb321..0b403ee 100644
--- a/cpp/src/arrow/util/compression_brotli.h
+++ b/cpp/src/arrow/util/compression_brotli.h
@@ -19,6 +19,7 @@
#define ARROW_UTIL_COMPRESSION_BROTLI_H
#include <cstdint>
+#include <memory>
#include "arrow/status.h"
#include "arrow/util/compression.h"
@@ -38,6 +39,10 @@ class ARROW_EXPORT BrotliCodec : public Codec {
int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override;
+ /// \brief Create a streaming Brotli compressor instance
+ Status MakeCompressor(std::shared_ptr<Compressor>* out) override;
+
+ /// \brief Create a streaming Brotli decompressor instance
+ Status MakeDecompressor(std::shared_ptr<Decompressor>* out) override;
+
const char* name() const override { return "brotli"; }
};
diff --git a/cpp/src/arrow/util/compression_lz4.cc
b/cpp/src/arrow/util/compression_lz4.cc
index 58228f5..7ba1f12 100644
--- a/cpp/src/arrow/util/compression_lz4.cc
+++ b/cpp/src/arrow/util/compression_lz4.cc
@@ -18,17 +18,237 @@
#include "arrow/util/compression_lz4.h"
#include <cstdint>
+#include <sstream>
#include <lz4.h>
+#include <lz4frame.h>
#include "arrow/status.h"
+#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
namespace arrow {
namespace util {
// ----------------------------------------------------------------------
-// Lz4 implementation
+// Lz4 decompressor implementation
+
+// Streaming decompressor for the LZ4 frame format (LZ4F_* API).  Init() must
+// be called (and succeed) before Decompress(); the decompression context is
+// freed with the object.
+class LZ4Decompressor : public Decompressor {
+ public:
+  LZ4Decompressor() {}
+
+  ~LZ4Decompressor() override {
+    if (ctx_ != nullptr) {
+      ARROW_UNUSED(LZ4F_freeDecompressionContext(ctx_));
+    }
+  }
+
+  // Create the LZ4F decompression context; returns an IOError on failure.
+  Status Init() {
+    LZ4F_errorCode_t ret;
+    finished_ = false;
+
+    ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION);
+    if (LZ4F_isError(ret)) {
+      return LZ4Error(ret, "LZ4 init failed: ");
+    } else {
+      return Status::OK();
+    }
+  }
+
+  Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
+                    uint8_t* output, int64_t* bytes_read, int64_t* bytes_written,
+                    bool* need_more_output) override {
+    auto src = input;
+    auto dst = output;
+    auto srcSize = static_cast<size_t>(input_len);
+    auto dstCapacity = static_cast<size_t>(output_len);
+    size_t ret;
+
+    ret = LZ4F_decompress(ctx_, dst, &dstCapacity, src, &srcSize, nullptr /* options */);
+    if (LZ4F_isError(ret)) {
+      return LZ4Error(ret, "LZ4 decompress failed: ");
+    }
+    // LZ4F_decompress() updates srcSize / dstCapacity in place; they are
+    // reported back as the consumed and produced byte counts.
+    *bytes_read = static_cast<int64_t>(srcSize);
+    *bytes_written = static_cast<int64_t>(dstCapacity);
+    // No progress at all is taken to mean the output buffer is too small.
+    *need_more_output = (*bytes_read == 0 && *bytes_written == 0);
+    finished_ = (ret == 0);
+    return Status::OK();
+  }
+
+  bool IsFinished() override { return finished_; }
+
+ protected:
+  // Build an IOError whose message is prefix_msg followed by LZ4's own name
+  // for the error code.
+  Status LZ4Error(LZ4F_errorCode_t ret, const char* prefix_msg) {
+    std::stringstream ss;
+    ss << prefix_msg << LZ4F_getErrorName(ret);
+    return Status::IOError(ss.str());
+  }
+
+  LZ4F_dctx* ctx_ = nullptr;
+  // Initialized here so IsFinished() is well-defined even before Init().
+  bool finished_ = false;
+};
+
+// ----------------------------------------------------------------------
+// Lz4 compressor implementation
+
+// Streaming compressor producing the LZ4 frame format (LZ4F_* API).  Init()
+// must be called (and succeed) before any other method; the compression
+// context is freed with the object.
+class LZ4Compressor : public Compressor {
+ public:
+  LZ4Compressor() {}
+
+  ~LZ4Compressor() override {
+    if (ctx_ != nullptr) {
+      ARROW_UNUSED(LZ4F_freeCompressionContext(ctx_));
+    }
+  }
+
+  // Zero the frame preferences and create the LZ4F compression context;
+  // returns an IOError on failure.
+  Status Init() {
+    LZ4F_errorCode_t ret;
+    memset(&prefs_, 0, sizeof(prefs_));
+    first_time_ = true;
+
+    ret = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION);
+    if (LZ4F_isError(ret)) {
+      return LZ4Error(ret, "LZ4 init failed: ");
+    } else {
+      return Status::OK();
+    }
+  }
+
+  Status Compress(int64_t input_len, const uint8_t* input, int64_t output_len,
+                  uint8_t* output, int64_t* bytes_read, int64_t* bytes_written) override;
+
+  Status Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+               bool* should_retry) override;
+
+  Status End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+             bool* should_retry) override;
+
+ protected:
+  // Build an IOError whose message is prefix_msg followed by LZ4's own name
+  // for the error code.
+  Status LZ4Error(LZ4F_errorCode_t ret, const char* prefix_msg) {
+    std::stringstream ss;
+    ss << prefix_msg << LZ4F_getErrorName(ret);
+    return Status::IOError(ss.str());
+  }
+
+  LZ4F_cctx* ctx_ = nullptr;
+  LZ4F_preferences_t prefs_;
+  // True until the frame header has been written by the first output call.
+  bool first_time_ = true;
+};
+
+// Helper for the LZ4Compressor methods: on the first call (first_time_ set),
+// write the LZ4 frame header into dst with LZ4F_compressBegin(), then advance
+// dst, shrink dstCapacity and add the header size to *bytes_written.
+// Returns Status::OK() early, leaving first_time_ set, when dstCapacity is
+// below LZ4F_HEADER_SIZE_MAX.  Expects `ret`, `bytes_written`, `ctx_`,
+// `prefs_` and `first_time_` to be in scope at the expansion site.
+#define BEGIN_COMPRESS(dst, dstCapacity) \
+ if (first_time_) { \
+ if (dstCapacity < LZ4F_HEADER_SIZE_MAX) { \
+ /* Output too small to write LZ4F header */ \
+ return Status::OK(); \
+ } \
+ ret = LZ4F_compressBegin(ctx_, dst, dstCapacity, &prefs_); \
+ if (LZ4F_isError(ret)) { \
+ return LZ4Error(ret, "LZ4 compress begin failed: "); \
+ } \
+ first_time_ = false; \
+ dst += ret; \
+ dstCapacity -= ret; \
+ *bytes_written += static_cast<int64_t>(ret); \
+ }
+
+// Compress the whole input chunk in one LZ4F_compressUpdate() call.  If the
+// output buffer is smaller than LZ4F_compressBound() for this chunk, nothing
+// is consumed (*bytes_read stays 0) so that the caller retries with a larger
+// buffer; *bytes_written may still account for a frame header written by
+// BEGIN_COMPRESS.
+Status LZ4Compressor::Compress(int64_t input_len, const uint8_t* input,
+                               int64_t output_len, uint8_t* output, int64_t* bytes_read,
+                               int64_t* bytes_written) {
+  auto src = input;
+  auto dst = output;
+  auto srcSize = static_cast<size_t>(input_len);
+  auto dstCapacity = static_cast<size_t>(output_len);
+  size_t ret;
+
+  *bytes_read = 0;
+  *bytes_written = 0;
+
+  BEGIN_COMPRESS(dst, dstCapacity);
+
+  if (dstCapacity < LZ4F_compressBound(srcSize, &prefs_)) {
+    // Output too small to compress into
+    return Status::OK();
+  }
+  ret = LZ4F_compressUpdate(ctx_, dst, dstCapacity, src, srcSize, nullptr /* options */);
+  if (LZ4F_isError(ret)) {
+    return LZ4Error(ret, "LZ4 compress update failed: ");
+  }
+  *bytes_read = input_len;
+  *bytes_written += static_cast<int64_t>(ret);
+  DCHECK_LE(*bytes_written, output_len);
+  return Status::OK();
+}
+
+// Flush buffered data with LZ4F_flush().  *should_retry stays true on the
+// early returns taken when the output buffer is too small, and is cleared
+// only after a successful flush.
+Status LZ4Compressor::Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+                            bool* should_retry) {
+  auto dst = output;
+  auto dstCapacity = static_cast<size_t>(output_len);
+  size_t ret;
+
+  *bytes_written = 0;
+  *should_retry = true;
+
+  BEGIN_COMPRESS(dst, dstCapacity);
+
+  if (dstCapacity < LZ4F_compressBound(0, &prefs_)) {
+    // Output too small to flush into
+    return Status::OK();
+  }
+
+  ret = LZ4F_flush(ctx_, dst, dstCapacity, nullptr /* options */);
+  if (LZ4F_isError(ret)) {
+    return LZ4Error(ret, "LZ4 flush failed: ");
+  }
+  *bytes_written += static_cast<int64_t>(ret);
+  *should_retry = false;
+  DCHECK_LE(*bytes_written, output_len);
+  return Status::OK();
+}
+
+// Terminate the frame with LZ4F_compressEnd().  *should_retry stays true on
+// the early returns taken when the output buffer is too small, and is cleared
+// only after the frame has been ended successfully.
+Status LZ4Compressor::End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+                          bool* should_retry) {
+  auto dst = output;
+  auto dstCapacity = static_cast<size_t>(output_len);
+  size_t ret;
+
+  *bytes_written = 0;
+  *should_retry = true;
+
+  BEGIN_COMPRESS(dst, dstCapacity);
+
+  if (dstCapacity < LZ4F_compressBound(0, &prefs_)) {
+    // Output too small to end frame into
+    return Status::OK();
+  }
+
+  ret = LZ4F_compressEnd(ctx_, dst, dstCapacity, nullptr /* options */);
+  if (LZ4F_isError(ret)) {
+    return LZ4Error(ret, "LZ4 end failed: ");
+  }
+  *bytes_written += static_cast<int64_t>(ret);
+  *should_retry = false;
+  DCHECK_LE(*bytes_written, output_len);
+  return Status::OK();
+}
+
+#undef BEGIN_COMPRESS
+
+// ----------------------------------------------------------------------
+// Lz4 codec implementation
+
+// Create and initialize a streaming LZ4 compressor.  The Init() status is
+// checked with RETURN_NOT_OK before *out is assigned.
+Status Lz4Codec::MakeCompressor(std::shared_ptr<Compressor>* out) {
+  auto compressor = std::make_shared<LZ4Compressor>();
+  RETURN_NOT_OK(compressor->Init());
+  *out = compressor;
+  return Status::OK();
+}
+
+// Create and initialize a streaming LZ4 decompressor.  The Init() status is
+// checked with RETURN_NOT_OK before *out is assigned.
+Status Lz4Codec::MakeDecompressor(std::shared_ptr<Decompressor>* out) {
+  auto decompressor = std::make_shared<LZ4Decompressor>();
+  RETURN_NOT_OK(decompressor->Init());
+  *out = decompressor;
+  return Status::OK();
+}
Status Lz4Codec::Decompress(int64_t input_len, const uint8_t* input, int64_t
output_len,
uint8_t* output_buffer) {
diff --git a/cpp/src/arrow/util/compression_lz4.h
b/cpp/src/arrow/util/compression_lz4.h
index 2b7b999..8c4bcf5 100644
--- a/cpp/src/arrow/util/compression_lz4.h
+++ b/cpp/src/arrow/util/compression_lz4.h
@@ -19,6 +19,7 @@
#define ARROW_UTIL_COMPRESSION_LZ4_H
#include <cstdint>
+#include <memory>
#include "arrow/status.h"
#include "arrow/util/compression.h"
@@ -38,6 +39,10 @@ class ARROW_EXPORT Lz4Codec : public Codec {
int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override;
+ Status MakeCompressor(std::shared_ptr<Compressor>* out) override;
+
+ Status MakeDecompressor(std::shared_ptr<Decompressor>* out) override;
+
const char* name() const override { return "lz4"; }
};
diff --git a/cpp/src/arrow/util/compression_snappy.cc
b/cpp/src/arrow/util/compression_snappy.cc
index 0f58f18..cc55408 100644
--- a/cpp/src/arrow/util/compression_snappy.cc
+++ b/cpp/src/arrow/util/compression_snappy.cc
@@ -33,6 +33,14 @@ namespace util {
// ----------------------------------------------------------------------
// Snappy implementation
+// Streaming compression is not offered by the Snappy codec: this always
+// returns Status::NotImplemented and never touches `out`.
+Status SnappyCodec::MakeCompressor(
+    std::shared_ptr<Compressor>* ARROW_ARG_UNUSED(out)) {
+  return Status::NotImplemented("Streaming compression unsupported with Snappy");
+}
+
+// Streaming decompression is not offered by the Snappy codec: this always
+// returns Status::NotImplemented and never touches `out`.
+Status SnappyCodec::MakeDecompressor(
+    std::shared_ptr<Decompressor>* ARROW_ARG_UNUSED(out)) {
+  return Status::NotImplemented("Streaming decompression unsupported with Snappy");
+}
+
Status SnappyCodec::Decompress(int64_t input_len, const uint8_t* input,
int64_t ARROW_ARG_UNUSED(output_len),
uint8_t* output_buffer) {
diff --git a/cpp/src/arrow/util/compression_snappy.h
b/cpp/src/arrow/util/compression_snappy.h
index fcbb689..8c25111 100644
--- a/cpp/src/arrow/util/compression_snappy.h
+++ b/cpp/src/arrow/util/compression_snappy.h
@@ -19,6 +19,7 @@
#define ARROW_UTIL_COMPRESSION_SNAPPY_H
#include <cstdint>
+#include <memory>
#include "arrow/status.h"
#include "arrow/util/compression.h"
@@ -37,6 +38,10 @@ class ARROW_EXPORT SnappyCodec : public Codec {
int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override;
+  // Overrides Codec::MakeCompressor; the Snappy implementation always
+  // returns Status::NotImplemented.
+  Status MakeCompressor(std::shared_ptr<Compressor>* out) override;
+
+ Status MakeDecompressor(std::shared_ptr<Decompressor>* out) override;
+
const char* name() const override { return "snappy"; }
};
diff --git a/cpp/src/arrow/util/compression_zlib.cc
b/cpp/src/arrow/util/compression_zlib.cc
index 883b8fe..cb3baff 100644
--- a/cpp/src/arrow/util/compression_zlib.cc
+++ b/cpp/src/arrow/util/compression_zlib.cc
@@ -17,8 +17,10 @@
#include "arrow/util/compression_zlib.h"
+#include <algorithm>
#include <cstdint>
#include <cstring>
+#include <limits>
#include <memory>
#include <sstream>
#include <string>
@@ -33,6 +35,8 @@
namespace arrow {
namespace util {
+constexpr int kGZipDefaultCompressionLevel = 9;  // NOTE(review): passed as the 5th argument of deflateInit2 below, which zlib documents as memLevel (the level argument there is Z_DEFAULT_COMPRESSION) -- the name looks misleading, confirm intent
+
// ----------------------------------------------------------------------
// gzip implementation
@@ -48,6 +52,263 @@ static constexpr int GZIP_CODEC = 16;
// Determine if this is libz or gzip from header.
static constexpr int DETECT_CODEC = 32;
+// Map a GZipCodec::Format to the windowBits value handed to deflateInit2().
+// Any value not listed in the switch yields plain WINDOW_BITS.
+static int CompressionWindowBitsForFormat(GZipCodec::Format format) {
+  switch (format) {
+    case GZipCodec::DEFLATE:
+      // Negated window size
+      return -WINDOW_BITS;
+    case GZipCodec::GZIP:
+      // Window size plus the gzip flag
+      return WINDOW_BITS + GZIP_CODEC;
+    case GZipCodec::ZLIB:
+      break;
+  }
+  return WINDOW_BITS;
+}
+
+// Map a GZipCodec::Format to the windowBits value handed to inflateInit2().
+static int DecompressionWindowBitsForFormat(GZipCodec::Format format) {
+  const bool is_deflate = (format == GZipCodec::DEFLATE);
+  // If not deflate, autodetect format from header
+  return is_deflate ? -WINDOW_BITS : (WINDOW_BITS | DETECT_CODEC);
+}
+
+// ----------------------------------------------------------------------
+// gzip decompressor implementation
+
+// Streaming decompressor built on zlib's inflate().
+//
+// Usage: construct, call Init() once, then feed input through Decompress()
+// until IsFinished() reports that zlib reached the end of the stream.
+class GZipDecompressor : public Decompressor {
+ public:
+  // finished_ is initialized here too, so IsFinished() is well-defined even
+  // when Init() has not been called yet.
+  GZipDecompressor() : initialized_(false), finished_(false) {}
+
+  ~GZipDecompressor() override {
+    // Only tear down zlib state that inflateInit2() actually set up
+    if (initialized_) {
+      inflateEnd(&stream_);
+    }
+  }
+
+  // Prepare the inflate stream for `format`.
+  // Returns Status::IOError if zlib fails to initialize.
+  Status Init(GZipCodec::Format format) {
+    DCHECK(!initialized_);
+    memset(&stream_, 0, sizeof(stream_));
+    finished_ = false;
+
+    const int window_bits = DecompressionWindowBitsForFormat(format);
+    if (inflateInit2(&stream_, window_bits) != Z_OK) {
+      return ZlibError("zlib inflateInit failed: ");
+    }
+    initialized_ = true;
+    return Status::OK();
+  }
+
+  // Run one inflate() step from `input` into `output`.
+  //
+  // On success *bytes_read / *bytes_written hold the amounts consumed and
+  // produced.  *need_more_output is set when zlib reported Z_BUF_ERROR, i.e.
+  // no progress was possible with the buffers given.
+  // Returns Status::IOError on any zlib error.
+  Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
+                    uint8_t* output, int64_t* bytes_read, int64_t* bytes_written,
+                    bool* need_more_output) override {
+    static constexpr auto input_limit =
+        static_cast<int64_t>(std::numeric_limits<uInt>::max());
+    // zlib counts with uInt, so larger buffers are clamped.  Progress must be
+    // measured against the clamped sizes: subtracting from the raw lengths
+    // would over-report bytes consumed/produced for buffers beyond uInt range.
+    const int64_t in_avail = std::min(input_len, input_limit);
+    const int64_t out_avail = std::min(output_len, input_limit);
+
+    stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
+    stream_.avail_in = static_cast<uInt>(in_avail);
+    stream_.next_out = reinterpret_cast<Bytef*>(output);
+    stream_.avail_out = static_cast<uInt>(out_avail);
+
+    const int ret = inflate(&stream_, Z_SYNC_FLUSH);
+    if (ret == Z_DATA_ERROR || ret == Z_STREAM_ERROR || ret == Z_MEM_ERROR) {
+      return ZlibError("zlib inflate failed: ");
+    }
+    if (ret == Z_NEED_DICT) {
+      return ZlibError("zlib inflate failed (need preset dictionary): ");
+    }
+    if (ret == Z_BUF_ERROR) {
+      // No progress was possible
+      *bytes_read = 0;
+      *bytes_written = 0;
+      *need_more_output = true;
+    } else {
+      DCHECK(ret == Z_OK || ret == Z_STREAM_END);
+      // Some progress has been made
+      *bytes_read = in_avail - stream_.avail_in;
+      *bytes_written = out_avail - stream_.avail_out;
+      *need_more_output = false;
+    }
+    finished_ = (ret == Z_STREAM_END);
+    return Status::OK();
+  }
+
+  // True once inflate() has returned Z_STREAM_END.
+  bool IsFinished() override { return finished_; }
+
+ protected:
+  // Build an IOError from `prefix_msg` plus zlib's own message, if any.
+  Status ZlibError(const char* prefix_msg) {
+    std::stringstream ss;
+    ss << prefix_msg;
+    if (stream_.msg && *stream_.msg) {
+      ss << stream_.msg;
+    } else {
+      ss << "(unknown error)";
+    }
+    return Status::IOError(ss.str());
+  }
+
+  z_stream stream_;
+  bool initialized_;
+  bool finished_;
+};
+
+// ----------------------------------------------------------------------
+// gzip compressor implementation
+
+// Streaming compressor built on zlib's deflate().
+// Init() must be called once before Compress(), Flush() or End().
+class GZipCompressor : public Compressor {
+ public:
+  GZipCompressor() : initialized_(false) {}
+
+  ~GZipCompressor() override {
+    // Nothing to release unless the deflate stream is still live
+    if (!initialized_) {
+      return;
+    }
+    deflateEnd(&stream_);
+  }
+
+  // Set up the deflate stream for `format`.
+  // Returns Status::IOError if zlib fails to initialize.
+  Status Init(GZipCodec::Format format) {
+    DCHECK(!initialized_);
+    memset(&stream_, 0, sizeof(stream_));
+
+    // Initialize to run specified format
+    const int window_bits = CompressionWindowBitsForFormat(format);
+    const int zret =
+        deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits,
+                     kGZipDefaultCompressionLevel, Z_DEFAULT_STRATEGY);
+    if (zret != Z_OK) {
+      return ZlibError("zlib deflateInit failed: ");
+    }
+    initialized_ = true;
+    return Status::OK();
+  }
+
+  Status Compress(int64_t input_len, const uint8_t* input, int64_t output_len,
+                  uint8_t* output, int64_t* bytes_read,
+                  int64_t* bytes_written) override;
+
+  Status Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+               bool* should_retry) override;
+
+  Status End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+             bool* should_retry) override;
+
+ protected:
+  // Build an IOError from `prefix_msg` plus zlib's own message, if any.
+  Status ZlibError(const char* prefix_msg) {
+    const bool has_zlib_msg = stream_.msg && *stream_.msg;
+    std::stringstream ss;
+    ss << prefix_msg;
+    if (!has_zlib_msg) {
+      ss << "(unknown error)";
+    } else {
+      ss << stream_.msg;
+    }
+    return Status::IOError(ss.str());
+  }
+
+  z_stream stream_;
+  bool initialized_;
+};
+
+// Run one deflate(Z_NO_FLUSH) step from `input` into `output`.
+//
+// *bytes_read / *bytes_written receive the amounts consumed and produced;
+// both are 0 when zlib reports Z_BUF_ERROR (no progress possible).
+// Returns Status::IOError on Z_STREAM_ERROR.
+Status GZipCompressor::Compress(int64_t input_len, const uint8_t* input,
+                                int64_t output_len, uint8_t* output,
+                                int64_t* bytes_read, int64_t* bytes_written) {
+  DCHECK(initialized_) << "Called on non-initialized stream";
+
+  static constexpr auto input_limit =
+      static_cast<int64_t>(std::numeric_limits<uInt>::max());
+  // zlib counts with uInt, so larger buffers are clamped.  Progress must be
+  // measured against the clamped sizes: subtracting from the raw lengths
+  // would over-report bytes consumed/produced for buffers beyond uInt range.
+  const int64_t in_avail = std::min(input_len, input_limit);
+  const int64_t out_avail = std::min(output_len, input_limit);
+
+  stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
+  stream_.avail_in = static_cast<uInt>(in_avail);
+  stream_.next_out = reinterpret_cast<Bytef*>(output);
+  stream_.avail_out = static_cast<uInt>(out_avail);
+
+  const int ret = deflate(&stream_, Z_NO_FLUSH);
+  if (ret == Z_STREAM_ERROR) {
+    return ZlibError("zlib compress failed: ");
+  }
+  if (ret == Z_OK) {
+    // Some progress has been made
+    *bytes_read = in_avail - stream_.avail_in;
+    *bytes_written = out_avail - stream_.avail_out;
+  } else {
+    // No progress was possible
+    DCHECK_EQ(ret, Z_BUF_ERROR);
+    *bytes_read = 0;
+    *bytes_written = 0;
+  }
+  return Status::OK();
+}
+
+// Flush pending compressed data into `output` using deflate(Z_SYNC_FLUSH).
+//
+// *bytes_written receives the number of bytes produced.  *should_retry is set
+// when the output buffer was filled completely: the flush may then be
+// incomplete and Flush() must be called again with more output space.
+// Returns Status::IOError on Z_STREAM_ERROR.
+Status GZipCompressor::Flush(int64_t output_len, uint8_t* output,
+                             int64_t* bytes_written, bool* should_retry) {
+  DCHECK(initialized_) << "Called on non-initialized stream";
+
+  static constexpr auto input_limit =
+      static_cast<int64_t>(std::numeric_limits<uInt>::max());
+  // zlib counts with uInt: clamp the capacity and measure progress against
+  // the clamped value rather than the raw output_len.
+  const int64_t out_avail = std::min(output_len, input_limit);
+
+  stream_.avail_in = 0;
+  stream_.next_out = reinterpret_cast<Bytef*>(output);
+  stream_.avail_out = static_cast<uInt>(out_avail);
+
+  const int ret = deflate(&stream_, Z_SYNC_FLUSH);
+  if (ret == Z_STREAM_ERROR) {
+    return ZlibError("zlib flush failed: ");
+  }
+  if (ret == Z_OK) {
+    *bytes_written = out_avail - stream_.avail_out;
+  } else {
+    DCHECK_EQ(ret, Z_BUF_ERROR);
+    *bytes_written = 0;
+  }
+  // "If deflate returns with avail_out == 0, this function must be called
+  // again with the same value of the flush parameter and more output space
+  // (updated avail_out), until the flush is complete (deflate returns
+  // with non-zero avail_out)."
+  //
+  // Hence the retry condition is a full output buffer, not "zero bytes
+  // written": a partially completed flush writes bytes yet still needs a
+  // retry, while a flush with nothing pending writes nothing and is done.
+  *should_retry = (stream_.avail_out == 0);
+  return Status::OK();
+}
+
+// Finish the compressed stream using deflate(Z_FINISH).
+//
+// *bytes_written receives the number of bytes produced by this call.
+// *should_retry is set when zlib has not yet returned Z_STREAM_END, in which
+// case End() must be called again.  Once the stream has ended, the zlib state
+// is released with deflateEnd() and the object is marked uninitialized.
+// Returns Status::IOError on Z_STREAM_ERROR or if deflateEnd() fails.
+Status GZipCompressor::End(int64_t output_len, uint8_t* output,
+                           int64_t* bytes_written, bool* should_retry) {
+  DCHECK(initialized_) << "Called on non-initialized stream";
+
+  static constexpr auto input_limit =
+      static_cast<int64_t>(std::numeric_limits<uInt>::max());
+  // zlib counts with uInt: clamp the capacity and measure progress against
+  // the clamped value rather than the raw output_len.
+  const int64_t out_avail = std::min(output_len, input_limit);
+
+  stream_.avail_in = 0;
+  stream_.next_out = reinterpret_cast<Bytef*>(output);
+  stream_.avail_out = static_cast<uInt>(out_avail);
+
+  int ret = deflate(&stream_, Z_FINISH);
+  if (ret == Z_STREAM_ERROR) {
+    return ZlibError("zlib flush failed: ");
+  }
+  *bytes_written = out_avail - stream_.avail_out;
+  if (ret != Z_STREAM_END) {
+    // Not everything could be flushed, the caller has to call again
+    *should_retry = true;
+    return Status::OK();
+  }
+
+  // Flush complete, we can now end the stream
+  *should_retry = false;
+  initialized_ = false;
+  ret = deflateEnd(&stream_);
+  if (ret != Z_OK) {
+    return ZlibError("zlib end failed: ");
+  }
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// gzip codec implementation
+
class GZipCodec::GZipCodecImpl {
public:
explicit GZipCodecImpl(GZipCodec::Format format)
@@ -60,20 +321,29 @@ class GZipCodec::GZipCodecImpl {
EndDecompressor();
}
+  // Create a streaming compressor for this codec's format_.
+  // `out` is assigned only after the new object's Init() has succeeded.
+  Status MakeCompressor(std::shared_ptr<Compressor>* out) {
+    auto compressor = std::make_shared<GZipCompressor>();
+    RETURN_NOT_OK(compressor->Init(format_));
+
+    *out = compressor;
+    return Status::OK();
+  }
+
+  // Create a streaming decompressor for this codec's format_.
+  // `out` is assigned only after the new object's Init() has succeeded.
+  Status MakeDecompressor(std::shared_ptr<Decompressor>* out) {
+    auto decompressor = std::make_shared<GZipDecompressor>();
+    RETURN_NOT_OK(decompressor->Init(format_));
+
+    *out = decompressor;
+    return Status::OK();
+  }
+
Status InitCompressor() {
EndDecompressor();
memset(&stream_, 0, sizeof(stream_));
int ret;
// Initialize to run specified format
- int window_bits = WINDOW_BITS;
- if (format_ == DEFLATE) {
- window_bits = -window_bits;
- } else if (format_ == GZIP) {
- window_bits += GZIP_CODEC;
- }
- if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
window_bits, 9,
- Z_DEFAULT_STRATEGY)) != Z_OK) {
+ int window_bits = CompressionWindowBitsForFormat(format_);
+ if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
window_bits,
+ kGZipDefaultCompressionLevel, Z_DEFAULT_STRATEGY))
!= Z_OK) {
std::stringstream ss;
ss << "zlib deflateInit failed: " << std::string(stream_.msg);
return Status::IOError(ss.str());
@@ -95,7 +365,7 @@ class GZipCodec::GZipCodecImpl {
int ret;
// Initialize to run either deflate or zlib/gzip format
- int window_bits = format_ == DEFLATE ? -WINDOW_BITS : WINDOW_BITS |
DETECT_CODEC;
+ int window_bits = DecompressionWindowBitsForFormat(format_);
if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
std::stringstream ss;
ss << "zlib inflateInit failed: " << std::string(stream_.msg);
@@ -249,6 +519,14 @@ Status GZipCodec::Compress(int64_t input_length, const
uint8_t* input,
return impl_->Compress(input_length, input, output_buffer_len, output,
output_length);
}
+// Public entry point; the actual construction happens in impl_.
+Status GZipCodec::MakeCompressor(std::shared_ptr<Compressor>* out) {
+  Status status = impl_->MakeCompressor(out);
+  return status;
+}
+
+// Public entry point; the actual construction happens in impl_.
+Status GZipCodec::MakeDecompressor(std::shared_ptr<Decompressor>* out) {
+  Status status = impl_->MakeDecompressor(out);
+  return status;
+}
+
const char* GZipCodec::name() const { return "gzip"; }
} // namespace util
diff --git a/cpp/src/arrow/util/compression_zlib.h
b/cpp/src/arrow/util/compression_zlib.h
index 1e66728..f934198 100644
--- a/cpp/src/arrow/util/compression_zlib.h
+++ b/cpp/src/arrow/util/compression_zlib.h
@@ -49,6 +49,10 @@ class ARROW_EXPORT GZipCodec : public Codec {
int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override;
+ Status MakeCompressor(std::shared_ptr<Compressor>* out) override;
+
+ Status MakeDecompressor(std::shared_ptr<Decompressor>* out) override;
+
const char* name() const override;
private:
diff --git a/cpp/src/arrow/util/compression_zstd.cc
b/cpp/src/arrow/util/compression_zstd.cc
index 4b9feee..4064f29 100644
--- a/cpp/src/arrow/util/compression_zstd.cc
+++ b/cpp/src/arrow/util/compression_zstd.cc
@@ -19,6 +19,7 @@
#include <cstddef>
#include <cstdint>
+#include <sstream>
#include <zstd.h>
@@ -30,8 +31,178 @@ using std::size_t;
namespace arrow {
namespace util {
+// XXX level = 1 probably doesn't compress very much
+constexpr int kZSTDDefaultCompressionLevel = 1;
+
+// ----------------------------------------------------------------------
+// ZSTD decompressor implementation
+
+// Streaming decompressor built on the zstd ZSTD_DStream API.
+//
+// The stream object is created in the constructor and freed in the
+// destructor; Init() must succeed before Decompress() is called.
+class ZSTDDecompressor : public Decompressor {
+ public:
+  // finished_ is initialized here too, so IsFinished() is well-defined even
+  // when Init() has not been called yet.
+  ZSTDDecompressor() : stream_(ZSTD_createDStream()), finished_(false) {}
+
+  ~ZSTDDecompressor() override { ZSTD_freeDStream(stream_); }
+
+  // Initialize the stream.  Returns OutOfMemory if the constructor could not
+  // allocate it, or IOError if zstd reports an initialization error.
+  Status Init() {
+    // ZSTD_createDStream() result was never checked; do not hand a null
+    // stream to zstd.
+    if (stream_ == nullptr) {
+      return Status::OutOfMemory("zstd init failed: could not allocate stream");
+    }
+    finished_ = false;
+    const size_t ret = ZSTD_initDStream(stream_);
+    if (ZSTD_isError(ret)) {
+      return ZSTDError(ret, "zstd init failed: ");
+    }
+    return Status::OK();
+  }
+
+  // Run one ZSTD_decompressStream() step from `input` into `output`.
+  //
+  // *bytes_read / *bytes_written receive the amounts consumed and produced.
+  // *need_more_output is set when neither input was consumed nor output
+  // produced.  Returns IOError if zstd reports an error.
+  Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
+                    uint8_t* output, int64_t* bytes_read, int64_t* bytes_written,
+                    bool* need_more_output) override {
+    ZSTD_inBuffer in_buf;
+    ZSTD_outBuffer out_buf;
+
+    in_buf.src = input;
+    in_buf.size = static_cast<size_t>(input_len);
+    in_buf.pos = 0;
+    out_buf.dst = output;
+    out_buf.size = static_cast<size_t>(output_len);
+    out_buf.pos = 0;
+
+    const size_t ret = ZSTD_decompressStream(stream_, &out_buf, &in_buf);
+    if (ZSTD_isError(ret)) {
+      return ZSTDError(ret, "zstd decompress failed: ");
+    }
+    *bytes_read = static_cast<int64_t>(in_buf.pos);
+    *bytes_written = static_cast<int64_t>(out_buf.pos);
+    *need_more_output = *bytes_read == 0 && *bytes_written == 0;
+    // A zero return value is treated as "stream finished"
+    finished_ = (ret == 0);
+    return Status::OK();
+  }
+
+  // True once ZSTD_decompressStream() has returned 0.
+  bool IsFinished() override { return finished_; }
+
+ protected:
+  // Build an IOError from `prefix_msg` plus zstd's name for error code `ret`.
+  Status ZSTDError(size_t ret, const char* prefix_msg) {
+    std::stringstream ss;
+    ss << prefix_msg << ZSTD_getErrorName(ret);
+    return Status::IOError(ss.str());
+  }
+
+  ZSTD_DStream* stream_;
+  bool finished_;
+};
+
// ----------------------------------------------------------------------
-// ZSTD implementation
+// ZSTD compressor implementation
+
+// Streaming compressor built on the zstd ZSTD_CStream API.
+//
+// The stream object is created in the constructor and freed in the
+// destructor; Init() must succeed before Compress(), Flush() or End().
+class ZSTDCompressor : public Compressor {
+ public:
+  ZSTDCompressor() : stream_(ZSTD_createCStream()) {}
+
+  ~ZSTDCompressor() override { ZSTD_freeCStream(stream_); }
+
+  // Initialize the stream at kZSTDDefaultCompressionLevel.  Returns
+  // OutOfMemory if the constructor could not allocate the stream, or IOError
+  // if zstd reports an initialization error.
+  Status Init() {
+    // ZSTD_createCStream() result was never checked; do not hand a null
+    // stream to zstd.
+    if (stream_ == nullptr) {
+      return Status::OutOfMemory("zstd init failed: could not allocate stream");
+    }
+    const size_t ret = ZSTD_initCStream(stream_, kZSTDDefaultCompressionLevel);
+    if (ZSTD_isError(ret)) {
+      return ZSTDError(ret, "zstd init failed: ");
+    }
+    return Status::OK();
+  }
+
+  Status Compress(int64_t input_len, const uint8_t* input, int64_t output_len,
+                  uint8_t* output, int64_t* bytes_read,
+                  int64_t* bytes_written) override;
+
+  Status Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+               bool* should_retry) override;
+
+  Status End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+             bool* should_retry) override;
+
+ protected:
+  // Build an IOError from `prefix_msg` plus zstd's name for error code `ret`.
+  Status ZSTDError(size_t ret, const char* prefix_msg) {
+    std::stringstream ss;
+    ss << prefix_msg << ZSTD_getErrorName(ret);
+    return Status::IOError(ss.str());
+  }
+
+  ZSTD_CStream* stream_;
+};
+
+// Run one ZSTD_compressStream() step from `input` into `output`.
+// *bytes_read / *bytes_written receive the final cursor positions of the
+// input and output buffers.  Returns IOError if zstd reports an error.
+Status ZSTDCompressor::Compress(int64_t input_len, const uint8_t* input,
+                                int64_t output_len, uint8_t* output,
+                                int64_t* bytes_read, int64_t* bytes_written) {
+  // Buffer descriptors are {pointer, size, pos}; both cursors start at 0
+  ZSTD_inBuffer src_buf = {input, static_cast<size_t>(input_len), 0};
+  ZSTD_outBuffer dst_buf = {output, static_cast<size_t>(output_len), 0};
+
+  const size_t zret = ZSTD_compressStream(stream_, &dst_buf, &src_buf);
+  if (ZSTD_isError(zret)) {
+    return ZSTDError(zret, "zstd compress failed: ");
+  }
+
+  *bytes_read = static_cast<int64_t>(src_buf.pos);
+  *bytes_written = static_cast<int64_t>(dst_buf.pos);
+  return Status::OK();
+}
+
+// Flush buffered data into `output` via ZSTD_flushStream().
+// *should_retry is set whenever zstd returns a non-zero (non-error) value.
+// Returns IOError if zstd reports an error.
+Status ZSTDCompressor::Flush(int64_t output_len, uint8_t* output,
+                             int64_t* bytes_written, bool* should_retry) {
+  // Buffer descriptor is {pointer, size, pos}; the cursor starts at 0
+  ZSTD_outBuffer dst_buf = {output, static_cast<size_t>(output_len), 0};
+
+  const size_t zret = ZSTD_flushStream(stream_, &dst_buf);
+  if (ZSTD_isError(zret)) {
+    return ZSTDError(zret, "zstd flush failed: ");
+  }
+
+  *bytes_written = static_cast<int64_t>(dst_buf.pos);
+  *should_retry = (zret != 0);
+  return Status::OK();
+}
+
+// Finish the stream into `output` via ZSTD_endStream().
+// *should_retry is set whenever zstd returns a non-zero (non-error) value.
+// Returns IOError if zstd reports an error.
+Status ZSTDCompressor::End(int64_t output_len, uint8_t* output,
+                           int64_t* bytes_written, bool* should_retry) {
+  // Buffer descriptor is {pointer, size, pos}; the cursor starts at 0
+  ZSTD_outBuffer dst_buf = {output, static_cast<size_t>(output_len), 0};
+
+  const size_t zret = ZSTD_endStream(stream_, &dst_buf);
+  if (ZSTD_isError(zret)) {
+    return ZSTDError(zret, "zstd end failed: ");
+  }
+
+  *bytes_written = static_cast<int64_t>(dst_buf.pos);
+  *should_retry = (zret != 0);
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// ZSTD codec implementation
+
+// Create a streaming ZSTD compressor.
+// `out` is assigned only after the new object's Init() has succeeded.
+Status ZSTDCodec::MakeCompressor(std::shared_ptr<Compressor>* out) {
+  auto compressor = std::make_shared<ZSTDCompressor>();
+  RETURN_NOT_OK(compressor->Init());
+
+  *out = compressor;
+  return Status::OK();
+}
+
+// Create a streaming ZSTD decompressor.
+// `out` is assigned only after the new object's Init() has succeeded.
+Status ZSTDCodec::MakeDecompressor(std::shared_ptr<Decompressor>* out) {
+  auto decompressor = std::make_shared<ZSTDDecompressor>();
+  RETURN_NOT_OK(decompressor->Init());
+
+  *out = decompressor;
+  return Status::OK();
+}
Status ZSTDCodec::Decompress(int64_t input_len, const uint8_t* input, int64_t
output_len,
uint8_t* output_buffer) {
@@ -52,8 +223,9 @@ int64_t ZSTDCodec::MaxCompressedLen(int64_t input_len,
Status ZSTDCodec::Compress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer,
int64_t* output_length) {
- *output_length = ZSTD_compress(output_buffer,
static_cast<size_t>(output_buffer_len),
- input, static_cast<size_t>(input_len), 1);
+ *output_length =
+ ZSTD_compress(output_buffer, static_cast<size_t>(output_buffer_len),
input,
+ static_cast<size_t>(input_len),
kZSTDDefaultCompressionLevel);
if (ZSTD_isError(*output_length)) {
return Status::IOError("ZSTD compression failure.");
}
diff --git a/cpp/src/arrow/util/compression_zstd.h
b/cpp/src/arrow/util/compression_zstd.h
index 8ebfc2a..06da152 100644
--- a/cpp/src/arrow/util/compression_zstd.h
+++ b/cpp/src/arrow/util/compression_zstd.h
@@ -19,6 +19,7 @@
#define ARROW_UTIL_COMPRESSION_ZSTD_H
#include <cstdint>
+#include <memory>
#include "arrow/status.h"
#include "arrow/util/compression.h"
@@ -38,6 +39,10 @@ class ARROW_EXPORT ZSTDCodec : public Codec {
int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override;
+ Status MakeCompressor(std::shared_ptr<Compressor>* out) override;
+
+ Status MakeDecompressor(std::shared_ptr<Decompressor>* out) override;
+
const char* name() const override { return "zstd"; }
};