This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3c4c4b1a87 [feature-wip](parquet-reader) add gzip compression codec
(#12488)
3c4c4b1a87 is described below
commit 3c4c4b1a872eed775166ecd797b3f397cc89160e
Author: Ashin Gau <[email protected]>
AuthorDate: Fri Sep 9 09:10:25 2022 +0800
[feature-wip](parquet-reader) add gzip compression codec (#12488)
Queries failed when reading Parquet data compressed with GZIP:
mysql> select * from customer limit 1;
ERROR 1105 (HY000): errCode = 2, detailMessage = unknown compression type(GZIP)
---
be/src/util/block_compression.cpp | 78 ++++++++++++++++++++++++++++++++++++++-
1 file changed, 77 insertions(+), 1 deletion(-)
diff --git a/be/src/util/block_compression.cpp
b/be/src/util/block_compression.cpp
index 1dafd20f54..4c58a7f86a 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -512,6 +512,79 @@ private:
ZSTD_DCtx* ctx_d = nullptr;
};
+class GzipBlockCompression final : public ZlibBlockCompression {
+public:
+ ~GzipBlockCompression() override = default;
+
+ Status decompress(const Slice& input, Slice* output) const override {
+ z_stream z_strm = {nullptr};
+ z_strm.zalloc = Z_NULL;
+ z_strm.zfree = Z_NULL;
+ z_strm.opaque = Z_NULL;
+
+ int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC);
+ if (ret != Z_OK) {
+ return Status::InternalError("Fail to do ZLib stream compress,
error={}, res={}",
+ zError(ret), ret);
+ }
+
+ // 1. set input and output
+ z_strm.next_in = reinterpret_cast<Bytef*>(input.data);
+ z_strm.avail_in = input.size;
+ z_strm.next_out = reinterpret_cast<Bytef*>(output->data);
+ z_strm.avail_out = output->size;
+
+ if (z_strm.avail_out > 0) {
+ // We only support non-streaming use case for block decompressor
+ ret = inflate(&z_strm, Z_FINISH);
+ if (ret != Z_OK && ret != Z_STREAM_END) {
+ (void)inflateEnd(&z_strm);
+ return Status::InternalError("Fail to do ZLib stream compress,
error={}, res={}",
+ zError(ret), ret);
+ }
+ }
+ (void)inflateEnd(&z_strm);
+
+ return Status::OK();
+ }
+
+ size_t max_compressed_len(size_t len) const override {
+ z_stream zstrm;
+ zstrm.zalloc = Z_NULL;
+ zstrm.zfree = Z_NULL;
+ zstrm.opaque = Z_NULL;
+ auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
MAX_WBITS + GZIP_CODEC,
+ MEM_LEVEL, Z_DEFAULT_STRATEGY);
+ if (zres != Z_OK) {
+ // Fall back to zlib estimate logic for deflate, notice this may
+ // cause decompress error
+ LOG(WARNING) << "Fail to do ZLib stream compress, error=" <<
zError(zres)
+ << ", res=" << zres;
+ return ZlibBlockCompression::max_compressed_len(len);
+ } else {
+ zres = deflateEnd(&zstrm);
+ if (zres != Z_OK) {
+ LOG(WARNING) << "Fail to do deflateEnd on ZLib stream, error="
<< zError(zres)
+ << ", res=" << zres;
+ }
+ // Mark, maintainer of zlib, has stated that 12 needs to be added
to
+ // result for gzip
+ //
http://compgroups.net/comp.unix.programmer/gzip-compressing-an-in-memory-string-usi/54854
+ // To have a safe upper bound for "wrapper variations", we add 32
to
+ // estimate
+ int upper_bound = deflateBound(&zstrm, len) + 32;
+ return upper_bound;
+ }
+ }
+
+private:
+ // Magic number for zlib, see https://zlib.net/manual.html for more
details.
+ const static int GZIP_CODEC = 16; // gzip
+ // The memLevel parameter specifies how much memory should be allocated for
+ // the internal compression state.
+ const static int MEM_LEVEL = 8;
+};
+
Status get_block_compression_codec(segment_v2::CompressionTypePB type,
std::unique_ptr<BlockCompressionCodec>&
codec) {
BlockCompressionCodec* ptr = nullptr;
@@ -566,8 +639,11 @@ Status
get_block_compression_codec(tparquet::CompressionCodec::type parquet_code
case tparquet::CompressionCodec::ZSTD:
ptr = new ZstdBlockCompression();
break;
+ case tparquet::CompressionCodec::GZIP:
+ ptr = new GzipBlockCompression();
+ break;
default:
- return Status::NotFound("unknown compression type({})", parquet_codec);
+ return Status::NotFound("unknown compression type({})",
tparquet::to_string(parquet_codec));
}
Status st = ptr->init();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]