This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit c0fd98abe50c75d500524feaac9ed74a52835c6e Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com> AuthorDate: Tue May 21 09:57:15 2024 +0800 [Fix](tvf) Fix that tvf reading empty files in compressed formats. (#34926) 1. Fix the issue with tvf reading empty compressed files. 2. move two test cases (`test_local_tvf_compression` and `test_s3_tvf_compression`) from p2 to p0 --- be/src/exec/decompressor.cpp | 27 +++++++++++--- .../file_reader/new_plain_text_line_reader.cpp | 3 +- .../ExternalFileTableValuedFunction.java | 40 ++++++++++++++++++++- .../tvf/compress/test_empty_snappy.snappy | Bin 0 -> 4 bytes .../tvf/compress/test_tvf.csv.bz2 | Bin .../tvf/compress/test_tvf.csv.deflate | Bin .../tvf/compress/test_tvf.csv.gz | Bin .../tvf/compress/test_tvf.csv.lz4 | Bin .../tvf/compress/test_tvf.csv.snappy | Bin .../tvf/test_local_tvf_compression.out | 2 ++ .../tvf/test_s3_tvf_compression.out | 0 .../tvf/test_local_tvf_compression.groovy | 14 ++++++-- .../tvf/test_s3_tvf_compression.groovy | 2 +- 13 files changed, 78 insertions(+), 10 deletions(-) diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp index c9f16f10e7a..d8d02c9cf9e 100644 --- a/be/src/exec/decompressor.cpp +++ b/be/src/exec/decompressor.cpp @@ -557,6 +557,10 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len, // See: // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc while (remaining_input_size > 0) { + if (remaining_input_size < 4) { + *more_input_bytes = 4 - remaining_input_size; + break; + } // Read uncompressed size uint32_t uncompressed_block_len = Decompressor::_read_int32(src); int64_t remaining_output_len = output_max_len - uncompressed_total_len; @@ -566,12 +570,24 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len, break; } + if (uncompressed_block_len == 0) { + remaining_input_size -= sizeof(uint32_t); + break; + } + + if (remaining_input_size <= 2 * sizeof(uint32_t)) { + // The remaining input size should be larger then <uncompressed size><compressed size><compressed data> + // +1 means we need at least 1 bytes of compressed data. + *more_input_bytes = 2 * sizeof(uint32_t) + 1 - remaining_input_size; + break; + } + // Read compressed size - size_t tmp_src_size = remaining_input_size - sizeof(uint32_t); + size_t tmp_remaining_size = remaining_input_size - 2 * sizeof(uint32_t); size_t compressed_len = _read_int32(src + sizeof(uint32_t)); - if (compressed_len == 0 || compressed_len > tmp_src_size) { + if (compressed_len > tmp_remaining_size) { // Need more input data - *more_input_bytes = compressed_len - tmp_src_size; + *more_input_bytes = compressed_len - tmp_remaining_size; break; } @@ -590,8 +606,9 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len, // Decompress if (!snappy::RawUncompress(reinterpret_cast<const char*>(src), compressed_len, reinterpret_cast<char*>(output))) { - return Status::InternalError("snappy block decompress failed. uncompressed_len: {}", - uncompressed_block_len); + return Status::InternalError( + "snappy block decompress failed. uncompressed_len: {}, compressed_len: {}", + uncompressed_block_len, compressed_len); } output += uncompressed_block_len; diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp index fefd5ecae67..8dce6e589af 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp @@ -444,7 +444,8 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool std::stringstream ss; ss << "decompress made no progress." << " input_read_bytes: " << input_read_bytes - << " decompressed_len: " << decompressed_len; + << " decompressed_len: " << decompressed_len + << " input len: " << (_input_buf_limit - _input_buf_pos); LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index 260f7b2df44..0a295e1b555 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -482,7 +482,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio // get first file, used to parse table schema TBrokerFileStatus firstFile = null; for (TBrokerFileStatus fileStatus : fileStatuses) { - if (fileStatus.isIsDir() || fileStatus.size == 0) { + if (isFileContentEmpty(fileStatus)) { continue; } firstFile = fileStatus; @@ -514,5 +514,43 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio return InternalService.PFetchTableSchemaRequest.newBuilder() .setFileScanRange(ByteString.copyFrom(new TSerializer().serialize(fileScanRange))).build(); } + + private boolean isFileContentEmpty(TBrokerFileStatus fileStatus) { + if (fileStatus.isIsDir() || fileStatus.size == 0) { + return true; + } + if (Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON) { + int magicNumberBytes = 0; + switch (compressionType) { + case GZ: + magicNumberBytes = 20; + break; + case LZO: + case LZOP: + magicNumberBytes = 42; + break; + case DEFLATE: + magicNumberBytes = 8; + break; + case SNAPPYBLOCK: + case LZ4BLOCK: + case LZ4FRAME: + magicNumberBytes = 4; + break; + case BZ2: + magicNumberBytes = 14; + break; + case UNKNOWN: + case PLAIN: + default: + break; + } + // fileStatus.size may be -1 in http_stream + if (fileStatus.size >= 0 && fileStatus.size <= magicNumberBytes) { + return true; + } + } + return false; + } } diff --git a/regression-test/data/external_table_p0/tvf/compress/test_empty_snappy.snappy b/regression-test/data/external_table_p0/tvf/compress/test_empty_snappy.snappy new file mode 100644 index 00000000000..593f4708db8 Binary files /dev/null and b/regression-test/data/external_table_p0/tvf/compress/test_empty_snappy.snappy differ diff --git a/regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.bz2 b/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.bz2 similarity index 100% rename from regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.bz2 rename to regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.bz2 diff --git a/regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.deflate b/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.deflate similarity index 100% rename from regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.deflate rename to regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.deflate diff --git a/regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.gz b/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.gz similarity index 100% rename from regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.gz rename to regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.gz diff --git a/regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.lz4 b/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.lz4 similarity index 100% rename from regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.lz4 rename to regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.lz4 diff --git a/regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.snappy b/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.snappy similarity index 100% rename from regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.snappy rename to regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.snappy diff --git a/regression-test/data/external_table_p2/tvf/test_local_tvf_compression.out b/regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out similarity index 99% rename from regression-test/data/external_table_p2/tvf/test_local_tvf_compression.out rename to regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out index 19699b0dc5d..8120427ea6c 100644 --- a/regression-test/data/external_table_p2/tvf/test_local_tvf_compression.out +++ b/regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out @@ -148,3 +148,5 @@ -- !snappy_2 -- +-- !snappy_empty -- + diff --git a/regression-test/data/external_table_p2/tvf/test_s3_tvf_compression.out b/regression-test/data/external_table_p0/tvf/test_s3_tvf_compression.out similarity index 100% rename from regression-test/data/external_table_p2/tvf/test_s3_tvf_compression.out rename to regression-test/data/external_table_p0/tvf/test_s3_tvf_compression.out diff --git a/regression-test/suites/external_table_p2/tvf/test_local_tvf_compression.groovy b/regression-test/suites/external_table_p0/tvf/test_local_tvf_compression.groovy similarity index 93% rename from regression-test/suites/external_table_p2/tvf/test_local_tvf_compression.groovy rename to regression-test/suites/external_table_p0/tvf/test_local_tvf_compression.groovy index 343afab56f9..420887fb41d 100644 --- a/regression-test/suites/external_table_p2/tvf/test_local_tvf_compression.groovy +++ b/regression-test/suites/external_table_p0/tvf/test_local_tvf_compression.groovy @@ -17,11 +17,11 @@ import org.junit.Assert // specific language governing permissions and limitations // under the License. -suite("test_local_tvf_compression", "p2,external,tvf,external_remote,external_remote_tvf") { +suite("test_local_tvf_compression", "p0,tvf") { List<List<Object>> backends = sql """ show backends """ assertTrue(backends.size() > 0) def be_id = backends[0][0] - def dataFilePath = context.config.dataPath + "/external_table_p2/tvf/compress" + def dataFilePath = context.config.dataPath + "/external_table_p0/tvf/compress" def outFilePath="/compress" @@ -136,6 +136,16 @@ suite("test_local_tvf_compression", "p2,external,tvf,external_remote,external_re "compress_type" ="${compress_type}block") where c2="abcd" order by c3 limit 22 ; """ + // test empty snapppy file + qt_snappy_empty """ + select * from local( + "file_path" = "${outFilePath}/test_empty_snappy.${compress_type}", + "backend_id" = "${be_id}", + "format" = "csv", + "column_separator" = ",", + "compress_type" ="${compress_type}block"); + """ + // test error case test { sql """ diff --git a/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy b/regression-test/suites/external_table_p0/tvf/test_s3_tvf_compression.groovy similarity index 98% rename from regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy rename to regression-test/suites/external_table_p0/tvf/test_s3_tvf_compression.groovy index 279fcb5e8a5..5eb63c94ddc 100644 --- a/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy +++ b/regression-test/suites/external_table_p0/tvf/test_s3_tvf_compression.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remote_tvf") { +suite("test_s3_tvf_compression", "p0") { String ak = getS3AK() String sk = getS3SK() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org