This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c418bbd2d1 [feature-wip](new-scan) support Json reader (#13546)
c418bbd2d1 is described below
commit c418bbd2d1c52776becd793562a2ca2ec4b74b96
Author: Tiewei Fang <[email protected]>
AuthorDate: Wed Oct 26 12:52:21 2022 +0800
[feature-wip](new-scan) support Json reader (#13546)
Issue Number: close #12574
This pr adds `NewJsonReader` which implements GenericReader interface to
support read json format file.
TODO:
1. modify `_scanner_eof` later.
2. Rename `NewJsonReader` to `JsonReader` when `JsonReader` is deleted.
---
be/src/vec/CMakeLists.txt | 1 +
be/src/vec/exec/format/json/new_json_reader.cpp | 754 +++++++++++++++++++++
be/src/vec/exec/format/json/new_json_reader.h | 150 ++++
be/src/vec/exec/scan/vfile_scanner.cpp | 7 +
.../docker-compose/hive/scripts/hive-metastore.sh | 2 +
.../scripts/json_format_test/multi_line_json.json | 2 +
.../multi_line_json_lack_column.json | 2 +
.../json_format_test/multi_line_json_unorder.json | 2 +
.../hive/scripts/json_format_test/nest_json.json | 5 +
.../json_format_test/simple_object_json.json | 12 +
.../org/apache/doris/analysis/DataDescription.java | 9 +-
.../main/java/org/apache/doris/catalog/Env.java | 2 +-
.../org/apache/doris/load/BrokerFileGroup.java | 6 +-
.../apache/doris/planner/StreamLoadPlanner.java | 5 -
regression-test/conf/regression-conf.groovy | 1 +
.../data/load_p0/broker_load/test_array_load.out | 40 ++
.../stream_load/load_json_null_to_nullable.out | 12 +
.../stream_load/load_json_with_jsonpath.out | 10 +
.../data/load_p0/stream_load/nest_json.json | 3 +
.../data/load_p0/stream_load/nest_json_array.json | 74 ++
.../data/load_p0/stream_load/simple_json2.json | 52 ++
.../stream_load/simple_json2_lack_one_column.json | 48 ++
.../load_p0/stream_load/test_hdfs_json_load.out | 305 +++++++++
.../data/load_p0/stream_load/test_json_load.out | 236 ++++++-
.../load_p0/broker_load/test_array_load.groovy | 136 ++--
...n_column_exclude_schema_without_jsonpath.groovy | 19 +-
.../stream_load/load_json_null_to_nullable.groovy | 38 +-
.../stream_load/load_json_with_jsonpath.groovy | 36 +-
.../load_p0/stream_load/test_hdfs_json_load.groovy | 554 +++++++++++++++
.../load_p0/stream_load/test_json_load.groovy | 404 ++++++++++-
.../hive_catalog.groovy | 33 +-
31 files changed, 2813 insertions(+), 147 deletions(-)
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index d3ae2849de..2ba617295c 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -258,6 +258,7 @@ set(VEC_FILES
exec/scan/new_es_scan_node.cpp
exec/format/csv/csv_reader.cpp
exec/format/orc/vorc_reader.cpp
+ exec/format/json/new_json_reader.cpp
)
add_library(Vec STATIC
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp
b/be/src/vec/exec/format/json/new_json_reader.cpp
new file mode 100644
index 0000000000..add32361e3
--- /dev/null
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -0,0 +1,754 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/json/new_json_reader.h"
+
+#include "common/compiler_util.h"
+#include "exec/plain_text_line_reader.h"
+#include "exprs/json_functions.h"
+#include "io/file_factory.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/exec/scan/vscanner.h"
+namespace doris::vectorized {
+
+NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile,
ScannerCounter* counter,
+ const TFileScanRangeParams& params, const
TFileRangeDesc& range,
+ const std::vector<SlotDescriptor*>&
file_slot_descs, bool* scanner_eof)
+ : _vhandle_json_callback(nullptr),
+ _state(state),
+ _profile(profile),
+ _counter(counter),
+ _params(params),
+ _range(range),
+ _file_slot_descs(file_slot_descs),
+ _file_reader(nullptr),
+ _file_reader_s(nullptr),
+ _real_file_reader(nullptr),
+ _line_reader(nullptr),
+ _reader_eof(false),
+ _skip_first_line(false),
+ _next_row(0),
+ _total_rows(0),
+ _value_allocator(_value_buffer, sizeof(_value_buffer)),
+ _parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
+ _origin_json_doc(&_value_allocator, sizeof(_parse_buffer),
&_parse_allocator),
+ _scanner_eof(scanner_eof) {
+ _file_format_type = _params.format_type;
+
+ _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
+ _read_timer = ADD_TIMER(_profile, "ReadTime");
+ _file_read_timer = ADD_TIMER(_profile, "FileReadTime");
+}
+
+Status NewJsonReader::init_reader() {
+ RETURN_IF_ERROR(_get_range_params());
+
+ RETURN_IF_ERROR(_open_file_reader());
+ if (_read_json_by_line) {
+ RETURN_IF_ERROR(_open_line_reader());
+ }
+
+ // generate _parsed_jsonpaths and _parsed_json_root
+ RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+ //improve performance
+ if (_parsed_jsonpaths.empty()) { // input is a simple json-string
+ _vhandle_json_callback = &NewJsonReader::_vhandle_simple_json;
+ } else { // input is a complex json-string and a json-path
+ if (_strip_outer_array) {
+ _vhandle_json_callback =
&NewJsonReader::_vhandle_flat_array_complex_json;
+ } else {
+ _vhandle_json_callback =
&NewJsonReader::_vhandle_nested_complex_json;
+ }
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::get_next_block(Block* block, size_t* read_rows, bool*
eof) {
+ if (_reader_eof == true) {
+ *eof = true;
+ return Status::OK();
+ }
+
+ const int batch_size = _state->batch_size();
+ auto columns = block->mutate_columns();
+
+ while (columns[0]->size() < batch_size && !_reader_eof) {
+ if (UNLIKELY(_read_json_by_line && _skip_first_line)) {
+ size_t size = 0;
+ const uint8_t* line_ptr = nullptr;
+ RETURN_IF_ERROR(_line_reader->read_line(&line_ptr, &size,
&_reader_eof));
+ _skip_first_line = false;
+ continue;
+ }
+
+ bool is_empty_row = false;
+
+ RETURN_IF_ERROR(_read_json_column(columns, _file_slot_descs,
&is_empty_row, &_reader_eof));
+ ++(*read_rows);
+ if (is_empty_row) {
+ // Read empty row, just continue
+ continue;
+ }
+ }
+
+ columns.clear();
+ return Status::OK();
+}
+
+Status NewJsonReader::get_columns(std::unordered_map<std::string,
TypeDescriptor>* name_to_type,
+ std::unordered_set<std::string>*
missing_cols) {
+ for (auto& slot : _file_slot_descs) {
+ name_to_type->emplace(slot->col_name(), slot->type());
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_get_range_params() {
+ if (!_params.__isset.file_attributes) {
+ return Status::InternalError("BE cat get file_attributes");
+ }
+
+ // get line_delimiter
+ if (_params.file_attributes.__isset.text_params &&
+ _params.file_attributes.text_params.__isset.line_delimiter) {
+ _line_delimiter = _params.file_attributes.text_params.line_delimiter;
+ _line_delimiter_length = _line_delimiter.size();
+ }
+
+ if (_params.file_attributes.__isset.jsonpaths) {
+ _jsonpaths = _params.file_attributes.jsonpaths;
+ }
+ if (_params.file_attributes.__isset.json_root) {
+ _json_root = _params.file_attributes.json_root;
+ }
+ if (_params.file_attributes.__isset.read_json_by_line) {
+ _read_json_by_line = _params.file_attributes.read_json_by_line;
+ }
+ if (_params.file_attributes.__isset.strip_outer_array) {
+ _strip_outer_array = _params.file_attributes.strip_outer_array;
+ }
+ if (_params.file_attributes.__isset.num_as_string) {
+ _num_as_string = _params.file_attributes.num_as_string;
+ }
+ if (_params.file_attributes.__isset.fuzzy_parse) {
+ _fuzzy_parse = _params.file_attributes.fuzzy_parse;
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_open_file_reader() {
+ int64_t start_offset = _range.start_offset;
+ if (start_offset != 0) {
+ start_offset -= 1;
+ }
+
+ if (_params.file_type == TFileType::FILE_STREAM) {
+ RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id,
_file_reader_s));
+ _real_file_reader = _file_reader_s.get();
+ } else {
+ RETURN_IF_ERROR(FileFactory::create_file_reader(
+ _profile, _params, _range.path, start_offset,
_range.file_size, 0, _file_reader));
+ _real_file_reader = _file_reader.get();
+ }
+ return _real_file_reader->open();
+}
+
+Status NewJsonReader::_open_line_reader() {
+ int64_t size = _range.size;
+ if (_range.start_offset != 0) {
+ // When we fetch range doesn't start from 0, size will += 1.
+
+ // TODO(ftw): check what if file_reader is stream_pipe? Is `size+=1`
is correct?
+ size += 1;
+ _skip_first_line = true;
+ } else {
+ _skip_first_line = false;
+ }
+ _line_reader.reset(new PlainTextLineReader(_profile, _real_file_reader,
nullptr, size,
+ _line_delimiter,
_line_delimiter_length));
+ return Status::OK();
+}
+
+Status NewJsonReader::_parse_jsonpath_and_json_root() {
+ // parse jsonpaths
+ if (!_jsonpaths.empty()) {
+ rapidjson::Document jsonpaths_doc;
+ if (!jsonpaths_doc.Parse(_jsonpaths.c_str(),
_jsonpaths.length()).HasParseError()) {
+ if (!jsonpaths_doc.IsArray()) {
+ return Status::InvalidArgument("Invalid json path: {}",
_jsonpaths);
+ } else {
+ for (int i = 0; i < jsonpaths_doc.Size(); i++) {
+ const rapidjson::Value& path = jsonpaths_doc[i];
+ if (!path.IsString()) {
+ return Status::InvalidArgument("Invalid json path:
{}", _jsonpaths);
+ }
+ std::vector<JsonPath> parsed_paths;
+ JsonFunctions::parse_json_paths(path.GetString(),
&parsed_paths);
+ _parsed_jsonpaths.push_back(std::move(parsed_paths));
+ }
+ }
+ } else {
+ return Status::InvalidArgument("Invalid json path: {}",
_jsonpaths);
+ }
+ }
+
+ // parse jsonroot
+ if (!_json_root.empty()) {
+ JsonFunctions::parse_json_paths(_json_root, &_parsed_json_root);
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_read_json_column(std::vector<MutableColumnPtr>& columns,
+ const std::vector<SlotDescriptor*>&
slot_descs,
+ bool* is_empty_row, bool* eof) {
+ return (this->*_vhandle_json_callback)(columns, slot_descs, is_empty_row,
eof);
+}
+
+Status NewJsonReader::_vhandle_simple_json(std::vector<MutableColumnPtr>&
columns,
+ const std::vector<SlotDescriptor*>&
slot_descs,
+ bool* is_empty_row, bool* eof) {
+ do {
+ bool valid = false;
+ if (_next_row >= _total_rows) { // parse json and generic document
+ Status st = _parse_json(is_empty_row, eof);
+ if (st.is_data_quality_error()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ _name_map.clear();
+ rapidjson::Value* objectValue = nullptr;
+ if (_json_doc->IsArray()) {
+ _total_rows = _json_doc->Size();
+ if (_total_rows == 0) {
+ // may be passing an empty json, such as "[]"
+ RETURN_IF_ERROR(_append_error_msg(*_json_doc, "Empty json
line", "", nullptr));
+
+ // TODO(ftw): check _reader_eof??
+ if (_reader_eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ objectValue = &(*_json_doc)[0];
+ } else {
+ _total_rows = 1; // only one row
+ objectValue = _json_doc;
+ }
+ _next_row = 0;
+ if (_fuzzy_parse) {
+ for (auto v : slot_descs) {
+ for (int i = 0; i < objectValue->MemberCount(); ++i) {
+ auto it = objectValue->MemberBegin() + i;
+ if (v->col_name() == it->name.GetString()) {
+ _name_map[v->col_name()] = i;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if (_json_doc->IsArray()) { // handle
case 1
+ rapidjson::Value& objectValue = (*_json_doc)[_next_row]; // json
object
+ RETURN_IF_ERROR(_set_column_value(objectValue, columns,
slot_descs, &valid));
+ } else { // handle case 2
+ RETURN_IF_ERROR(_set_column_value(*_json_doc, columns, slot_descs,
&valid));
+ }
+ _next_row++;
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ // TODO(ftw): check *eof=true?
+ *eof = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_vhandle_flat_array_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ do {
+ if (_next_row >= _total_rows) {
+ Status st = _parse_json(is_empty_row, eof);
+ if (st.is_data_quality_error()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ if (st == Status::OK()) {
+ return Status::OK();
+ }
+ if (_total_rows == 0) {
+ continue;
+ }
+ }
+ }
+ rapidjson::Value& objectValue = (*_json_doc)[_next_row++];
+ bool valid = true;
+ RETURN_IF_ERROR(_write_columns_by_jsonpath(objectValue, slot_descs,
columns, &valid));
+ if (!valid) {
+ continue; // process next line
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status
NewJsonReader::_vhandle_nested_complex_json(std::vector<MutableColumnPtr>&
columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool*
eof) {
+ while (true) {
+ Status st = _parse_json(is_empty_row, eof);
+ if (st.is_data_quality_error()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ *is_empty_row = false;
+ break; // read a valid row
+ }
+ bool valid = true;
+ RETURN_IF_ERROR(_write_columns_by_jsonpath(*_json_doc, slot_descs,
columns, &valid));
+ if (!valid) {
+ // there is only one line in this case, so if it return false, just
set is_empty_row true
+ // so that the caller will continue reading next line.
+ *is_empty_row = true;
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_parse_json(bool* is_empty_row, bool* eof) {
+ size_t size = 0;
+ RETURN_IF_ERROR(_parse_json_doc(&size, eof));
+
+ // read all data, then return
+ if (size == 0 || *eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+
+ if (!_parsed_jsonpaths.empty() && _strip_outer_array) {
+ _total_rows = _json_doc->Size();
+ _next_row = 0;
+
+ if (_total_rows == 0) {
+ // meet an empty json array.
+ *is_empty_row = true;
+ }
+ }
+ return Status::OK();
+}
+
+// read one json string from line reader or file reader and parse it to json
doc.
+// return Status::DataQualityError() if data has quality error.
+// return other error if encounter other problemes.
+// return Status::OK() if parse succeed or reach EOF.
+Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) {
+ // read a whole message
+ SCOPED_TIMER(_file_read_timer);
+ const uint8_t* json_str = nullptr;
+ std::unique_ptr<uint8_t[]> json_str_ptr;
+ if (_line_reader != nullptr) {
+ RETURN_IF_ERROR(_line_reader->read_line(&json_str, size, eof));
+ } else {
+ int64_t length = 0;
+ RETURN_IF_ERROR(_real_file_reader->read_one_message(&json_str_ptr,
&length));
+ json_str = json_str_ptr.get();
+ *size = length;
+ if (length == 0) {
+ *eof = true;
+ }
+ }
+
+ _bytes_read_counter += *size;
+ if (*eof) {
+ return Status::OK();
+ }
+
+ // clear memory here.
+ _value_allocator.Clear();
+ _parse_allocator.Clear();
+ bool has_parse_error = false;
+ // parse jsondata to JsonDoc
+
+ // As the issue: https://github.com/Tencent/rapidjson/issues/1458
+ // Now, rapidjson only support uint64_t, So lagreint load cause bug. We
use kParseNumbersAsStringsFlag.
+ if (_num_as_string) {
+ has_parse_error =
+ _origin_json_doc
+
.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str, *size)
+ .HasParseError();
+ } else {
+ has_parse_error = _origin_json_doc.Parse((char*)json_str,
*size).HasParseError();
+ }
+
+ if (has_parse_error) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code:
{}, error info: {}",
+ _origin_json_doc.GetParseError(),
+
rapidjson::GetParseError_En(_origin_json_doc.GetParseError()));
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return std::string((char*)json_str,
*size); },
+ [&]() -> std::string { return fmt::to_string(error_msg); },
_scanner_eof));
+ _counter->num_rows_filtered++;
+ if (*_scanner_eof) {
+ // Case A: if _scanner_eof is set to true in
"append_error_msg_to_file", which means
+ // we meet enough invalid rows and the scanner should be stopped.
+ // So we set eof to true and return OK, the caller will stop the
process as we meet the end of file.
+ *eof = true;
+ return Status::OK();
+ }
+ return Status::DataQualityError(fmt::to_string(error_msg));
+ }
+
+ // set json root
+ if (_parsed_json_root.size() != 0) {
+ _json_doc = JsonFunctions::get_json_object_from_parsed_json(
+ _parsed_json_root, &_origin_json_doc,
_origin_json_doc.GetAllocator());
+ if (_json_doc == nullptr) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "{}", "JSON Root not found.");
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return
_print_json_value(_origin_json_doc); },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, _scanner_eof));
+ _counter->num_rows_filtered++;
+ if (*_scanner_eof) {
+ // Same as Case A
+ *eof = true;
+ return Status::OK();
+ }
+ return Status::DataQualityError(fmt::to_string(error_msg));
+ }
+ } else {
+ _json_doc = &_origin_json_doc;
+ }
+
+ if (_json_doc->IsArray() && !_strip_outer_array) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "{}",
+ "JSON data is array-object, `strip_outer_array` must be
TRUE.");
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return
_print_json_value(_origin_json_doc); },
+ [&]() -> std::string { return fmt::to_string(error_msg); },
_scanner_eof));
+ _counter->num_rows_filtered++;
+ if (*_scanner_eof) {
+ // Same as Case A
+ *eof = true;
+ return Status::OK();
+ }
+ return Status::DataQualityError(fmt::to_string(error_msg));
+ }
+
+ if (!_json_doc->IsArray() && _strip_outer_array) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "{}",
+ "JSON data is not an array-object, `strip_outer_array`
must be FALSE.");
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return
_print_json_value(_origin_json_doc); },
+ [&]() -> std::string { return fmt::to_string(error_msg); },
_scanner_eof));
+ _counter->num_rows_filtered++;
+ if (*_scanner_eof) {
+ // Same as Case A
+ *eof = true;
+ return Status::OK();
+ }
+ return Status::DataQualityError(fmt::to_string(error_msg));
+ }
+
+ return Status::OK();
+}
+
+// for simple format json
+// set valid to true and return OK if succeed.
+// set valid to false and return OK if we met an invalid row.
+// return other status if encounter other problmes.
+Status NewJsonReader::_set_column_value(rapidjson::Value& objectValue,
+ std::vector<MutableColumnPtr>& columns,
+ const std::vector<SlotDescriptor*>&
slot_descs,
+ bool* valid) {
+ if (!objectValue.IsObject()) {
+ // Here we expect the incoming `objectValue` to be a Json Object, such
as {"key" : "value"},
+ // not other type of Json format.
+ RETURN_IF_ERROR(_append_error_msg(objectValue, "Expect json object
value", "", valid));
+ return Status::OK();
+ }
+
+ int ctx_idx = 0;
+ bool has_valid_value = false;
+ size_t cur_row_count = columns[0]->size();
+ for (auto slot_desc : slot_descs) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+
+ int dest_index = ctx_idx++;
+ auto* column_ptr = columns[dest_index].get();
+ rapidjson::Value::ConstMemberIterator it = objectValue.MemberEnd();
+
+ if (_fuzzy_parse) {
+ auto idx_it = _name_map.find(slot_desc->col_name());
+ if (idx_it != _name_map.end() && idx_it->second <
objectValue.MemberCount()) {
+ it = objectValue.MemberBegin() + idx_it->second;
+ }
+ } else {
+ it = objectValue.FindMember(
+ rapidjson::Value(slot_desc->col_name().c_str(),
slot_desc->col_name().size()));
+ }
+
+ if (it != objectValue.MemberEnd()) {
+ const rapidjson::Value& value = it->value;
+ RETURN_IF_ERROR(_write_data_to_column(&value, slot_desc,
column_ptr, valid));
+ if (!(*valid)) {
+ return Status::OK();
+ }
+ has_valid_value = true;
+ } else { // not found
+ // When the entire row has no valid value, this row should be
filtered,
+ // so the default value cannot be directly inserted here
+ if (!slot_desc->is_nullable()) {
+ RETURN_IF_ERROR(_append_error_msg(
+ objectValue,
+ "The column `{}` is not nullable, but it's not found
in jsondata.",
+ slot_desc->col_name(), valid));
+ break;
+ }
+ }
+ }
+ if (!has_valid_value) {
+ RETURN_IF_ERROR(_append_error_msg(objectValue, "All fields is null,
this is a invalid row.",
+ "", valid));
+ return Status::OK();
+ }
+ ctx_idx = 0;
+ int nullcount = 0;
+ // fill missing slot
+ for (auto slot_desc : slot_descs) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+ int dest_index = ctx_idx++;
+ auto* column_ptr = columns[dest_index].get();
+ if (column_ptr->size() < cur_row_count + 1) {
+ DCHECK(column_ptr->size() == cur_row_count);
+ column_ptr->assume_mutable()->insert_default();
+ ++nullcount;
+ }
+ DCHECK(column_ptr->size() == cur_row_count + 1);
+ }
+ // There is at least one valid value here
+ DCHECK(nullcount < columns.size());
+ *valid = true;
+ return Status::OK();
+}
+
+Status
NewJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator value,
+ SlotDescriptor* slot_desc,
+ vectorized::IColumn* column_ptr,
bool* valid) {
+ const char* str_value = nullptr;
+ char tmp_buf[128] = {0};
+ int32_t wbytes = 0;
+ std::string json_str;
+
+ vectorized::ColumnNullable* nullable_column = nullptr;
+ if (slot_desc->is_nullable()) {
+ nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
+ // kNullType will put 1 into the Null map, so there is no need to push
0 for kNullType.
+ if (value->GetType() != rapidjson::Type::kNullType) {
+ nullable_column->get_null_map_data().push_back(0);
+ } else {
+ nullable_column->insert_default();
+ }
+ column_ptr = &nullable_column->get_nested_column();
+ }
+
+ switch (value->GetType()) {
+ case rapidjson::Type::kStringType:
+ str_value = value->GetString();
+ wbytes = strlen(str_value);
+ break;
+ case rapidjson::Type::kNumberType:
+ if (value->IsUint()) {
+ wbytes = sprintf(tmp_buf, "%u", value->GetUint());
+ } else if (value->IsInt()) {
+ wbytes = sprintf(tmp_buf, "%d", value->GetInt());
+ } else if (value->IsUint64()) {
+ wbytes = sprintf(tmp_buf, "%" PRIu64, value->GetUint64());
+ } else if (value->IsInt64()) {
+ wbytes = sprintf(tmp_buf, "%" PRId64, value->GetInt64());
+ } else {
+ wbytes = sprintf(tmp_buf, "%f", value->GetDouble());
+ }
+ str_value = tmp_buf;
+ break;
+ case rapidjson::Type::kFalseType:
+ wbytes = 1;
+ str_value = (char*)"0";
+ break;
+ case rapidjson::Type::kTrueType:
+ wbytes = 1;
+ str_value = (char*)"1";
+ break;
+ case rapidjson::Type::kNullType:
+ if (!slot_desc->is_nullable()) {
+ RETURN_IF_ERROR(_append_error_msg(
+ *value, "Json value is null, but the column `{}` is not
nullable.",
+ slot_desc->col_name(), valid));
+ return Status::OK();
+ }
+ // return immediately to prevent from repeatedly insert_data
+ *valid = true;
+ return Status::OK();
+ default:
+ // for other type like array or object. we convert it to string to save
+ json_str = NewJsonReader::_print_json_value(*value);
+ wbytes = json_str.size();
+ str_value = json_str.c_str();
+ break;
+ }
+
+ // TODO: if the vexpr can support another 'slot_desc type' than
'TYPE_VARCHAR',
+ // we need use a function to support these types to insert data in columns.
+ DCHECK(slot_desc->type().type == TYPE_VARCHAR);
+ assert_cast<ColumnString*>(column_ptr)->insert_data(str_value, wbytes);
+
+ *valid = true;
+ return Status::OK();
+}
+
+Status NewJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+
std::vector<MutableColumnPtr>& columns,
+ bool* valid) {
+ int ctx_idx = 0;
+ bool has_valid_value = false;
+ size_t cur_row_count = columns[0]->size();
+ for (auto slot_desc : slot_descs) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+ int i = ctx_idx++;
+ auto* column_ptr = columns[i].get();
+ rapidjson::Value* json_values = nullptr;
+ bool wrap_explicitly = false;
+ if (LIKELY(i < _parsed_jsonpaths.size())) {
+ json_values = JsonFunctions::get_json_array_from_parsed_json(
+ _parsed_jsonpaths[i], &objectValue,
_origin_json_doc.GetAllocator(),
+ &wrap_explicitly);
+ }
+
+ if (json_values == nullptr) {
+ // not match in jsondata.
+ if (!slot_descs[i]->is_nullable()) {
+ RETURN_IF_ERROR(_append_error_msg(
+ objectValue,
+ "The column `{}` is not nullable, but it's not found
in jsondata.",
+ slot_descs[i]->col_name(), valid));
+ return Status::OK();
+ }
+ } else {
+ CHECK(json_values->IsArray());
+ if (json_values->Size() == 1 && wrap_explicitly) {
+ // NOTICE1: JsonFunctions::get_json_array_from_parsed_json()
will wrap the single json object with an array.
+ // so here we unwrap the array to get the real element.
+ // if json_values' size > 1, it means we just match an array,
not a wrapped one, so no need to unwrap.
+ json_values = &((*json_values)[0]);
+ }
+ RETURN_IF_ERROR(_write_data_to_column(json_values, slot_descs[i],
column_ptr, valid));
+ if (!(*valid)) {
+ return Status::OK();
+ }
+ has_valid_value = true;
+ }
+ }
+ if (!has_valid_value) {
+ RETURN_IF_ERROR(_append_error_msg(
+ objectValue, "All fields is null or not matched, this is a
invalid row.", "",
+ valid));
+ return Status::OK();
+ }
+ ctx_idx = 0;
+ for (auto slot_desc : slot_descs) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+ int dest_index = ctx_idx++;
+ auto* column_ptr = columns[dest_index].get();
+ if (column_ptr->size() < cur_row_count + 1) {
+ DCHECK(column_ptr->size() == cur_row_count);
+ column_ptr->assume_mutable()->insert_default();
+ }
+ DCHECK(column_ptr->size() == cur_row_count + 1);
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_append_error_msg(const rapidjson::Value& objectValue,
std::string error_msg,
+ std::string col_name, bool* valid) {
+ std::string err_msg;
+ if (!col_name.empty()) {
+ fmt::memory_buffer error_buf;
+ fmt::format_to(error_buf, error_msg, col_name);
+ err_msg = fmt::to_string(error_buf);
+ } else {
+ err_msg = error_msg;
+ }
+
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return
NewJsonReader::_print_json_value(objectValue); },
+ [&]() -> std::string { return err_msg; }, _scanner_eof));
+
+ // TODO(ftw): check here?
+ if (*_scanner_eof == true) {
+ _reader_eof = true;
+ }
+
+ _counter->num_rows_filtered++;
+ if (valid != nullptr) {
+ // current row is invalid
+ *valid = false;
+ }
+ return Status::OK();
+}
+
+std::string NewJsonReader::_print_json_value(const rapidjson::Value& value) {
+ rapidjson::StringBuffer buffer;
+ buffer.Clear();
+ rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+ value.Accept(writer);
+ return std::string(buffer.GetString());
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/json/new_json_reader.h
b/be/src/vec/exec/format/json/new_json_reader.h
new file mode 100644
index 0000000000..aee11535fb
--- /dev/null
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <rapidjson/document.h>
+#include <rapidjson/error/en.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/writer.h>
+
+#include "vec/exec/format/generic_reader.h"
+namespace doris {
+
+class FileReader;
+struct JsonPath;
+class LineReader;
+class SlotDescriptor;
+
+namespace vectorized {
+
+struct ScannerCounter;
+
+class NewJsonReader : public GenericReader {
+public:
+ NewJsonReader(RuntimeState* state, RuntimeProfile* profile,
ScannerCounter* counter,
+ const TFileScanRangeParams& params, const TFileRangeDesc&
range,
+ const std::vector<SlotDescriptor*>& file_slot_descs, bool*
scanner_eof);
+ ~NewJsonReader() override = default;
+
+ Status init_reader();
+ Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+ Status get_columns(std::unordered_map<std::string, TypeDescriptor>*
name_to_type,
+ std::unordered_set<std::string>* missing_cols) override;
+
+private:
+ Status _get_range_params();
+ Status _open_file_reader();
+ Status _open_line_reader();
+ Status _parse_jsonpath_and_json_root();
+
+ Status _read_json_column(std::vector<MutableColumnPtr>& columns,
+ const std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row,
+ bool* eof);
+
+ Status _vhandle_simple_json(std::vector<MutableColumnPtr>& columns,
+ const std::vector<SlotDescriptor*>&
slot_descs, bool* is_empty_row,
+ bool* eof);
+
+ Status _vhandle_flat_array_complex_json(std::vector<MutableColumnPtr>&
columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof);
+
+ Status _vhandle_nested_complex_json(std::vector<MutableColumnPtr>& columns,
+ const std::vector<SlotDescriptor*>&
slot_descs,
+ bool* is_empty_row, bool* eof);
+
+ Status _parse_json(bool* is_empty_row, bool* eof);
+ Status _parse_json_doc(size_t* size, bool* eof);
+
+ Status _set_column_value(rapidjson::Value& objectValue,
std::vector<MutableColumnPtr>& columns,
+ const std::vector<SlotDescriptor*>& slot_descs,
bool* valid);
+
+ Status _write_data_to_column(rapidjson::Value::ConstValueIterator value,
+ SlotDescriptor* slot_desc,
vectorized::IColumn* column_ptr,
+ bool* valid);
+
+ Status _write_columns_by_jsonpath(rapidjson::Value& objectValue,
+ const std::vector<SlotDescriptor*>&
slot_descs,
+ std::vector<MutableColumnPtr>& columns,
bool* valid);
+
+ Status _append_error_msg(const rapidjson::Value& objectValue, std::string
error_msg,
+ std::string col_name, bool* valid);
+
+ std::string _print_json_value(const rapidjson::Value& value);
+
+private:
+ Status (NewJsonReader::*_vhandle_json_callback)(
+ std::vector<vectorized::MutableColumnPtr>& columns,
+ const std::vector<SlotDescriptor*>& slot_descs, bool*
is_empty_row, bool* eof);
+ RuntimeState* _state;
+ RuntimeProfile* _profile;
+ ScannerCounter* _counter;
+ const TFileScanRangeParams& _params;
+ const TFileRangeDesc& _range;
+ const std::vector<SlotDescriptor*>& _file_slot_descs;
+
+ // _file_reader_s is for stream load pipe reader,
+ // and _file_reader is for other file reader.
+ // TODO: refactor this to use only shared_ptr or unique_ptr
+ std::unique_ptr<FileReader> _file_reader;
+ std::shared_ptr<FileReader> _file_reader_s;
+ FileReader* _real_file_reader;
+ std::unique_ptr<LineReader> _line_reader;
+ bool _reader_eof;
+
+ TFileFormatType::type _file_format_type;
+
+ // When we fetch range doesn't start from 0 will always skip the first line
+ bool _skip_first_line;
+
+ std::string _line_delimiter;
+ int _line_delimiter_length;
+
+ int _next_row;
+ int _total_rows;
+
+ std::string _jsonpaths;
+ std::string _json_root;
+ bool _read_json_by_line;
+ bool _strip_outer_array;
+ bool _num_as_string;
+ bool _fuzzy_parse;
+
+ std::vector<std::vector<JsonPath>> _parsed_jsonpaths;
+ std::vector<JsonPath> _parsed_json_root;
+
+ char _value_buffer[4 * 1024 * 1024]; // 4MB
+ char _parse_buffer[512 * 1024]; // 512KB
+
+ typedef rapidjson::GenericDocument<rapidjson::UTF8<>,
rapidjson::MemoryPoolAllocator<>,
+ rapidjson::MemoryPoolAllocator<>>
+ Document;
+ rapidjson::MemoryPoolAllocator<> _value_allocator;
+ rapidjson::MemoryPoolAllocator<> _parse_allocator;
+ Document _origin_json_doc; // origin json document object from parsed
json string
+ rapidjson::Value* _json_doc; // _json_doc equals _final_json_doc iff not
set `json_root`
+ std::unordered_map<std::string, int> _name_map;
+
+ bool* _scanner_eof;
+
+ RuntimeProfile::Counter* _bytes_read_counter;
+ RuntimeProfile::Counter* _read_timer;
+ RuntimeProfile::Counter* _file_read_timer;
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 3503e8c468..9df3722b95 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -31,6 +31,7 @@
#include "runtime/raw_value.h"
#include "runtime/runtime_state.h"
#include "vec/exec/format/csv/csv_reader.h"
+#include "vec/exec/format/json/new_json_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/scan/new_file_scan_node.h"
@@ -494,6 +495,12 @@ Status VFileScanner::_get_next_reader() {
init_status =
((CsvReader*)(_cur_reader.get()))->init_reader(_is_load);
break;
}
+ case TFileFormatType::FORMAT_JSON: {
+ _cur_reader.reset(new NewJsonReader(_state, _profile, &_counter,
_params, range,
+ _file_slot_descs,
&_scanner_eof));
+ init_status = ((NewJsonReader*)(_cur_reader.get()))->init_reader();
+ break;
+ }
default:
return Status::InternalError("Not supported file format: {}",
_params.format_type);
}
diff --git a/docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh
b/docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh
index 1fe80f6e84..0d07bf5e36 100755
--- a/docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh
+++ b/docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh
@@ -27,6 +27,8 @@ echo "hadoop fs -mkdir /user/doris/"
hadoop fs -mkdir -p /user/doris/
echo "hadoop fs -put /mnt/scripts/tpch1.db /user/doris/"
hadoop fs -put /mnt/scripts/tpch1.db /user/doris/
+echo "hadoop fs -put /mnt/scripts/json_format_test /user/doris/"
+hadoop fs -put /mnt/scripts/json_format_test /user/doris/
echo "hive -f /mnt/scripts/create.hql"
hive -f /mnt/scripts/create.hql
diff --git
a/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json.json
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json.json
new file mode 100644
index 0000000000..b5079899fc
--- /dev/null
+++
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json.json
@@ -0,0 +1,2 @@
+[{"id": 1, "city": "beijing", "code": 1454547},{"id": 2, "city": "shanghai",
"code": 1244264}, {"id": 3, "city": "guangzhou", "code": 528369},{"id": 4,
"city": "shenzhen", "code": 594201},{"id": 5, "city": "hangzhou", "code":
594201}]
+[{"id": 6, "city": "nanjing", "code": 2345672},{"id": 7, "city": "wuhan",
"code": 2345673}, {"id": 8, "city": "chengdu", "code": 2345674},{"id": 9,
"city": "xian", "code": 2345675},{"id": 10, "city": "hefei", "code": 2345676}]
\ No newline at end of file
diff --git
a/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_lack_column.json
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_lack_column.json
new file mode 100644
index 0000000000..338eaa726f
--- /dev/null
+++
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_lack_column.json
@@ -0,0 +1,2 @@
+[{"id": 1, "code": 1454547},{"city": "shanghai", "id": 2, "code": 1244264},
{"id": 3, "code": 528369},{"id": 4, "code": 594202},{"city": "hangzhou", "id":
5, "code": 594201}]
+[{"city": "nanjing", "id": 6, "code": 2345672},{"id": 7, "code": 2345673},
{"id": 8, "code": 2345674},{"city": "xian", "id": 9, "code": 2345675},{"id":
10, "code": 2345676}]
diff --git
a/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_unorder.json
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_unorder.json
new file mode 100644
index 0000000000..539d66df8d
--- /dev/null
+++
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_unorder.json
@@ -0,0 +1,2 @@
+[{"city": "beijing", "id": 1, "code": 1454547},{"city": "shanghai", "id": 2,
"code": 1244264}, {"city": "guangzhou", "id": 3, "code": 528369},{"city":
"shenzhen", "id": 4, "code": 594202},{"city": "hangzhou", "id": 5, "code":
594201}]
+[{"city": "nanjing", "id": 6, "code": 2345672},{"city": "wuhan", "id": 7,
"code": 2345673}, {"city": "chengdu", "id": 8, "code": 2345674},{"city":
"xian", "id": 9, "code": 2345675},{"city": "hefei", "id": 10, "code": 2345676}]
diff --git
a/docker/thirdparties/docker-compose/hive/scripts/json_format_test/nest_json.json
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/nest_json.json
new file mode 100644
index 0000000000..b28159b2a0
--- /dev/null
+++
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/nest_json.json
@@ -0,0 +1,5 @@
+{"no": 1, "item": {"id": 1, "city": "beijing", "code": 2345671}}
+{"no": 2, "item": {"id": 2, "city": "shanghai", "code": 2345672}}
+{"no": 3, "item": {"id": 3, "city": "hangzhou", "code": 2345673}}
+{"no": 4, "item": {"id": 4, "city": "shenzhen", "code": 2345674}}
+{"no": 5, "item": {"id": 5, "city": "guangzhou", "code": 2345675}}
diff --git
a/docker/thirdparties/docker-compose/hive/scripts/json_format_test/simple_object_json.json
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/simple_object_json.json
new file mode 100644
index 0000000000..a7912466fd
--- /dev/null
+++
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/simple_object_json.json
@@ -0,0 +1,12 @@
+{"id": 1, "city": "beijing", "code": 2345671}
+{"id": 2, "city": "shanghai", "code": 2345672}
+{"id": 3, "city": "guangzhou", "code": 2345673}
+{"id": 4, "city": "shenzhen", "code": 2345674}
+{"id": 5, "city": "hangzhou", "code": 2345675}
+{"id": 6, "city": "nanjing", "code": 2345676}
+{"id": 7, "city": "wuhan", "code": 2345677}
+{"id": 8, "city": "chengdu", "code": 2345678}
+{"id": 9, "city": "xian", "code": 2345679}
+{"id": 10, "city": "hefei", "code": 23456710}
+{"id": 10, "city": null, "code": 23456711}
+{"id": 10, "city": "hefei", "code": null}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index 32aed58211..5854be7970 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -121,7 +121,10 @@ public class DataDescription {
private String jsonPaths = "";
private String jsonRoot = "";
private boolean fuzzyParse = false;
- private boolean readJsonByLine = false;
+ // the default must be true.
+ // So that for broker load, this is always true,
+ // and for stream load, it will be set on demand.
+ private boolean readJsonByLine = true;
private boolean numAsString = false;
private String sequenceCol;
@@ -616,6 +619,10 @@ public class DataDescription {
return !Strings.isNullOrEmpty(srcTableName);
}
+ public boolean isReadJsonByLine() {
+ return readJsonByLine;
+ }
+
/*
* Analyze parsedExprMap and columnToHadoopFunction from columns, columns
from path and columnMappingList
* Example:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index f62f81ea54..aa341ad283 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -4031,7 +4031,7 @@ public class Env {
}
}
- public void renameColumn(Database db, OlapTable table, String colName,
+ private void renameColumn(Database db, OlapTable table, String colName,
String newColName, boolean isReplay) throws DdlException {
if (table.getState() != OlapTableState.NORMAL) {
throw new DdlException("Table[" + table.getName() + "] is under "
+ table.getState());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 2762ca89d3..a89f48e52b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -256,9 +256,9 @@ public class BrokerFileGroup implements Writable {
jsonPaths = dataDescription.getJsonPaths();
jsonRoot = dataDescription.getJsonRoot();
fuzzyParse = dataDescription.isFuzzyParse();
- // For broker load, we only support reading json format data line
by line,
- // so we set readJsonByLine to true here.
- readJsonByLine = true;
+ // ATTN: for broker load, we only support reading json format data
line by line,
+ // so if this is set to false, it must be stream load.
+ readJsonByLine = dataDescription.isReadJsonByLine();
numAsString = dataDescription.isNumAsString();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 5d68f6edef..c47699142b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -42,7 +42,6 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.Util;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.loadv2.LoadTask;
@@ -174,10 +173,6 @@ public class StreamLoadPlanner {
// create scan node
if (Config.enable_new_load_scan_node && Config.enable_vectorized_load)
{
ExternalFileScanNode fileScanNode = new ExternalFileScanNode(new
PlanNodeId(0), scanTupleDesc);
- if (!Util.isCsvFormat(taskInfo.getFormatType())) {
- throw new AnalysisException(
- "New stream load scan load not support non-csv type
now: " + taskInfo.getFormatType());
- }
// 1. create file group
DataDescription dataDescription = new
DataDescription(destTable.getName(), taskInfo);
dataDescription.analyzeWithoutCheckPriv(db.getFullName());
diff --git a/regression-test/conf/regression-conf.groovy
b/regression-test/conf/regression-conf.groovy
index 00872d275c..6791f7cacd 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -78,3 +78,4 @@ pg_14_port=5442
// See `docker/thirdparties/start-thirdparties-docker.sh`
enableHiveTest=false
hms_port=9183
+hdfs_port=8120
diff --git a/regression-test/data/load_p0/broker_load/test_array_load.out
b/regression-test/data/load_p0/broker_load/test_array_load.out
index baff23e6f2..6c568b6ef1 100644
--- a/regression-test/data/load_p0/broker_load/test_array_load.out
+++ b/regression-test/data/load_p0/broker_load/test_array_load.out
@@ -39,3 +39,43 @@
5 \N \N \N \N \N \N \N \N \N
\N
100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a',
'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00]
[0.33, 0.67] [3.1415926, 0.878787878] [4.000000, 5.500000, 6.670000]
+-- !select --
+1 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a',
'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01
00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
+2 [6, 7, 8, 9, 10] [32767, 32768, 32769] [65534, 65535, 65536]
['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01]
[1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1,
1.2, 1.3]
+3 [] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c',
'd', 'e'] ['happy', 'birthday'] [1991-01-01] [1991-01-01 00:00:00]
[0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
+4 [NULL] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c',
'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00]
[0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
+5 [NULL, NULL] [32767, 32768, NULL] [65534, NULL, 65536] ['a',
'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01
00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
+100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a',
'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00]
[0.33, 0.67] [3.1415926, 0.878787878] [4, 5.5, 6.67]
+
+-- !select --
+1 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a',
'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01
00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000,
1.200000, 1.300000]
+2 [6, 7, 8, 9, 10] [32767, 32768, 32769] [65534, 65535, 65536]
['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01]
[1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878]
[1.000000, 1.200000, 1.300000]
+3 [] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c',
'd', 'e'] ['happy', 'birthday'] [1991-01-01] [1991-01-01 00:00:00]
[0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000]
+4 [NULL] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c',
'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00]
[0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000]
+5 [NULL, NULL] [32767, 32768, NULL] [65534, NULL, 65536] ['a',
'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01
00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000,
1.200000, 1.300000]
+100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a',
'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00]
[0.33, 0.67] [3.1415926, 0.878787878] [4.000000, 5.500000, 6.670000]
+
+-- !select --
+1 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a',
'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01
00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
+2 [6, 7, 8, 9, 10] [32767, 32768, 32769] [65534, 65535, 65536]
['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01]
[1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1,
1.2, 1.3]
+3 [] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c',
'd', 'e'] ['happy', 'birthday'] [1991-01-01] [1991-01-01 00:00:00]
[0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
+4 [NULL] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c',
'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00]
[0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
+5 [NULL, NULL] [32767, 32768, NULL] [65534, NULL, 65536] ['a',
'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01
00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
+100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a',
'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00]
[0.33, 0.67] [3.1415926, 0.878787878] [4, 5.5, 6.67]
+
+-- !select --
+1 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a',
'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01
00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000,
1.200000, 1.300000]
+2 [6, 7, 8, 9, 10] [32767, 32768, 32769] [65534, 65535, 65536]
['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01]
[1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878]
[1.000000, 1.200000, 1.300000]
+3 [] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c',
'd', 'e'] ['happy', 'birthday'] [1991-01-01] [1991-01-01 00:00:00]
[0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000]
+4 [NULL] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c',
'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00]
[0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000]
+5 [NULL, NULL] [32767, 32768, NULL] [65534, NULL, 65536] ['a',
'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01
00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000,
1.200000, 1.300000]
+100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a',
'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00]
[0.33, 0.67] [3.1415926, 0.878787878] [4.000000, 5.500000, 6.670000]
+
+-- !select --
+1 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a',
'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01, 1992-02-02,
1993-03-03] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878]
[1.000000, 1.200000, 1.300000]
+2 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a',
'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01, 1992-02-02,
1993-03-03] \N \N \N [1.000000, NULL, 1.300000]
+3 \N \N \N \N \N \N \N \N \N
\N
+4 \N \N \N \N \N \N \N \N \N
\N
+5 \N \N \N \N \N \N \N \N \N
\N
+100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a',
'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00]
[0.33, 0.67] [3.1415926, 0.878787878] [4.000000, 5.500000, 6.670000]
+
diff --git
a/regression-test/data/load_p0/stream_load/load_json_null_to_nullable.out
b/regression-test/data/load_p0/stream_load/load_json_null_to_nullable.out
index d564b5b99a..8bbc4be012 100644
--- a/regression-test/data/load_p0/stream_load/load_json_null_to_nullable.out
+++ b/regression-test/data/load_p0/stream_load/load_json_null_to_nullable.out
@@ -11,3 +11,15 @@ h h
H H
h h
+-- !select --
+\N \N
+
+H H
+h h
+
+-- !select --
+\N \N
+
+H H
+h h
+
diff --git
a/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out
b/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out
index 43037b624d..4918c59dd8 100644
--- a/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out
+++ b/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out
@@ -9,3 +9,13 @@
1000 7395.231067
2000 \N
+-- !select --
+22 \N
+1000 7395.231067
+2000 \N
+
+-- !select --
+22 \N
+1000 7395.231067
+2000 \N
+
diff --git a/regression-test/data/load_p0/stream_load/nest_json.json
b/regression-test/data/load_p0/stream_load/nest_json.json
index 90647c490b..b28159b2a0 100644
--- a/regression-test/data/load_p0/stream_load/nest_json.json
+++ b/regression-test/data/load_p0/stream_load/nest_json.json
@@ -1,2 +1,5 @@
{"no": 1, "item": {"id": 1, "city": "beijing", "code": 2345671}}
{"no": 2, "item": {"id": 2, "city": "shanghai", "code": 2345672}}
+{"no": 3, "item": {"id": 3, "city": "hangzhou", "code": 2345673}}
+{"no": 4, "item": {"id": 4, "city": "shenzhen", "code": 2345674}}
+{"no": 5, "item": {"id": 5, "city": "guangzhou", "code": 2345675}}
diff --git a/regression-test/data/load_p0/stream_load/nest_json_array.json
b/regression-test/data/load_p0/stream_load/nest_json_array.json
new file mode 100644
index 0000000000..b8b3cf917d
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/nest_json_array.json
@@ -0,0 +1,74 @@
+[
+ {
+ "no": 1,
+ "item": {
+ "id": 1,
+ "city": [
+ "zhejiang",
+ "hangzhou",
+ "xihu"
+ ],
+ "code": 2345671
+ }
+ },
+ {
+ "no": 2,
+ "item": {
+ "id": 2,
+ "city": [
+ "zhejiang",
+ "hangzhou",
+ "xiaoshan"
+ ],
+ "code": 2345672
+ }
+ },
+ {
+ "no": 3,
+ "item": {
+ "id": 3,
+ "city": [
+ "zhejiang",
+ "hangzhou",
+ "binjiang"
+ ],
+ "code": 2345673
+ }
+ },
+ {
+ "no": 4,
+ "item": {
+ "id": 4,
+ "city": [
+ "zhejiang",
+ "hangzhou",
+ "shangcheng"
+ ],
+ "code": 2345674
+ }
+ },
+ {
+ "no": 5,
+ "item": {
+ "id": 5,
+ "city": [
+ "zhejiang",
+ "hangzhou",
+ "tonglu"
+ ],
+ "code": 2345675
+ }
+ },
+ {
+ "no": 6,
+ "item": {
+ "id": 6,
+ "city": [
+ "zhejiang",
+ "hangzhou",
+ "fuyang"
+ ],
+ "code": 2345676
+ }
+ }
+]
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/simple_json2.json
b/regression-test/data/load_p0/stream_load/simple_json2.json
new file mode 100644
index 0000000000..eb698453de
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/simple_json2.json
@@ -0,0 +1,52 @@
+[
+ {
+ "code": 2345671,
+ "id": 1,
+ "city": "beijing"
+ },
+ {
+ "code": 2345672,
+ "id": 2,
+ "city": "shanghai"
+ },
+ {
+ "code": 2345673,
+ "id": 3,
+ "city": "guangzhou"
+ },
+ {
+ "code": 2345674,
+ "id": 4,
+ "city": "shenzhen"
+ },
+ {
+ "code": 2345675,
+ "id": 5,
+ "city": "hangzhou"
+ },
+ {
+ "code": 2345676,
+ "id": 6,
+ "city": "nanjing"
+ },
+ {
+ "code": 2345677,
+ "id": 7,
+ "city": "wuhan"
+ },
+ {
+ "code": 2345678,
+ "id": 8,
+ "city": "chengdu"
+ },
+ {
+ "code": 2345679,
+ "id": 9,
+ "city": "xian"
+ },
+ {
+ "code": 23456710,
+ "id": 10,
+ "city": "hefei"
+ }
+]
\ No newline at end of file
diff --git
a/regression-test/data/load_p0/stream_load/simple_json2_lack_one_column.json
b/regression-test/data/load_p0/stream_load/simple_json2_lack_one_column.json
new file mode 100644
index 0000000000..7b6b4ad800
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/simple_json2_lack_one_column.json
@@ -0,0 +1,48 @@
+[
+ {
+ "code": 2345671,
+ "id": 1
+ },
+ {
+ "code": 2345672,
+ "id": 2,
+ "city": "shanghai"
+ },
+ {
+ "code": 2345673,
+ "id": 3,
+ "city": "beijing"
+ },
+ {
+ "code": 2345674,
+ "id": 4,
+ "city": "shenzhen"
+ },
+ {
+ "code": 2345675,
+ "id": 5,
+ "city": "hangzhou"
+ },
+ {
+ "code": 2345676,
+ "id": 6,
+ "city": "nanjing"
+ },
+ {
+ "code": 2345677,
+ "id": 7
+ },
+ {
+ "code": 2345678,
+ "id": 8,
+ "city": "chengdu"
+ },
+ {
+ "code": 2345679,
+ "id": 9
+ },
+ {
+ "code": 23456710,
+ "id": 10
+ }
+]
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/test_hdfs_json_load.out
b/regression-test/data/load_p0/stream_load/test_hdfs_json_load.out
new file mode 100644
index 0000000000..594d2ec60a
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_hdfs_json_load.out
@@ -0,0 +1,305 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select1 --
+1 beijing 2345671
+2 shanghai 2345672
+3 guangzhou 2345673
+4 shenzhen 2345674
+5 hangzhou 2345675
+6 nanjing 2345676
+7 wuhan 2345677
+8 chengdu 2345678
+9 xian 2345679
+10 \N 23456711
+10 hefei 23456710
+200 changsha 3456789
+
+-- !select1 --
+1 beijing 2345671
+2 shanghai 2345672
+3 guangzhou 2345673
+4 shenzhen 2345674
+5 hangzhou 2345675
+6 nanjing 2345676
+7 wuhan 2345677
+8 chengdu 2345678
+9 xian 2345679
+10 \N 23456711
+10 hefei 23456710
+200 changsha 3456789
+
+-- !select2 --
+10 beijing 2345671
+20 shanghai 2345672
+30 guangzhou 2345673
+40 shenzhen 2345674
+50 hangzhou 2345675
+60 nanjing 2345676
+70 wuhan 2345677
+80 chengdu 2345678
+90 xian 2345679
+100 \N 23456711
+100 hefei 23456710
+200 changsha 3456789
+
+-- !select2 --
+10 beijing 2345671
+20 shanghai 2345672
+30 guangzhou 2345673
+40 shenzhen 2345674
+50 hangzhou 2345675
+60 nanjing 2345676
+70 wuhan 2345677
+80 chengdu 2345678
+90 xian 2345679
+100 \N 23456711
+100 hefei 23456710
+200 changsha 3456789
+
+-- !select3 --
+1 2345671 \N
+2 2345672 \N
+3 2345673 \N
+4 2345674 \N
+5 2345675 \N
+6 2345676 \N
+7 2345677 \N
+8 2345678 \N
+9 2345679 \N
+10 \N \N
+10 23456710 \N
+10 23456711 \N
+200 changsha 3456789
+
+-- !select3 --
+1 2345671 \N
+2 2345672 \N
+3 2345673 \N
+4 2345674 \N
+5 2345675 \N
+6 2345676 \N
+7 2345677 \N
+8 2345678 \N
+9 2345679 \N
+10 \N \N
+10 23456710 \N
+10 23456711 \N
+200 changsha 3456789
+
+-- !select4 --
+1 \N 210
+2 \N 220
+3 \N 230
+4 \N 240
+5 \N 250
+6 \N 260
+7 \N 270
+8 \N 280
+9 \N 290
+10 \N 900
+200 changsha 3456789
+
+-- !select4 --
+1 \N 210
+2 \N 220
+3 \N 230
+4 \N 240
+5 \N 250
+6 \N 260
+7 \N 270
+8 \N 280
+9 \N 290
+10 \N 900
+200 changsha 3456789
+
+-- !select5 --
+1 beijing 1454547
+2 shanghai 1244264
+3 guangzhou 528369
+4 shenzhen 594201
+5 hangzhou 594201
+6 nanjing 2345672
+7 wuhan 2345673
+8 chengdu 2345674
+9 xian 2345675
+10 hefei 2345676
+200 changsha 3456789
+
+-- !select5 --
+1 beijing 1454547
+2 shanghai 1244264
+3 guangzhou 528369
+4 shenzhen 594201
+5 hangzhou 594201
+6 nanjing 2345672
+7 wuhan 2345673
+8 chengdu 2345674
+9 xian 2345675
+10 hefei 2345676
+200 changsha 3456789
+
+-- !select6 --
+10 1454547 \N
+20 1244264 \N
+30 528369 \N
+40 594201 \N
+50 594201 \N
+60 2345672 \N
+70 2345673 \N
+80 2345674 \N
+90 2345675 \N
+100 2345676 \N
+200 changsha 3456789
+
+-- !select6 --
+10 1454547 \N
+20 1244264 \N
+30 528369 \N
+40 594201 \N
+50 594201 \N
+60 2345672 \N
+70 2345673 \N
+80 2345674 \N
+90 2345675 \N
+100 2345676 \N
+200 changsha 3456789
+
+-- !select7 --
+60 2345672 \N
+70 2345673 \N
+80 2345674 \N
+90 2345675 \N
+100 2345676 \N
+200 changsha 3456789
+
+-- !select7 --
+60 2345672 \N
+70 2345673 \N
+80 2345674 \N
+90 2345675 \N
+100 2345676 \N
+200 changsha 3456789
+
+-- !select8 --
+60 nanjing \N
+70 wuhan \N
+80 chengdu \N
+90 xian \N
+100 hefei \N
+200 changsha 3456789
+
+-- !select8 --
+60 nanjing \N
+70 wuhan \N
+80 chengdu \N
+90 xian \N
+100 hefei \N
+200 changsha 3456789
+
+-- !select9 --
+10 beijing 2345671
+20 shanghai 2345672
+30 hangzhou 2345673
+40 shenzhen 2345674
+50 guangzhou 2345675
+200 changsha 3456789
+
+-- !select9 --
+10 beijing 2345671
+20 shanghai 2345672
+30 hangzhou 2345673
+40 shenzhen 2345674
+50 guangzhou 2345675
+200 changsha 3456789
+
+-- !select10 --
+1 beijing 1454547
+2 shanghai 1244264
+3 guangzhou 528369
+4 shenzhen 594202
+5 hangzhou 594201
+6 nanjing 2345672
+7 wuhan 2345673
+8 chengdu 2345674
+9 xian 2345675
+10 hefei 2345676
+200 changsha 3456789
+
+-- !select10 --
+1 beijing 1454547
+2 shanghai 1244264
+3 guangzhou 528369
+4 shenzhen 594202
+5 hangzhou 594201
+6 nanjing 2345672
+7 wuhan 2345673
+8 chengdu 2345674
+9 xian 2345675
+10 hefei 2345676
+200 changsha 3456789
+
+-- !select11 --
+1 \N 1454547
+2 shanghai 1244264
+3 \N 528369
+4 \N 594202
+5 hangzhou 594201
+6 nanjing 2345672
+7 \N 2345673
+8 \N 2345674
+9 xian 2345675
+10 \N 2345676
+200 changsha 3456789
+
+-- !select11 --
+1 \N 1454547
+2 shanghai 1244264
+3 \N 528369
+4 \N 594202
+5 hangzhou 594201
+6 nanjing 2345672
+7 \N 2345673
+8 \N 2345674
+9 xian 2345675
+10 \N 2345676
+200 changsha 3456789
+
+-- !select12 --
+10 beijing \N
+20 shanghai \N
+30 hangzhou \N
+40 shenzhen \N
+50 guangzhou \N
+200 changsha 3456789
+
+-- !select12 --
+10 beijing \N
+20 shanghai \N
+30 hangzhou \N
+40 shenzhen \N
+50 guangzhou \N
+200 changsha 3456789
+
+-- !select13 --
+30 hangzhou \N
+40 shenzhen \N
+50 guangzhou \N
+200 changsha 3456789
+
+-- !select13 --
+30 hangzhou \N
+40 shenzhen \N
+50 guangzhou \N
+200 changsha 3456789
+
+-- !select14 --
+30 hangzhou 2345673
+40 shenzhen 2345674
+50 guangzhou 2345675
+200 changsha 3456789
+
+-- !select14 --
+30 hangzhou 2345673
+40 shenzhen 2345674
+50 guangzhou 2345675
+200 changsha 3456789
+
diff --git a/regression-test/data/load_p0/stream_load/test_json_load.out
b/regression-test/data/load_p0/stream_load/test_json_load.out
index b6df264df1..ebf706f594 100644
--- a/regression-test/data/load_p0/stream_load/test_json_load.out
+++ b/regression-test/data/load_p0/stream_load/test_json_load.out
@@ -1,5 +1,5 @@
-- This file is automatically generated. You should know what you did if you
want to edit this
--- !select --
+-- !select1 --
1 beijing 2345671
2 shanghai 2345672
3 guangzhou 2345673
@@ -12,7 +12,20 @@
10 hefei 23456710
200 changsha 3456789
--- !select --
+-- !select1 --
+1 beijing 2345671
+2 shanghai 2345672
+3 guangzhou 2345673
+4 shenzhen 2345674
+5 hangzhou 2345675
+6 nanjing 2345676
+7 wuhan 2345677
+8 chengdu 2345678
+9 xian 2345679
+10 hefei 23456710
+200 changsha 3456789
+
+-- !select2 --
10 beijing 2345671
20 shanghai 2345672
30 guangzhou 2345673
@@ -25,7 +38,33 @@
100 hefei 23456710
200 changsha 3456789
--- !select --
+-- !select2 --
+10 beijing 2345671
+20 shanghai 2345672
+30 guangzhou 2345673
+40 shenzhen 2345674
+50 hangzhou 2345675
+60 nanjing 2345676
+70 wuhan 2345677
+80 chengdu 2345678
+90 xian 2345679
+100 hefei 23456710
+200 changsha 3456789
+
+-- !select3 --
+1 2345671
+2 2345672
+3 2345673
+4 2345674
+5 2345675
+6 2345676
+7 2345677
+8 2345678
+9 2345679
+10 23456710
+200 755
+
+-- !select3 --
1 2345671
2 2345672
3 2345673
@@ -38,7 +77,20 @@
10 23456710
200 755
--- !select --
+-- !select4 --
+1 210
+2 220
+3 230
+4 240
+5 250
+6 260
+7 270
+8 280
+9 290
+10 300
+200 755
+
+-- !select4 --
1 210
2 220
3 230
@@ -51,7 +103,7 @@
10 300
200 755
--- !select --
+-- !select5 --
1 1454547
2 1244264
3 528369
@@ -64,7 +116,33 @@
10 2345676
200 755
--- !select --
+-- !select5 --
+1 1454547
+2 1244264
+3 528369
+4 594201
+5 594201
+6 2345672
+7 2345673
+8 2345674
+9 2345675
+10 2345676
+200 755
+
+-- !select6 --
+10 1454547
+20 1244264
+30 528369
+40 594201
+50 594201
+60 2345672
+70 2345673
+80 2345674
+90 2345675
+100 2345676
+200 755
+
+-- !select6 --
10 1454547
20 1244264
30 528369
@@ -77,7 +155,7 @@
100 2345676
200 755
--- !select --
+-- !select7 --
60 2345672
70 2345673
80 2345674
@@ -85,7 +163,7 @@
100 2345676
200 755
--- !select --
+-- !select7 --
60 2345672
70 2345673
80 2345674
@@ -93,15 +171,46 @@
100 2345676
200 755
--- !select --
+-- !select8 --
+60 2345672
+70 2345673
+80 2345674
+90 2345675
+100 2345676
+200 755
+
+-- !select8 --
+60 2345672
+70 2345673
+80 2345674
+90 2345675
+100 2345676
+200 755
+
+-- !select9 --
10 beijing 2345671
20 shanghai 2345672
+30 hangzhou 2345673
+40 shenzhen 2345674
+50 guangzhou 2345675
200 changsha 3456789
--- !select --
+-- !select9 --
+10 beijing 2345671
+20 shanghai 2345672
+30 hangzhou 2345673
+40 shenzhen 2345674
+50 guangzhou 2345675
+200 changsha 3456789
+
+-- !select10 --
200 changsha 3456789
--- !select --
+-- !select10 --
+200 changsha 3456789
+
+-- !select11 --
+1 beijing 2345671
2 shanghai 2345672
3 guangzhou 2345673
4 shenzhen 2345674
@@ -110,12 +219,113 @@
7 wuhan 2345677
8 chengdu 2345678
9 xian 2345679
+10 hefei 23456710
200 changsha 3456789
--- !select --
+-- !select11 --
+1 beijing 2345671
2 shanghai 2345672
3 guangzhou 2345673
4 shenzhen 2345674
+5 hangzhou 2345675
+6 nanjing 2345676
+7 wuhan 2345677
+8 chengdu 2345678
+9 xian 2345679
+10 hefei 23456710
+200 changsha 3456789
+
+-- !select12 --
+1 \N 2345671
+2 shanghai 2345672
+3 beijing 2345673
+4 shenzhen 2345674
+5 hangzhou 2345675
+6 nanjing 2345676
+7 \N 2345677
+8 chengdu 2345678
+9 \N 2345679
+10 \N 23456710
+200 changsha 3456789
+
+-- !select12 --
+1 \N 2345671
+2 shanghai 2345672
+3 beijing 2345673
+4 shenzhen 2345674
+5 hangzhou 2345675
+6 nanjing 2345676
+7 \N 2345677
+8 chengdu 2345678
+9 \N 2345679
+10 \N 23456710
+200 changsha 3456789
+
+-- !select13 --
+2 shanghai 2345672
+3 beijing 2345673
+4 shenzhen 2345674
+5 hangzhou 2345675
+6 nanjing 2345676
+8 chengdu 2345678
+200 hangzhou 12345
+
+-- !select13 --
+2 shanghai 2345672
+3 beijing 2345673
+4 shenzhen 2345674
+5 hangzhou 2345675
+6 nanjing 2345676
+8 chengdu 2345678
+200 hangzhou 12345
+
+-- !select14 --
+10 2345671 \N
+20 2345672 \N
+30 2345673 \N
+40 2345674 \N
+50 2345675 \N
+200 changsha 3456789
+
+-- !select14 --
+10 2345671 \N
+20 2345672 \N
+30 2345673 \N
+40 2345674 \N
+50 2345675 \N
+200 changsha 3456789
+
+-- !select15 --
+10 beijing 2345671
+20 shanghai 2345672
+30 hangzhou 2345673
+40 shenzhen 2345674
+50 guangzhou 2345675
+200 changsha 3456789
+
+-- !select15 --
+10 beijing 2345671
+20 shanghai 2345672
+30 hangzhou 2345673
+40 shenzhen 2345674
+50 guangzhou 2345675
+200 changsha 3456789
+
+-- !select16 --
+1 xihu 2345671
+2 xiaoshan 2345672
+3 binjiang 2345673
+4 shangcheng 2345674
+5 tonglu 2345675
+6 fuyang 2345676
+200 changsha 3456789
+
+-- !select16 --
+1 xihu 2345671
+2 xiaoshan 2345672
+3 binjiang 2345673
+4 shangcheng 2345674
+5 tonglu 2345675
+6 fuyang 2345676
200 changsha 3456789
--- !select --
diff --git a/regression-test/suites/load_p0/broker_load/test_array_load.groovy
b/regression-test/suites/load_p0/broker_load/test_array_load.groovy
index 5581a4928e..b462d3c00e 100644
--- a/regression-test/suites/load_p0/broker_load/test_array_load.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_array_load.groovy
@@ -194,86 +194,96 @@ suite("test_array_load", "p0") {
}
}
- // case1: import array data in json format and enable vectorized engine
- try {
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table.call(testTable, true)
-
- load_array_data.call(testTable, 'true', '', 'json', '', '', '', '',
'', '', 'simple_array.json')
-
- // select the table and check whether the data is correct
+ def check_data_correct = {table_name ->
sql "sync"
- qt_select "select * from ${testTable} order by k1"
-
- } finally {
- try_sql("DROP TABLE IF EXISTS ${testTable}")
+ // select the table and check whether the data is correct
+ qt_select "select * from ${table_name} order by k1"
}
- // case2: import array data in json format and disable vectorized engine
try {
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table.call(testTable, false)
+ for ( i in 0..1 ) {
+ // should be deleted after new_load_scan is ready
+ if (i == 1) {
+ sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node"
= "false");"""
+ } else {
+ sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node"
= "true");"""
+ }
- load_array_data.call(testTable, 'true', '', 'json', '', '', '', '',
'', '', 'simple_array.json')
-
- // select the table and check whether the data is correct
- sql "sync"
- qt_select "select * from ${testTable} order by k1"
+ // case1: import array data in json format and enable vectorized
engine
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table.call(testTable, true)
- } finally {
- try_sql("DROP TABLE IF EXISTS ${testTable}")
- }
-
- // case3: import array data in csv format and enable vectorized engine
- try {
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table.call(testTable, true)
+ load_array_data.call(testTable, 'true', '', 'json', '', '',
'', '', '', '', 'simple_array.json')
+
+ check_data_correct(testTable)
- load_array_data.call(testTable, 'true', '', 'csv', '', '', '', '', '',
'/', 'simple_array.csv')
-
- // select the table and check whether the data is correct
- sql "sync"
- qt_select "select * from ${testTable} order by k1"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
- } finally {
- try_sql("DROP TABLE IF EXISTS ${testTable}")
- }
+ // case2: import array data in json format and disable vectorized
engine
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table.call(testTable, false)
- // case4: import array data in csv format and disable vectorized engine
- try {
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table.call(testTable, false)
+ load_array_data.call(testTable, 'true', '', 'json', '', '',
'', '', '', '', 'simple_array.json')
+
+ check_data_correct(testTable)
- load_array_data.call(testTable, 'true', '', 'csv', '', '', '', '', '',
'/', 'simple_array.csv')
-
- // select the table and check whether the data is correct
- sql "sync"
- qt_select "select * from ${testTable} order by k1"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+
+ // case3: import array data in csv format and enable vectorized
engine
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table.call(testTable, true)
+
+ load_array_data.call(testTable, 'true', '', 'csv', '', '', '',
'', '', '/', 'simple_array.csv')
+
+ check_data_correct(testTable)
+
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
- } finally {
- try_sql("DROP TABLE IF EXISTS ${testTable}")
- }
+ // case4: import array data in csv format and disable vectorized
engine
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table.call(testTable, false)
- // case5: import array data not specify the format
- try {
- sql "DROP TABLE IF EXISTS ${testTable01}"
-
- create_test_table01.call(testTable01)
+ load_array_data.call(testTable, 'true', '', 'csv', '', '', '',
'', '', '/', 'simple_array.csv')
+
+ check_data_correct(testTable)
- load_array_data.call(testTable01, '', '', '', '', '', '', '', '', '/',
'simple_array.data')
-
- // select the table and check whether the data is correct
- sql "sync"
- qt_select "select * from ${testTable01} order by k1"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+ // case5: import array data not specify the format
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable01}"
+
+ create_test_table01.call(testTable01)
+
+ load_array_data.call(testTable01, '', '', '', '', '', '', '',
'', '/', 'simple_array.data')
+
+ check_data_correct(testTable01)
+
+ } finally {
+ // try_sql("DROP TABLE IF EXISTS ${testTable01}")
+ }
+ }
} finally {
- // try_sql("DROP TABLE IF EXISTS ${testTable01}")
+ try_sql("""ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" =
"false");""")
}
+
// if 'enableHdfs' in regression-conf.groovy has been set to true,
// the test will run these case as below.
if (enableHdfs()) {
diff --git
a/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy
b/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy
index f3edfd225c..760af3344e 100644
---
a/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy
+++
b/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy
@@ -49,8 +49,11 @@
suite("test_load_json_column_exclude_schema_without_jsonpath", "p0") {
"""
}
- def load_array_data = {table_name, strip_flag, read_flag, format_flag,
exprs, json_paths,
+ def load_array_data = {new_json_reader_flag, table_name, strip_flag,
read_flag, format_flag, exprs, json_paths,
json_root, where_expr, fuzzy_flag, column_sep,
file_name ->
+ // should be deleted after new_load_scan is ready
+ sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" =
"${new_json_reader_flag}");"""
+
// load the json data
streamLoad {
table table_name
@@ -91,7 +94,12 @@
suite("test_load_json_column_exclude_schema_without_jsonpath", "p0") {
create_test_table.call(true)
- load_array_data.call(testTable, 'true', '', 'json', '', '', '', '',
'', '', 'json_column_match.json')
+ load_array_data.call('false', testTable, 'true', '', 'json', '', '',
'', '', '', '', 'json_column_match.json')
+
+ // test new json load, should be deleted after new_load_scan ready
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table.call(true)
+ load_array_data.call('true', testTable, 'true', '', 'json', '', '',
'', '', '', '', 'json_column_match.json')
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -103,7 +111,12 @@
suite("test_load_json_column_exclude_schema_without_jsonpath", "p0") {
create_test_table.call(false)
- load_array_data.call(testTable, 'true', '', 'json', '', '', '', '',
'', '', 'json_column_match.json')
+ load_array_data.call('false', testTable, 'true', '', 'json', '', '',
'', '', '', '', 'json_column_match.json')
+
+ // test new json load, should be deleted after new_load_scan ready
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table.call(false)
+ load_array_data.call('true', testTable, 'true', '', 'json', '', '',
'', '', '', '', 'json_column_match.json')
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
diff --git
a/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy
b/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy
index 4dfc5a9fcb..f934c038a2 100644
---
a/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy
+++
b/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy
@@ -40,8 +40,11 @@ suite("test_load_json_null_to_nullable", "p0") {
"""
}
- def load_array_data = {table_name, strip_flag, read_flag, format_flag,
exprs, json_paths,
+ def load_array_data = {new_json_reader_flag, table_name, strip_flag,
read_flag, format_flag, exprs, json_paths,
json_root, where_expr, fuzzy_flag, column_sep,
file_name ->
+ // should be deleted after new_load_scan is ready
+ sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" =
"${new_json_reader_flag}");"""
+
// load the json data
streamLoad {
table table_name
@@ -74,6 +77,15 @@ suite("test_load_json_null_to_nullable", "p0") {
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
+
+ // should be deleted after new_load_scan is ready
+ sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" =
"false");"""
+ }
+
+ def check_data_correct = {table_name ->
+ sql "sync"
+ // select the table and check whether the data is correct
+ qt_select "select * from ${table_name} order by k1"
}
// case1: import array data in json format and enable vectorized engine
@@ -82,11 +94,15 @@ suite("test_load_json_null_to_nullable", "p0") {
create_test_table.call(true)
- load_array_data.call(testTable, 'true', '', 'json', '', '', '', '',
'', '', 'test_char.json')
+ load_array_data.call('false', testTable, 'true', '', 'json', '', '',
'', '', '', '', 'test_char.json')
- sql "sync"
- // select the table and check whether the data is correct
- qt_select "select * from ${testTable} order by k1"
+ check_data_correct(testTable)
+
+ // test new json load, should be deleted after new_load_scan ready
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table.call(true)
+ load_array_data.call('true', testTable, 'true', '', 'json', '', '',
'', '', '', '', 'test_char.json')
+ check_data_correct(testTable)
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -98,11 +114,15 @@ suite("test_load_json_null_to_nullable", "p0") {
create_test_table.call(false)
- load_array_data.call(testTable, 'true', '', 'json', '', '', '', '',
'', '', 'test_char.json')
+ load_array_data.call('false', testTable, 'true', '', 'json', '', '',
'', '', '', '', 'test_char.json')
- sql "sync"
- // select the table and check whether the data is correct
- qt_select "select * from ${testTable} order by k1"
+ check_data_correct(testTable)
+
+ // test new json load, should be deleted after new_load_scan ready
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table.call(false)
+ load_array_data.call('true', testTable, 'true', '', 'json', '', '',
'', '', '', '', 'test_char.json')
+ check_data_correct(testTable)
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
diff --git
a/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy
b/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy
index b5fc9ea096..02ffd808e2 100644
--- a/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy
+++ b/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy
@@ -40,8 +40,12 @@ suite("test_load_json_with_jsonpath", "p0") {
"""
}
- def load_array_data = {table_name, strip_flag, read_flag, format_flag,
exprs, json_paths,
+ def load_array_data = {new_json_reader_flag, table_name, strip_flag,
read_flag, format_flag, exprs, json_paths,
json_root, where_expr, fuzzy_flag, column_sep,
file_name ->
+
+ // should be deleted after new_load_scan is ready
+ sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" =
"${new_json_reader_flag}");"""
+
// load the json data
streamLoad {
table table_name
@@ -74,6 +78,15 @@ suite("test_load_json_with_jsonpath", "p0") {
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
+
+ // should be deleted after new_load_scan is ready
+ sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" =
"false");"""
+ }
+
+ def check_data_correct = {table_name ->
+ sql "sync"
+ // select the table and check whether the data is correct
+ qt_select "select * from ${table_name} order by k1"
}
// case1: import array data in json format and enable vectorized engine
@@ -82,11 +95,15 @@ suite("test_load_json_with_jsonpath", "p0") {
create_test_table.call(true)
- load_array_data.call(testTable, 'true', '', 'json', '', '["$.k1",
"$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
+ load_array_data.call('false', testTable, 'true', '', 'json', '',
'["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
- // select the table and check whether the data is correct
- sql "sync"
- qt_select "select * from ${testTable} order by k1"
+ check_data_correct(testTable)
+
+ // test new json load, should be deleted after new_load_scan ready
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table.call(true)
+ load_array_data.call('true', testTable, 'true', '', 'json', '',
'["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
+ check_data_correct(testTable)
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -98,12 +115,19 @@ suite("test_load_json_with_jsonpath", "p0") {
create_test_table.call(false)
- load_array_data.call(testTable, 'true', '', 'json', '', '["$.k1",
"$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
+ load_array_data.call('false', testTable, 'true', '', 'json', '',
'["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
sql "sync"
// select the table and check whether the data is correct
qt_select "select * from ${testTable} order by k1"
+
+ // test new json load, should be deleted after new_load_scan ready
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table.call(false)
+ load_array_data.call('true', testTable, 'true', '', 'json', '',
'["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
+ check_data_correct(testTable)
+
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
diff --git
a/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy
b/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy
new file mode 100644
index 0000000000..4c7a1c162b
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy
@@ -0,0 +1,554 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hdfs_json_load", "p0") {
+ // define a sql table
+ def testTable = "test_hdfs_json_load"
+
+ def create_test_table1 = {testTablex ->
+ // multi-line sql
+ def result1 = sql """
+ CREATE TABLE IF NOT EXISTS ${testTablex} (
+ id INT DEFAULT '10',
+ city VARCHAR(32) DEFAULT '',
+ code BIGINT SUM DEFAULT '0')
+ DISTRIBUTED BY HASH(id) BUCKETS 10
+ PROPERTIES("replication_num" = "1");
+ """
+
+ // DDL/DML returns 1 row and 1 column; the only value is the updated row
count
+ assertTrue(result1.size() == 1)
+ assertTrue(result1[0].size() == 1)
+ assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
+
+ // insert 1 row to check whether the table is ok
+ def result2 = sql "INSERT INTO ${testTablex} (id, city, code) VALUES
(200, 'changsha', 3456789)"
+ assertTrue(result2.size() == 1)
+ assertTrue(result2[0].size() == 1)
+ assertTrue(result2[0][0] == 1, "Insert should update 1 rows")
+ }
+
+ def load_from_hdfs1 = {new_json_reader_flag, strip_flag, fuzzy_flag,
testTablex, label, fileName,
+ fsPath, hdfsUser, exprs, jsonpaths, json_root,
columns_parameter, where ->
+ // should be deleted after new_load_scan is ready
+ sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" =
"${new_json_reader_flag}");"""
+
+ def hdfsFilePath = "${fsPath}/user/doris/json_format_test/${fileName}"
+ def result1= sql """
+ LOAD LABEL ${label} (
+ DATA INFILE("${hdfsFilePath}")
+ INTO TABLE ${testTablex}
+ FORMAT as "json"
+ ${columns_parameter}
+ ${exprs}
+ ${where}
+ properties(
+ "json_root" = "${json_root}",
+ "jsonpaths" = "${jsonpaths}",
+ "strip_outer_array" = "${strip_flag}",
+ "fuzzy_parse" = "${fuzzy_flag}"
+ )
+ )
+ with HDFS (
+ "fs.defaultFS"="${fsPath}",
+ "hadoop.username" = "${hdfsUser}"
+ )
+ PROPERTIES (
+ "timeout"="1200",
+ "max_filter_ratio"="0"
+ );
+ """
+
+ assertTrue(result1.size() == 1)
+ assertTrue(result1[0].size() == 1)
+ assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected")
+
+ // should be deleted after new_load_scan is ready
+ sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" =
"false");"""
+ }
+
+ def check_load_result = {checklabel, testTablex ->
+ max_try_milli_secs = 10000
+ while(max_try_milli_secs) {
+ result = sql "show load where label = '${checklabel}'"
+ if(result[0][2] == "FINISHED") {
+ log.info("LOAD FINISHED!")
+ break
+ } else {
+ sleep(1000) // wait 1 second every time
+ max_try_milli_secs -= 1000
+ if(max_try_milli_secs <= 0) {
+ log.info("Broker load result: ${result}".toString())
+ assertEquals(1, 2)
+ }
+ }
+ }
+ }
+
+
+
+ String hdfs_port = context.config.otherConfigs.get("hdfs_port")
+ def fsPath = "hdfs://127.0.0.1:${hdfs_port}"
+ // It's okay to use a random `hdfsUser`, but it cannot be empty.
+ def hdfsUser = "doris"
+
+
+ // case1: import simple json
+ def q1 = {
+ try {
+ def test_load_label1 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("false", "false", "false", testTable,
test_load_label1, "simple_object_json.json",
+ fsPath, hdfsUser, '', '', '', '', '')
+
+ check_load_result(test_load_label1, testTable)
+ sql "sync"
+ qt_select1 "select * from ${testTable} order by id"
+
+ // test new json reader
+ def test_load_label2 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("true", "false", "false", testTable,
test_load_label2, "simple_object_json.json",
+ fsPath, hdfsUser, '', '', '', '', '')
+
+ check_load_result(test_load_label2, testTable)
+ sql "sync"
+ qt_select1 "select * from ${testTable} order by id"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+ }
+
+ // case2: import json and apply exprs
+ def q2 = {
+ try {
+ def test_load_label1 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("false", "false", "false", testTable,
test_load_label1, "simple_object_json.json",
+ fsPath, hdfsUser, "SET(id= id * 10)", '', '',
'', '')
+
+ check_load_result(test_load_label1, testTable)
+ sql "sync"
+ qt_select2 "select * from ${testTable} order by id"
+
+ // test new json reader
+ def test_load_label2 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("true", "false", "false", testTable,
test_load_label2, "simple_object_json.json",
+ fsPath, hdfsUser, "SET(id= id * 10)", '', '',
'', '')
+
+ check_load_result(test_load_label2, testTable)
+ sql "sync"
+ qt_select2 "select * from ${testTable} order by id"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+ }
+
+ // case3: import json and apply jsonpaths
+ def q3 = {
+ try {
+ def test_load_label1 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("false", "false", "false", testTable,
test_load_label1, "simple_object_json.json",
+ fsPath, hdfsUser, '', """[\\"\$.id\\",
\\"\$.code\\"]""", '', '', '')
+
+ check_load_result(test_load_label1, testTable)
+ sql "sync"
+ qt_select3 "select * from ${testTable} order by id"
+
+ // test new json reader
+ def test_load_label2 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("true", "false", "false", testTable,
test_load_label2, "simple_object_json.json",
+ fsPath, hdfsUser, '', """[\\"\$.id\\",
\\"\$.code\\"]""", '', '', '')
+
+ check_load_result(test_load_label2, testTable)
+ sql "sync"
+ qt_select3 "select * from ${testTable} order by id"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+ }
+
+ // case4: import json and apply jsonpaths & exprs
+ def q4 = {
+ try {
+ def test_load_label1 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("false", "false", "false", testTable,
test_load_label1, "simple_object_json.json",
+ fsPath, hdfsUser, "SET(code = id * 10 + 200)",
"""[\\"\$.id\\"]""", '', '', '')
+
+ check_load_result(test_load_label1, testTable)
+ sql "sync"
+ qt_select4 "select * from ${testTable} order by id"
+
+ // test new json reader
+ def test_load_label2 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("true", "false", "false", testTable,
test_load_label2, "simple_object_json.json",
+ fsPath, hdfsUser, "SET(code = id * 10 + 200)",
"""[\\"\$.id\\"]""", '', '', '')
+
+ check_load_result(test_load_label2, testTable)
+ sql "sync"
+ qt_select4 "select * from ${testTable} order by id"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+ }
+
+ // case5: import json with line reader
+ def q5 = {
+ try {
+ def test_load_label1 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("false", "true", "false", testTable,
test_load_label1, "multi_line_json.json",
+ fsPath, hdfsUser, '', '', '', '', '')
+
+ check_load_result(test_load_label1, testTable)
+ sql "sync"
+ qt_select5 "select * from ${testTable} order by id"
+
+ // test new json reader
+ def test_load_label2 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("true", "true", "false", testTable,
test_load_label2, "multi_line_json.json",
+ fsPath, hdfsUser, '', '', '', '', '')
+
+ check_load_result(test_load_label2, testTable)
+ sql "sync"
+ qt_select5 "select * from ${testTable} order by id"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+ }
+
+ // case6: import json use exprs and jsonpaths
+ def q6 = {
+ try {
+ def test_load_label1 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("false", "true", "false", testTable,
test_load_label1, "multi_line_json.json",
+ fsPath, hdfsUser, "SET(id = id * 10)",
"""[\\"\$.id\\", \\"\$.code\\"]""", '', '', '')
+
+ check_load_result(test_load_label1, testTable)
+ sql "sync"
+ qt_select6 "select * from ${testTable} order by id"
+
+ // test new json reader
+ def test_load_label2 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("true", "true", "false", testTable,
test_load_label2, "multi_line_json.json",
+ fsPath, hdfsUser, "SET(id = id * 10)",
"""[\\"\$.id\\", \\"\$.code\\"]""", '', '', '')
+
+ check_load_result(test_load_label2, testTable)
+ sql "sync"
+ qt_select6 "select * from ${testTable} order by id"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+ }
+
+ // case7: import json use where
+ def q7 = {
+ try {
+ def test_load_label1 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("false", "true", "false", testTable,
test_load_label1, "multi_line_json.json",
+ fsPath, hdfsUser, "SET(id = id * 10)",
"""[\\"\$.id\\", \\"\$.code\\"]""", '', '', 'WHERE id>50')
+
+ check_load_result(test_load_label1, testTable)
+ sql "sync"
+ qt_select7 "select * from ${testTable} order by id"
+
+ // test new json reader
+ def test_load_label2 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("true", "true", "false", testTable,
test_load_label2, "multi_line_json.json",
+ fsPath, hdfsUser, "SET(id = id * 10)",
"""[\\"\$.id\\", \\"\$.code\\"]""", '', '', 'WHERE id>50')
+
+ check_load_result(test_load_label2, testTable)
+ sql "sync"
+ qt_select7 "select * from ${testTable} order by id"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+ }
+
+
+ // case8: import json use fuzzy_parse
+ def q8 = {
+ try {
+ def test_load_label1 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("false", "true", "true", testTable,
test_load_label1, "multi_line_json.json",
+ fsPath, hdfsUser, "SET(id = id * 10)",
"""[\\"\$.id\\", \\"\$.city\\"]""", '', '', 'WHERE id>50')
+
+ check_load_result(test_load_label1, testTable)
+ sql "sync"
+ qt_select8 "select * from ${testTable} order by id"
+
+ // test new json reader
+ def test_load_label2 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("true", "true", "true", testTable,
test_load_label2, "multi_line_json.json",
+ fsPath, hdfsUser, "SET(id = id * 10)",
"""[\\"\$.id\\", \\"\$.city\\"]""", '', '', 'WHERE id>50')
+
+ check_load_result(test_load_label2, testTable)
+ sql "sync"
+ qt_select8 "select * from ${testTable} order by id"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+ }
+
+ // case9: import json use json_root
+ def q9 = {
+ try {
+ def test_load_label1 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("false", "false", "true", testTable,
test_load_label1, "nest_json.json",
+ fsPath, hdfsUser, "SET(id = id * 10)", '',
'$.item', '', '')
+
+ check_load_result(test_load_label1, testTable)
+ sql "sync"
+ qt_select9 "select * from ${testTable} order by id"
+
+ // test new json reader
+ def test_load_label2 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("true", "false", "true", testTable,
test_load_label2, "nest_json.json",
+ fsPath, hdfsUser, "SET(id = id * 10)", '',
'$.item', '', '')
+
+ check_load_result(test_load_label2, testTable)
+ sql "sync"
+ qt_select9 "select * from ${testTable} order by id"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+ }
+
+ // case10: test json file which is unordered and no use json_path
+ def q10 = {
+ try {
+ def test_load_label1 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("false", "true", "false", testTable,
test_load_label1, "multi_line_json_unorder.json",
+ fsPath, hdfsUser, '', '', '', '', '')
+
+ check_load_result(test_load_label1, testTable)
+ sql "sync"
+ qt_select10 "select * from ${testTable} order by id"
+
+ // test new json reader
+ def test_load_label2 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("true", "true", "false", testTable,
test_load_label2, "multi_line_json_unorder.json",
+ fsPath, hdfsUser, '', '', '', '', '')
+
+ check_load_result(test_load_label2, testTable)
+ sql "sync"
+ qt_select10 "select * from ${testTable} order by id"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+ }
+
+ // case11: test json file which is unordered and lack one column which is
nullable
+ def q11 = {
+ try {
+ def test_load_label1 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("false", "true", "false", testTable,
test_load_label1, "multi_line_json_lack_column.json",
+ fsPath, hdfsUser, '', '', '', '', '')
+
+ check_load_result(test_load_label1, testTable)
+ sql "sync"
+ qt_select11 "select * from ${testTable} order by id"
+
+ // test new json reader
+ def test_load_label2 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("true", "true", "false", testTable,
test_load_label2, "multi_line_json_lack_column.json",
+ fsPath, hdfsUser, '', '', '', '', '')
+
+ check_load_result(test_load_label2, testTable)
+ sql "sync"
+ qt_select11 "select * from ${testTable} order by id"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+ }
+
+
+ // case12: use json_path and json_root
+ def q12 = {
+ try {
+ def test_load_label1 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("false", "false", "false", testTable,
test_load_label1, "nest_json.json", fsPath, hdfsUser,
+ "SET(id = id * 10)", """[\\"\$.id\\",
\\"\$.city\\"]""", '$.item', '', '')
+
+ check_load_result(test_load_label1, testTable)
+ sql "sync"
+ qt_select12 "select * from ${testTable} order by id"
+
+ // test new json reader
+ def test_load_label2 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("true", "false", "false", testTable,
test_load_label2, "nest_json.json", fsPath, hdfsUser,
+ "SET(id = id * 10)", """[\\"\$.id\\",
\\"\$.city\\"]""", '$.item', '', '')
+
+ check_load_result(test_load_label2, testTable)
+ sql "sync"
+ qt_select12 "select * from ${testTable} order by id"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+ }
+
+ // case13: use json_path & json_root & where
+ def q13 = {
+ try {
+ def test_load_label1 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("false", "false", "false", testTable,
test_load_label1, "nest_json.json", fsPath, hdfsUser,
+ "SET(id = id * 10)", """[\\"\$.id\\",
\\"\$.city\\"]""", '$.item', '', 'WHERE id>20')
+
+ check_load_result(test_load_label1, testTable)
+ sql "sync"
+ qt_select13 "select * from ${testTable} order by id"
+
+ // test new json reader
+ def test_load_label2 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("true", "false", "false", testTable,
test_load_label2, "nest_json.json", fsPath, hdfsUser,
+ "SET(id = id * 10)", """[\\"\$.id\\",
\\"\$.city\\"]""", '$.item', '', 'WHERE id>20')
+
+ check_load_result(test_load_label2, testTable)
+ sql "sync"
+ qt_select13 "select * from ${testTable} order by id"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+ }
+
+ // case14: use jsonpaths & json_root & where & columns
+ def q14 = {
+ try {
+ def test_load_label1 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("false", "false", "false", testTable,
test_load_label1, "nest_json.json", fsPath, hdfsUser,
+ "SET(id = id * 10)", """[\\"\$.id\\",
\\"\$.code\\",\\"\$.city\\"]""", '$.item',
+ '(id, code, city)', 'WHERE id>20')
+
+ check_load_result(test_load_label1, testTable)
+ sql "sync"
+ qt_select14 "select * from ${testTable} order by id"
+
+ // test new json reader
+ def test_load_label2 =
UUID.randomUUID().toString().replaceAll("-", "")
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table1.call(testTable)
+ load_from_hdfs1.call("true", "false", "false", testTable,
test_load_label2, "nest_json.json", fsPath, hdfsUser,
+ "SET(id = id * 10)", """[\\"\$.id\\",
\\"\$.code\\", \\"\$.city\\"]""", '$.item',
+ '(id, code, city)', 'WHERE id>20')
+
+ check_load_result(test_load_label2, testTable)
+ sql "sync"
+ qt_select14 "select * from ${testTable} order by id"
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+ }
+
+
+
+
+ String enabled = context.config.otherConfigs.get("enableHiveTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ log.info("Begin Test q1:")
+ q1()
+ log.info("Begin Test q2:")
+ q2()
+ log.info("Begin Test q3:")
+ q3()
+ log.info("Begin Test q4:")
+ q4()
+ log.info("Begin Test q5:")
+ q5()
+ log.info("Begin Test q6:")
+ q6()
+ log.info("Begin Test q7:")
+ q7()
+ log.info("Begin Test q8:")
+ q8()
+ log.info("Begin Test q9:")
+ q9()
+ log.info("Begin Test q10:")
+ q10()
+ log.info("Begin Test q11:")
+ q11()
+ log.info("Begin Test q12:")
+ q12()
+ log.info("Begin Test q13:")
+ q13()
+ log.info("Begin Test q14:")
+ q14()
+ }
+}
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
index 8dc594c660..e066467e3e 100644
--- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_json_load", "p0") {
+suite("test_json_load", "p0") {
// define a sql table
def testTable = "test_json_load"
@@ -64,6 +64,30 @@ suite("test_json_load", "p0") {
assertTrue(result2[0][0] == 1, "Insert should update 1 rows")
}
+ // city is NOT NULL
+ def create_test_table3 = {testTablex ->
+ // multi-line sql
+ def result1 = sql """
+ CREATE TABLE IF NOT EXISTS ${testTablex} (
+ id INT DEFAULT '10',
+ city VARCHAR(32) NOT NULL,
+ code BIGINT SUM DEFAULT '0')
+ DISTRIBUTED BY HASH(id) BUCKETS 10
+ PROPERTIES("replication_num" = "1");
+ """
+
+        // DDL/DML return 1 row and 1 column, the only value is the updated row
count
+ assertTrue(result1.size() == 1)
+ assertTrue(result1[0].size() == 1)
+ assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
+
+ // insert 1 row to check whether the table is ok
+ def result2 = sql "INSERT INTO ${testTablex} (id, city, code) VALUES
(200, 'hangzhou', 12345)"
+ assertTrue(result2.size() == 1)
+ assertTrue(result2[0].size() == 1)
+ assertTrue(result2[0][0] == 1, "Insert should update 1 rows")
+ }
+
def test_invalid_json_array_table = { testTablex ->
// multi-line sql
def result1 = sql """
@@ -90,8 +114,11 @@ suite("test_json_load", "p0") {
assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
}
- def load_json_data = {label, strip_flag, read_flag, format_flag, exprs,
json_paths,
- json_root, where_expr, fuzzy_flag, file_name,
ignore_failure=false ->
+ def load_json_data = {new_json_reader_flag, label, strip_flag, read_flag,
format_flag, exprs, json_paths,
+ json_root, where_expr, fuzzy_flag, file_name,
ignore_failure=false ->
+        // should be deleted after new_load_scan is ready
+ sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" =
"${new_json_reader_flag}");"""
+
// load the json data
streamLoad {
table "test_json_load"
@@ -123,6 +150,9 @@ suite("test_json_load", "p0") {
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
+
+ // should be deleted after new_load_scan is ready
+ sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" =
"false");"""
}
def load_from_hdfs1 = {testTablex, label, hdfsFilePath, format,
brokerName, hdfsUser, hdfsPasswd ->
@@ -182,10 +212,20 @@ suite("test_json_load", "p0") {
create_test_table1.call(testTable)
- load_json_data.call('test_json_load_case1', 'true', '', 'json', '',
'', '', '', '', 'simple_json.json')
+ load_json_data.call('false', 'test_json_load_case1', 'true', '',
'json', '', '', '', '', '', 'simple_json.json')
sql "sync"
- qt_select "select * from ${testTable} order by id"
+ qt_select1 "select * from ${testTable} order by id"
+
+ // test new json reader
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+
+ load_json_data.call('true', 'test_json_load_case1_2', 'true', '',
'json', '', '', '', '', '', 'simple_json.json')
+
+ sql "sync"
+ qt_select1 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -197,10 +237,20 @@ suite("test_json_load", "p0") {
create_test_table1.call(testTable)
- load_json_data.call('test_json_load_case2', 'true', '', 'json', 'id=
id * 10', '', '', '', '', 'simple_json.json')
+ load_json_data.call('false', 'test_json_load_case2', 'true', '',
'json', 'id= id * 10', '', '', '', '', 'simple_json.json')
+
+ sql "sync"
+ qt_select2 "select * from ${testTable} order by id"
+
+ // test new json reader
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+
+ load_json_data.call('true', 'test_json_load_case2_2', 'true', '',
'json', 'id= id * 10', '', '', '', '', 'simple_json.json')
sql "sync"
- qt_select "select * from ${testTable} order by id"
+ qt_select2 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -212,11 +262,22 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
- load_json_data.call('test_json_load_case3', 'true', '', 'json', '',
'[\"$.id\", \"$.code\"]',
+ load_json_data.call('false', 'test_json_load_case3', 'true', '',
'json', '', '[\"$.id\", \"$.code\"]',
+ '', '', '', 'simple_json.json')
+
+ sql "sync"
+ qt_select3 "select * from ${testTable} order by id"
+
+ // test new json reader
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table2.call(testTable)
+
+ load_json_data.call('true', 'test_json_load_case3_2', 'true', '',
'json', '', '[\"$.id\", \"$.code\"]',
'', '', '', 'simple_json.json')
sql "sync"
- qt_select "select * from ${testTable} order by id"
+ qt_select3 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -228,11 +289,22 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
- load_json_data.call('test_json_load_case4', 'true', '', 'json', 'code
= id * 10 + 200', '[\"$.id\"]',
+ load_json_data.call('false', 'test_json_load_case4', 'true', '',
'json', 'code = id * 10 + 200', '[\"$.id\"]',
'', '', '', 'simple_json.json')
sql "sync"
- qt_select "select * from ${testTable} order by id"
+ qt_select4 "select * from ${testTable} order by id"
+
+ // test new json reader
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table2.call(testTable)
+
+ load_json_data.call('true', 'test_json_load_case4_2', 'true', '',
'json', 'code = id * 10 + 200', '[\"$.id\"]',
+ '', '', '', 'simple_json.json')
+
+ sql "sync"
+ qt_select4 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -244,11 +316,22 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
- load_json_data.call('test_json_load_case5', 'true', 'true', 'json',
'', '[\"$.id\", \"$.code\"]',
+ load_json_data.call('false', 'test_json_load_case5', 'true', 'true',
'json', '', '[\"$.id\", \"$.code\"]',
'', '', '', 'multi_line_json.json')
sql "sync"
- qt_select "select * from ${testTable} order by id"
+ qt_select5 "select * from ${testTable} order by id"
+
+ // test new json reader
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table2.call(testTable)
+
+ load_json_data.call('true', 'test_json_load_case5_2', 'true', 'true',
'json', '', '[\"$.id\", \"$.code\"]',
+ '', '', '', 'multi_line_json.json')
+
+ sql "sync"
+ qt_select5 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -260,11 +343,23 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
- load_json_data.call('test_json_load_case6', 'true', 'true', 'json',
'id= id * 10', '[\"$.id\", \"$.code\"]',
+ load_json_data.call('false', 'test_json_load_case6', 'true', 'true',
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+ '', '', '', 'multi_line_json.json')
+
+ sql "sync"
+ qt_select6 "select * from ${testTable} order by id"
+
+
+ // test new json reader
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table2.call(testTable)
+
+ load_json_data.call('true', 'test_json_load_case6_2', 'true', 'true',
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
'', '', '', 'multi_line_json.json')
sql "sync"
- qt_select "select * from ${testTable} order by id"
+ qt_select6 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -276,11 +371,22 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
- load_json_data.call('test_json_load_case7', 'true', 'true', 'json',
'id= id * 10', '[\"$.id\", \"$.code\"]',
+ load_json_data.call('false', 'test_json_load_case7', 'true', 'true',
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+ '', 'id > 50', '', 'multi_line_json.json')
+
+ sql "sync"
+ qt_select7 "select * from ${testTable} order by id"
+
+ // test new json reader
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table2.call(testTable)
+
+ load_json_data.call('true', 'test_json_load_case7_2', 'true', 'true',
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
'', 'id > 50', '', 'multi_line_json.json')
sql "sync"
- qt_select "select * from ${testTable} order by id"
+ qt_select7 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -292,11 +398,23 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
- load_json_data.call('test_json_load_case8', 'true', 'true', 'json',
'id= id * 10', '[\"$.id\", \"$.code\"]',
+ load_json_data.call('false', 'test_json_load_case8', 'true', 'true',
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
'', 'id > 50', 'true', 'multi_line_json.json')
sql "sync"
- qt_select "select * from ${testTable} order by id"
+ qt_select8 "select * from ${testTable} order by id"
+
+
+ // test new json reader
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table2.call(testTable)
+
+ load_json_data.call('true', 'test_json_load_case8_2', 'true', 'true',
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+ '', 'id > 50', 'true', 'multi_line_json.json')
+
+ sql "sync"
+ qt_select8 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -308,26 +426,258 @@ suite("test_json_load", "p0") {
create_test_table1.call(testTable)
- load_json_data.call('test_json_load_case9', '', 'true', 'json', 'id=
id * 10', '',
+ load_json_data.call('false', 'test_json_load_case9', '', 'true',
'json', 'id= id * 10', '',
'$.item', '', 'true', 'nest_json.json')
sql "sync"
- qt_select "select * from ${testTable} order by id"
+ qt_select9 "select * from ${testTable} order by id"
+
+ // test new json reader
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+
+ load_json_data.call('true', 'test_json_load_case9_2', '', 'true',
'json', 'id= id * 10', '',
+ '$.item', '', 'true', 'nest_json.json')
+
+ sql "sync"
+ qt_select9 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
+
// case10: invalid json
try {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
- load_json_data.call('test_json_load_case10', '', 'true', 'json', 'id=
id * 10', '',
+ load_json_data.call('false', 'test_json_load_case10', '', 'true',
'json', 'id= id * 10', '',
'$.item', '', 'true', 'invalid_json.json', true)
sql "sync"
- qt_select "select * from ${testTable} order by id"
+ qt_select10 "select * from ${testTable} order by id"
+
+
+ // test new json reader
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+
+ load_json_data.call('true', 'test_json_load_case10_2', '', 'true',
'json', 'id= id * 10', '',
+ '$.item', '', 'true', 'invalid_json.json', true)
+
+ sql "sync"
+ qt_select10 "select * from ${testTable} order by id"
+
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+
+    // case11: test json file which is unordered and does not use json_path
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+
+ load_json_data.call('false', 'test_json_load_case11', 'true', '',
'json', '', '', '', '', '', 'simple_json2.json')
+
+ sql "sync"
+ qt_select11 "select * from ${testTable} order by id"
+
+ // test new json reader
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+
+ load_json_data.call('true', 'test_json_load_case11_2', 'true', '',
'json', '', '', '', '', '', 'simple_json2.json')
+
+ sql "sync"
+ qt_select11 "select * from ${testTable} order by id"
+
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+
+    // case12: test json file which is unordered and lacks one column which is
nullable
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+
+ load_json_data.call('false', 'test_json_load_case12', 'true', '',
'json', '', '', '', '', '', 'simple_json2_lack_one_column.json')
+
+ sql "sync"
+ qt_select12 "select * from ${testTable} order by id"
+
+ // test new json reader
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+
+ load_json_data.call('true', 'test_json_load_case12_2', 'true', '',
'json', '', '', '', '', '', 'simple_json2_lack_one_column.json')
+
+ sql "sync"
+ qt_select12 "select * from ${testTable} order by id"
+
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+
+    // case13: test json file which is unordered and lacks one column which is
not nullable
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table3.call(testTable)
+        // should be deleted after new_load_scan is ready
+ sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" =
"false");"""
+ // load the json data
+ streamLoad {
+ table "${testTable}"
+
+ // set http request header params
+ set 'strip_outer_array', "true"
+ set 'format', "json"
+ set 'max_filter_ratio', '1'
+ file "simple_json2_lack_one_column.json" // import json file
+ time 10000 // limit inflight 10s
+
+            // if a check callback is declared, the default check condition will
be ignored.
+ // So you must check all condition
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows +
json.NumberUnselectedRows + json.NumberFilteredRows)
+ assertEquals(json.NumberFilteredRows, 4)
+ assertEquals(json.NumberLoadedRows, 6)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+ // should be deleted after new_load_scan is ready
+ sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" =
"false");"""
+ sql "sync"
+ qt_select13 "select * from ${testTable} order by id"
+
+
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table3.call(testTable)
+        // should be deleted after new_load_scan is ready
+ sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" =
"true");"""
+ // load the json data
+ streamLoad {
+ table "${testTable}"
+
+ // set http request header params
+ set 'strip_outer_array', "true"
+ set 'format', "json"
+ set 'max_filter_ratio', '1'
+ file "simple_json2_lack_one_column.json" // import json file
+ time 10000 // limit inflight 10s
+
+            // if a check callback is declared, the default check condition will
be ignored.
+ // So you must check all condition
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows +
json.NumberUnselectedRows + json.NumberFilteredRows)
+ assertEquals(json.NumberFilteredRows, 4)
+ assertEquals(json.NumberLoadedRows, 6)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+ // should be deleted after new_load_scan is ready
+ sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" =
"false");"""
+ sql "sync"
+ qt_select13 "select * from ${testTable} order by id"
+
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+
+ // case14: use json_path and json_root
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+
+ load_json_data.call('false', 'test_json_load_case14', '', 'true',
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+ '$.item', '', 'true', 'nest_json.json')
+
+ sql "sync"
+ qt_select14 "select * from ${testTable} order by id"
+
+ // test new json reader
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+
+ load_json_data.call('true', 'test_json_load_case14_2', '', 'true',
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+ '$.item', '', 'true', 'nest_json.json')
+
+ sql "sync"
+ qt_select14 "select * from ${testTable} order by id"
+
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+
+ // case15: apply jsonpaths & exprs & json_root
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+
+ load_json_data.call('false', 'test_json_load_case15', '', 'true',
'json', 'id, code, city, id= id * 10',
+ '[\"$.id\", \"$.code\", \"$.city\"]', '$.item',
'', 'true', 'nest_json.json')
+
+ sql "sync"
+ qt_select15 "select * from ${testTable} order by id"
+
+ // test new json reader
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+
+ load_json_data.call('true', 'test_json_load_case15_2', '', 'true',
'json', 'id, code, city,id= id * 10',
+ '[\"$.id\", \"$.code\", \"$.city\"]', '$.item',
'', 'true', 'nest_json.json')
+
+ sql "sync"
+ qt_select15 "select * from ${testTable} order by id"
+
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+
+    // case16: apply jsonpaths (with array index, e.g. $.city[2]) & exprs & json_root
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+
+ load_json_data.call('false', 'test_json_load_case16', 'true', '',
'json', 'id, code, city',
+ '[\"$.id\", \"$.code\", \"$.city[2]\"]', '$.item',
'', 'true', 'nest_json_array.json')
+
+ sql "sync"
+ qt_select16 "select * from ${testTable} order by id"
+
+ // test new json reader
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ create_test_table1.call(testTable)
+
+ load_json_data.call('true', 'test_json_load_case16_2', 'true', '',
'json', 'id, code, city',
+ '[\"$.id\", \"$.code\", \"$.city[2]\"]', '$.item',
'', 'true', 'nest_json_array.json')
+
+ sql "sync"
+ qt_select16 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -342,7 +692,7 @@ suite("test_json_load", "p0") {
def hdfs_file_path = uploadToHdfs "stream_load/simple_object_json.json"
def format = "json"
- // case11: import json use pre-filter exprs
+ // case17: import json use pre-filter exprs
try {
sql "DROP TABLE IF EXISTS ${testTable}"
@@ -357,7 +707,7 @@ suite("test_json_load", "p0") {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
- // case12: import json use pre-filter and where exprs
+ // case18: import json use pre-filter and where exprs
try {
sql "DROP TABLE IF EXISTS ${testTable}"
@@ -372,12 +722,12 @@ suite("test_json_load", "p0") {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
- // case13: invalid json
+ // case19: invalid json
try {
sql "DROP TABLE IF EXISTS ${testTable}"
test_invalid_json_array_table.call(testTable)
- load_json_data.call('test_json_load_case11', 'true', '', 'json',
'', '',
+ load_json_data.call('false', 'test_json_load_case19', 'true', '',
'json', '', '',
'', '', '', 'invalid_json_array.json', true)
sql "sync"
diff --git
a/regression-test/suites/tpch_sf1_p0/multi_catalog_query_parquet/hive_catalog.groovy
b/regression-test/suites/tpch_sf1_p0/multi_catalog_query_parquet/hive_catalog.groovy
index 9db1324d4e..30d2798ef2 100644
---
a/regression-test/suites/tpch_sf1_p0/multi_catalog_query_parquet/hive_catalog.groovy
+++
b/regression-test/suites/tpch_sf1_p0/multi_catalog_query_parquet/hive_catalog.groovy
@@ -801,22 +801,23 @@ order by
String[][] backends = sql """ show backends; """
assertTrue(backends.size() > 0)
for (String[] backend in backends) {
- StringBuilder setConfigCommand = new StringBuilder();
- setConfigCommand.append("curl -X POST http://")
- setConfigCommand.append(backend[2])
- setConfigCommand.append(":")
- setConfigCommand.append(backend[5])
- setConfigCommand.append("/api/update_config?")
- String command1 = setConfigCommand.toString() +
"enable_new_load_scan_node=true"
- logger.info(command1)
- String command2 = setConfigCommand.toString() +
"enable_new_file_scanner=true"
- logger.info(command2)
- def process1 = command1.execute()
- int code = process1.waitFor()
- assertEquals(code, 0)
- def process2 = command2.execute()
- code = process1.waitFor()
- assertEquals(code, 0)
+ // No need to set this config anymore, but leave this code sample
here
+ // StringBuilder setConfigCommand = new StringBuilder();
+ // setConfigCommand.append("curl -X POST http://")
+ // setConfigCommand.append(backend[2])
+ // setConfigCommand.append(":")
+ // setConfigCommand.append(backend[5])
+ // setConfigCommand.append("/api/update_config?")
+ // String command1 = setConfigCommand.toString() +
"enable_new_load_scan_node=true"
+ // logger.info(command1)
+ // String command2 = setConfigCommand.toString() +
"enable_new_file_scanner=true"
+ // logger.info(command2)
+ // def process1 = command1.execute()
+ // int code = process1.waitFor()
+ // assertEquals(code, 0)
+ // def process2 = command2.execute()
+ // code = process1.waitFor()
+ // assertEquals(code, 0)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]