This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new ca57582a192 [Fix](tvf) Fix that tvf reading empty files in compressed 
formats. (#34926) (#35141)
ca57582a192 is described below

commit ca57582a192a82a70bd584cb4f6e075c3280896a
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Thu May 23 00:05:28 2024 +0800

    [Fix](tvf) Fix that tvf reading empty files in compressed formats. (#34926) 
(#35141)
    
    bp: #34926
---
 be/src/exec/decompressor.cpp                       |  27 +++++++++++---
 .../file_reader/new_plain_text_line_reader.cpp     |   3 +-
 .../ExternalFileTableValuedFunction.java           |  40 ++++++++++++++++++++-
 .../tvf/compress/test_empty_snappy.snappy          | Bin 0 -> 4 bytes
 .../tvf/compress/test_tvf.csv.bz2                  | Bin
 .../tvf/compress/test_tvf.csv.deflate              | Bin
 .../tvf/compress/test_tvf.csv.gz                   | Bin
 .../tvf/compress/test_tvf.csv.lz4                  | Bin
 .../tvf/compress/test_tvf.csv.snappy               | Bin
 .../tvf/test_local_tvf_compression.out             |   2 ++
 .../tvf/test_s3_tvf_compression.out                |   0
 .../tvf/test_local_tvf_compression.groovy          |  14 ++++++--
 .../tvf/test_s3_tvf_compression.groovy             |   2 +-
 13 files changed, 78 insertions(+), 10 deletions(-)

diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp
index c0dd7554942..37731362ee6 100644
--- a/be/src/exec/decompressor.cpp
+++ b/be/src/exec/decompressor.cpp
@@ -478,6 +478,10 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, 
size_t input_len,
     // See:
     // 
https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
     while (remaining_input_size > 0) {
+        if (remaining_input_size < 4) {
+            *more_input_bytes = 4 - remaining_input_size;
+            break;
+        }
         // Read uncompressed size
         uint32_t uncompressed_block_len = Decompressor::_read_int32(src);
         int64_t remaining_output_len = output_max_len - uncompressed_total_len;
@@ -487,12 +491,24 @@ Status SnappyBlockDecompressor::decompress(uint8_t* 
input, size_t input_len,
             break;
         }
 
+        if (uncompressed_block_len == 0) {
+            remaining_input_size -= sizeof(uint32_t);
+            break;
+        }
+
+        if (remaining_input_size <= 2 * sizeof(uint32_t)) {
+            // The remaining input size should be larger than <uncompressed 
size><compressed size><compressed data>
+            // +1 means we need at least 1 byte of compressed data.
+            *more_input_bytes = 2 * sizeof(uint32_t) + 1 - 
remaining_input_size;
+            break;
+        }
+
         // Read compressed size
-        size_t tmp_src_size = remaining_input_size - sizeof(uint32_t);
+        size_t tmp_remaining_size = remaining_input_size - 2 * 
sizeof(uint32_t);
         size_t compressed_len = _read_int32(src + sizeof(uint32_t));
-        if (compressed_len == 0 || compressed_len > tmp_src_size) {
+        if (compressed_len > tmp_remaining_size) {
             // Need more input data
-            *more_input_bytes = compressed_len - tmp_src_size;
+            *more_input_bytes = compressed_len - tmp_remaining_size;
             break;
         }
 
@@ -511,8 +527,9 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, 
size_t input_len,
         // Decompress
         if (!snappy::RawUncompress(reinterpret_cast<const char*>(src), 
compressed_len,
                                    reinterpret_cast<char*>(output))) {
-            return Status::InternalError("snappy block decompress failed. 
uncompressed_len: {}",
-                                         uncompressed_block_len);
+            return Status::InternalError(
+                    "snappy block decompress failed. uncompressed_len: {}, 
compressed_len: {}",
+                    uncompressed_block_len, compressed_len);
         }
 
         output += uncompressed_block_len;
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp 
b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
index 5e0d4a0d5ae..04953a2b54d 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
@@ -444,7 +444,8 @@ Status NewPlainTextLineReader::read_line(const uint8_t** 
ptr, size_t* size, bool
                     std::stringstream ss;
                     ss << "decompress made no progress."
                        << " input_read_bytes: " << input_read_bytes
-                       << " decompressed_len: " << decompressed_len;
+                       << " decompressed_len: " << decompressed_len
+                       << " input len: " << (_input_buf_limit - 
_input_buf_pos);
                     LOG(WARNING) << ss.str();
                     return Status::InternalError(ss.str());
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index eabe82804c3..8f25ca30143 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -434,7 +434,7 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
         // get first file, used to parse table schema
         TBrokerFileStatus firstFile = null;
         for (TBrokerFileStatus fileStatus : fileStatuses) {
-            if (fileStatus.isIsDir() || fileStatus.size == 0) {
+            if (isFileContentEmpty(fileStatus)) {
                 continue;
             }
             firstFile = fileStatus;
@@ -465,6 +465,44 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
         return InternalService.PFetchTableSchemaRequest.newBuilder()
                 .setFileScanRange(ByteString.copyFrom(new 
TSerializer().serialize(fileScanRange))).build();
     }
+
+    private boolean isFileContentEmpty(TBrokerFileStatus fileStatus) {
+        if (fileStatus.isIsDir() || fileStatus.size == 0) {
+            return true;
+        }
+        if (Util.isCsvFormat(fileFormatType) || fileFormatType == 
TFileFormatType.FORMAT_JSON) {
+            int magicNumberBytes = 0;
+            switch (compressionType) {
+                case GZ:
+                    magicNumberBytes = 20;
+                    break;
+                case LZO:
+                case LZOP:
+                    magicNumberBytes = 42;
+                    break;
+                case DEFLATE:
+                    magicNumberBytes = 8;
+                    break;
+                case SNAPPYBLOCK:
+                case LZ4BLOCK:
+                case LZ4FRAME:
+                    magicNumberBytes = 4;
+                    break;
+                case BZ2:
+                    magicNumberBytes = 14;
+                    break;
+                case UNKNOWN:
+                case PLAIN:
+                default:
+                    break;
+            }
+            // fileStatus.size may be -1 in http_stream
+            if (fileStatus.size >= 0 && fileStatus.size <= magicNumberBytes) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
 
 
diff --git 
a/regression-test/data/external_table_p0/tvf/compress/test_empty_snappy.snappy 
b/regression-test/data/external_table_p0/tvf/compress/test_empty_snappy.snappy
new file mode 100644
index 00000000000..593f4708db8
Binary files /dev/null and 
b/regression-test/data/external_table_p0/tvf/compress/test_empty_snappy.snappy 
differ
diff --git 
a/regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.bz2 
b/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.bz2
similarity index 100%
rename from regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.bz2
rename to regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.bz2
diff --git 
a/regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.deflate 
b/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.deflate
similarity index 100%
rename from 
regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.deflate
rename to 
regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.deflate
diff --git 
a/regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.gz 
b/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.gz
similarity index 100%
rename from regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.gz
rename to regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.gz
diff --git 
a/regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.lz4 
b/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.lz4
similarity index 100%
rename from regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.lz4
rename to regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.lz4
diff --git 
a/regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.snappy 
b/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.snappy
similarity index 100%
rename from 
regression-test/data/external_table_p2/tvf/compress/test_tvf.csv.snappy
rename to 
regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.snappy
diff --git 
a/regression-test/data/external_table_p2/tvf/test_local_tvf_compression.out 
b/regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out
similarity index 99%
rename from 
regression-test/data/external_table_p2/tvf/test_local_tvf_compression.out
rename to 
regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out
index 19699b0dc5d..8120427ea6c 100644
--- a/regression-test/data/external_table_p2/tvf/test_local_tvf_compression.out
+++ b/regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out
@@ -148,3 +148,5 @@
 
 -- !snappy_2 --
 
+-- !snappy_empty --
+
diff --git 
a/regression-test/data/external_table_p2/tvf/test_s3_tvf_compression.out 
b/regression-test/data/external_table_p0/tvf/test_s3_tvf_compression.out
similarity index 100%
rename from 
regression-test/data/external_table_p2/tvf/test_s3_tvf_compression.out
rename to regression-test/data/external_table_p0/tvf/test_s3_tvf_compression.out
diff --git 
a/regression-test/suites/external_table_p2/tvf/test_local_tvf_compression.groovy
 
b/regression-test/suites/external_table_p0/tvf/test_local_tvf_compression.groovy
similarity index 93%
rename from 
regression-test/suites/external_table_p2/tvf/test_local_tvf_compression.groovy
rename to 
regression-test/suites/external_table_p0/tvf/test_local_tvf_compression.groovy
index 950c7afc37f..f8c99318262 100644
--- 
a/regression-test/suites/external_table_p2/tvf/test_local_tvf_compression.groovy
+++ 
b/regression-test/suites/external_table_p0/tvf/test_local_tvf_compression.groovy
@@ -17,11 +17,11 @@ import org.junit.Assert
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_local_tvf_compression", 
"p2,external,tvf,external_remote,external_remote_tvf") {
+suite("test_local_tvf_compression", "p0,tvf") {
     List<List<Object>> backends =  sql """ show backends """
     assertTrue(backends.size() > 0)
     def be_id = backends[0][0]
-    def dataFilePath = context.config.dataPath + 
"/external_table_p2/tvf/compress"
+    def dataFilePath = context.config.dataPath + 
"/external_table_p0/tvf/compress"
 
     def outFilePath="/compress"
 
@@ -137,6 +137,16 @@ suite("test_local_tvf_compression", 
"p2,external,tvf,external_remote,external_re
         "compress_type" ="${compress_type}block") where c2="abcd" order by c3 
limit 22 ;            
     """
 
+    // test empty snappy file
+    qt_snappy_empty """ 
+        select * from local(
+        "file_path" = "${outFilePath}/test_empty_snappy.${compress_type}",
+        "backend_id" = "${be_id}",
+        "format" = "csv",
+        "column_separator" = ",",
+        "compress_type" ="${compress_type}block");            
+    """
+
     // test error case
     test {
         sql """
diff --git 
a/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy 
b/regression-test/suites/external_table_p0/tvf/test_s3_tvf_compression.groovy
similarity index 98%
rename from 
regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy
rename to 
regression-test/suites/external_table_p0/tvf/test_s3_tvf_compression.groovy
index 279fcb5e8a5..5eb63c94ddc 100644
--- 
a/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy
+++ 
b/regression-test/suites/external_table_p0/tvf/test_s3_tvf_compression.groovy
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_s3_tvf_compression", 
"p2,external,tvf,external_remote,external_remote_tvf") {
+suite("test_s3_tvf_compression", "p0") {
     
     String ak = getS3AK()
     String sk = getS3SK()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to