This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 6e093f97093 [improvement](load) Enable lzo & Remove dependency on liblzo2 (#30898)
6e093f97093 is described below
commit 6e093f97093680cd07f7bc23a80332c43723d57d
Author: HowardQin <[email protected]>
AuthorDate: Tue Feb 6 18:08:49 2024 +0800
[improvement](load) Enable lzo & Remove dependency on liblzo2 (#30898)
bp #30573
---
be/CMakeLists.txt | 8 -
be/cmake/thirdparty.cmake | 5 -
be/src/exec/decompressor.cpp | 2 -
be/src/exec/decompressor.h | 7 -
be/src/exec/lzo_decompressor.cpp | 52 ++-
be/src/olap/utils.cpp | 11 +-
be/src/pch/pch.h | 4 -
be/src/vec/exec/format/csv/csv_reader.cpp | 1 +
build.sh | 5 -
.../load_p0/stream_load/test_compress_type.out | 4 +
.../suites/load_p0/stream_load/ddl/basic_data.sql | 29 ++
.../load_p0/stream_load/test_compress_type.groovy | 349 +++++++++++++++++++++
12 files changed, 421 insertions(+), 56 deletions(-)
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 92c747ddf67..129adeb3ed7 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -323,10 +323,6 @@ if (WITH_MYSQL)
add_compile_options(-DDORIS_WITH_MYSQL)
endif()
-if (WITH_LZO)
- add_compile_options(-DDORIS_WITH_LZO)
-endif()
-
# Enable memory tracker, which allows BE to limit the memory of tasks such as query, load,
# and compaction,and observe the memory of BE through be_ip:http_port/MemTracker.
# Adding the option `USE_MEM_TRACKER=OFF sh build.sh` when compiling can turn off the memory tracker,
@@ -638,10 +634,6 @@ set(DORIS_DEPENDENCIES
${KRB5_LIBS}
)
-if(WITH_LZO)
- set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} lzo)
-endif()
-
if (WITH_MYSQL)
set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} mysql)
endif()
diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake
index 4928b64b16b..4b6946c5a78 100644
--- a/be/cmake/thirdparty.cmake
+++ b/be/cmake/thirdparty.cmake
@@ -76,11 +76,6 @@ set_target_properties(thrift PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/
add_library(thriftnb STATIC IMPORTED)
set_target_properties(thriftnb PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libthriftnb.a)
-if(WITH_LZO)
- add_library(lzo STATIC IMPORTED)
- set_target_properties(lzo PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/liblzo2.a)
-endif()
-
if (WITH_MYSQL)
add_library(mysql STATIC IMPORTED)
set_target_properties(mysql PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libmysqlclient.a)
diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp
index 1e7e3482234..c0dd7554942 100644
--- a/be/src/exec/decompressor.cpp
+++ b/be/src/exec/decompressor.cpp
@@ -50,11 +50,9 @@ Status Decompressor::create_decompressor(CompressType type, Decompressor** decom
case CompressType::SNAPPYBLOCK:
*decompressor = new SnappyBlockDecompressor();
break;
-#ifdef DORIS_WITH_LZO
case CompressType::LZOP:
*decompressor = new LzopDecompressor();
break;
-#endif
default:
return Status::InternalError("Unknown compress type: {}", type);
}
diff --git a/be/src/exec/decompressor.h b/be/src/exec/decompressor.h
index 2b07e71139f..a453a1746e4 100644
--- a/be/src/exec/decompressor.h
+++ b/be/src/exec/decompressor.h
@@ -28,11 +28,6 @@
#include <string>
-#ifdef DORIS_WITH_LZO
-#include <lzo/lzo1x.h>
-#include <lzo/lzoconf.h>
-#endif
-
#include "common/status.h"
namespace doris {
@@ -177,7 +172,6 @@ private:
Status init() override;
};
-#ifdef DORIS_WITH_LZO
class LzopDecompressor : public Decompressor {
public:
~LzopDecompressor() override = default;
@@ -271,6 +265,5 @@ private:
const static uint64_t F_CRC32_D;
const static uint64_t F_ADLER32_D;
};
-#endif // DORIS_WITH_LZO
} // namespace doris
diff --git a/be/src/exec/lzo_decompressor.cpp b/be/src/exec/lzo_decompressor.cpp
index a2af9e94fd0..a39bebe8755 100644
--- a/be/src/exec/lzo_decompressor.cpp
+++ b/be/src/exec/lzo_decompressor.cpp
@@ -16,15 +16,30 @@
// under the License.
#include "exec/decompressor.h"
+#include "olap/utils.h"
+#include "orc/Exceptions.hh"
+#include "util/crc32c.h"
+
+namespace orc {
+/**
+ * Decompress the bytes in to the output buffer.
+ * @param inputAddress the start of the input
+ * @param inputLimit one past the last byte of the input
+ * @param outputAddress the start of the output buffer
+ * @param outputLimit one past the last byte of the output buffer
+ * @result the number of bytes decompressed
+ */
+uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress,
+ char* outputLimit);
+} // namespace orc
namespace doris {
-#ifdef DORIS_WITH_LZO
// Lzop
const uint8_t LzopDecompressor::LZOP_MAGIC[9] = {0x89, 0x4c, 0x5a, 0x4f, 0x00,
0x0d, 0x0a, 0x1a, 0x0a};
-const uint64_t LzopDecompressor::LZOP_VERSION = 0x1030;
+const uint64_t LzopDecompressor::LZOP_VERSION = 0x1040;
const uint64_t LzopDecompressor::MIN_LZO_VERSION = 0x0100;
// magic(9) + ver(2) + lib_ver(2) + ver_needed(2) + method(1)
// + lvl(1) + flags(4) + mode/mtime(12) + filename_len(1)
@@ -154,18 +169,18 @@ Status LzopDecompressor::decompress(uint8_t* input, size_t input_len, size_t* in
ptr += compressed_size;
} else {
// decompress
- *decompressed_len = uncompressed_size;
- int ret = lzo1x_decompress_safe(ptr, compressed_size, output,
- reinterpret_cast<lzo_uint*>(&uncompressed_size), nullptr);
- if (ret != LZO_E_OK || uncompressed_size != *decompressed_len) {
+ try {
+ *decompressed_len =
+ orc::lzoDecompress((const char*)ptr, (const char*)(ptr + compressed_size),
+ (char*)output, (char*)(output + uncompressed_size));
+ } catch (const orc::ParseError& err) {
std::stringstream ss;
- ss << "Lzo decompression failed with ret: " << ret
- << " decompressed len: " << uncompressed_size << " expected: " << *decompressed_len;
+ ss << "Lzo decompression failed: " << err.what();
return Status::InternalError(ss.str());
}
RETURN_IF_ERROR(checksum(_header_info.output_checksum_type, "decompressed", out_checksum,
- output, uncompressed_size));
+ output, *decompressed_len));
ptr += compressed_size;
}
@@ -260,8 +275,14 @@ Status LzopDecompressor::parse_header_info(uint8_t* input, size_t input_len,
return Status::InternalError(ss.str());
}
- // 6. skip level
- ++ptr;
+ // 6. unsupported level: 7, 8, 9
+ uint8_t level;
+ ptr = get_uint8(ptr, &level);
+ if (level > 6) {
+ std::stringstream ss;
+ ss << "unsupported lzo level: " << (int)level;
+ return Status::InternalError(ss.str());
+ }
// 7. flags
uint32_t flags;
@@ -305,10 +326,10 @@ Status LzopDecompressor::parse_header_info(uint8_t* input, size_t input_len,
uint32_t computed_checksum;
if (_header_info.header_checksum_type == CHECK_CRC32) {
computed_checksum = CRC32_INIT_VALUE;
- computed_checksum = lzo_crc32(computed_checksum, header, cur - header);
+ computed_checksum = crc32c::Extend(computed_checksum, (const char*)header, cur - header);
} else {
computed_checksum = ADLER32_INIT_VALUE;
- computed_checksum = lzo_adler32(computed_checksum, header, cur - header);
+ computed_checksum = olap_adler32(computed_checksum, (const char*)header, cur - header);
}
if (computed_checksum != expected_checksum) {
@@ -354,10 +375,10 @@ Status LzopDecompressor::checksum(LzoChecksum type, const std::string& source, u
case CHECK_NONE:
return Status::OK();
case CHECK_CRC32:
- computed_checksum = lzo_crc32(CRC32_INIT_VALUE, ptr, len);
+ computed_checksum = crc32c::Extend(CRC32_INIT_VALUE, (const char*)ptr, len);
break;
case CHECK_ADLER:
- computed_checksum = lzo_adler32(ADLER32_INIT_VALUE, ptr, len);
+ computed_checksum = olap_adler32(ADLER32_INIT_VALUE, (const char*)ptr, len);
break;
default:
std::stringstream ss;
@@ -387,6 +408,5 @@ std::string LzopDecompressor::debug_info() {
<< " output checksum type: " << _header_info.output_checksum_type;
return ss.str();
}
-#endif // DORIS_WITH_LZO
} // namespace doris
diff --git a/be/src/olap/utils.cpp b/be/src/olap/utils.cpp
index cdd7ad4c834..59d13d5afcb 100644
--- a/be/src/olap/utils.cpp
+++ b/be/src/olap/utils.cpp
@@ -19,6 +19,7 @@
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
+#include <stdarg.h>
#include <time.h>
#include <unistd.h>
#include <zconf.h>
@@ -33,15 +34,6 @@
#include <string>
#include <vector>
-#include "util/sse_util.hpp"
-
-#ifdef DORIS_WITH_LZO
-#include <lzo/lzo1c.h>
-#include <lzo/lzo1x.h>
-#endif
-
-#include <stdarg.h>
-
#include "common/logging.h"
#include "common/status.h"
#include "io/fs/file_reader.h"
@@ -49,6 +41,7 @@
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/olap_common.h"
+#include "util/sse_util.hpp"
#include "util/string_parser.hpp"
namespace doris {
diff --git a/be/src/pch/pch.h b/be/src/pch/pch.h
index fffef7b8d57..9ec2a4a8531 100644
--- a/be/src/pch/pch.h
+++ b/be/src/pch/pch.h
@@ -256,10 +256,6 @@
#include <lz4/lz4.h>
#include <lz4/lz4frame.h>
-// lzo headers
-#include <lzo/lzo1x.h>
-#include <lzo/lzoconf.h>
-
// mysql headers
#include <mysql/mysql.h>
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index ced615ec66f..fb3ee5c5be3 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -561,6 +561,7 @@ Status CsvReader::_create_decompressor() {
compress_type = CompressType::GZIP;
break;
case TFileCompressType::LZO:
+ case TFileCompressType::LZOP:
compress_type = CompressType::LZOP;
break;
case TFileCompressType::BZ2:
diff --git a/build.sh b/build.sh
index ca7f21e1c4d..43ae8d8d2e8 100755
--- a/build.sh
+++ b/build.sh
@@ -324,9 +324,6 @@ fi
if [[ -z "${USE_AVX2}" ]]; then
USE_AVX2='ON'
fi
-if [[ -z "${WITH_LZO}" ]]; then
- WITH_LZO='OFF'
-fi
if [[ -z "${USE_LIBCPP}" ]]; then
if [[ "$(uname -s)" != 'Darwin' ]]; then
USE_LIBCPP='OFF'
@@ -422,7 +419,6 @@ echo "Get params:
PARALLEL -- ${PARALLEL}
CLEAN -- ${CLEAN}
WITH_MYSQL -- ${WITH_MYSQL}
- WITH_LZO -- ${WITH_LZO}
GLIBC_COMPATIBILITY -- ${GLIBC_COMPATIBILITY}
USE_AVX2 -- ${USE_AVX2}
USE_LIBCPP -- ${USE_LIBCPP}
@@ -509,7 +505,6 @@ if [[ "${BUILD_BE}" -eq 1 ]]; then
-DBUILD_FS_BENCHMARK="${BUILD_FS_BENCHMARK}" \
${CMAKE_USE_CCACHE:+${CMAKE_USE_CCACHE}} \
-DWITH_MYSQL="${WITH_MYSQL}" \
- -DWITH_LZO="${WITH_LZO}" \
-DUSE_LIBCPP="${USE_LIBCPP}" \
-DBUILD_META_TOOL="${BUILD_META_TOOL}" \
-DBUILD_INDEX_TOOL="${BUILD_INDEX_TOOL}" \
diff --git a/regression-test/data/load_p0/stream_load/test_compress_type.out b/regression-test/data/load_p0/stream_load/test_compress_type.out
new file mode 100644
index 00000000000..56d195c569e
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_compress_type.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+160
+
diff --git a/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql b/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql
new file mode 100644
index 00000000000..41c3660e11c
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql
@@ -0,0 +1,29 @@
+CREATE TABLE basic_data
+(
+ k00 INT NOT NULL,
+ k01 DATE NOT NULL,
+ k02 BOOLEAN NULL,
+ k03 TINYINT NULL,
+ k04 SMALLINT NULL,
+ k05 INT NULL,
+ k06 BIGINT NULL,
+ k07 LARGEINT NULL,
+ k08 FLOAT NULL,
+ k09 DOUBLE NULL,
+ k10 DECIMAL(9,1) NULL,
+ k11 DECIMALV3(9,1) NULL,
+ k12 DATETIME NULL,
+ k13 DATEV2 NULL,
+ k14 DATETIMEV2 NULL,
+ k15 CHAR NULL,
+ k16 VARCHAR NULL,
+ k17 STRING NULL,
+ k18 JSON NULL
+
+)
+DUPLICATE KEY(k00)
+DISTRIBUTED BY HASH(k00) BUCKETS 32
+PROPERTIES (
+ "bloom_filter_columns"="k05",
+ "replication_num" = "1"
+);
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
new file mode 100644
index 00000000000..950f1fdeb27
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
@@ -0,0 +1,349 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_compress_type", "load_p0") {
+ def tableName = "basic_data"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ // GZ/LZO/BZ2/LZ4FRAME/LZOP
+ sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'format', "CSV"
+ set 'compress_type', 'GZ'
+
+ file "basic_data.csv.gz"
+
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'format', "CSV"
+ set 'compress_type', 'BZ2'
+
+ file "basic_data.csv.bz2"
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'format', 'csv'
+ set 'compress_type', 'LZ4'
+
+ file "basic_data.csv.lz4"
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'compress_type', 'GZ'
+
+ file "basic_data.csv.gz"
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'compress_type', 'BZ2'
+
+ file "basic_data.csv.bz2"
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'compress_type', 'LZ4'
+
+ file "basic_data.csv.lz4"
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ // LZO = LZOP
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'compress_type', 'LZO'
+
+ file "basic_data.csv.lzo"
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'compress_type', 'LZOP'
+
+ file "basic_data.csv.lzo"
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ // no compress_type
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'format', "CSV"
+ file "basic_data.csv.gz"
+
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Fail", json.Status)
+ assertTrue(json.Message.contains("too many filtered rows"))
+ assertEquals(13, json.NumberTotalRows)
+ assertEquals(0, json.NumberLoadedRows)
+ assertEquals(13, json.NumberFilteredRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ // no compress_type
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'format', "CSV"
+ file "basic_data.csv.bz2"
+
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Fail", json.Status)
+ assertTrue(json.Message.contains("too many filtered rows"))
+ assertEquals(9, json.NumberTotalRows)
+ assertEquals(0, json.NumberLoadedRows)
+ assertEquals(9, json.NumberFilteredRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ // no compress_type
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'format', "CSV"
+ file "basic_data.csv.lz4"
+
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Fail", json.Status)
+ assertTrue(json.Message.contains("too many filtered rows"))
+ assertEquals(31, json.NumberTotalRows)
+ assertEquals(0, json.NumberLoadedRows)
+ assertEquals(31, json.NumberFilteredRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ // no compress_type
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ file "basic_data.csv.gz"
+
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Fail", json.Status)
+ assertTrue(json.Message.contains("too many filtered rows"))
+ assertEquals(13, json.NumberTotalRows)
+ assertEquals(0, json.NumberLoadedRows)
+ assertEquals(13, json.NumberFilteredRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ // no compress_type
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ file "basic_data.csv.bz2"
+
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Fail", json.Status)
+ assertTrue(json.Message.contains("too many filtered rows"))
+ assertEquals(9, json.NumberTotalRows)
+ assertEquals(0, json.NumberLoadedRows)
+ assertEquals(9, json.NumberFilteredRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ // no compress_type
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ file "basic_data.csv.lz4"
+
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Fail", json.Status)
+ assertTrue(json.Message.contains("too many filtered rows"))
+ assertEquals(31, json.NumberTotalRows)
+ assertEquals(0, json.NumberLoadedRows)
+ assertEquals(31, json.NumberFilteredRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ // no compress_type
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ file "basic_data.csv.lzo"
+
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Fail", json.Status)
+ assertTrue(json.Message.contains("too many filtered rows"))
+ assertEquals(23, json.NumberTotalRows)
+ assertEquals(0, json.NumberLoadedRows)
+ assertEquals(23, json.NumberFilteredRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ qt_sql """ select count(*) from ${tableName} """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]