This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c418bbd2d1 [feature-wip](new-scan) support Json reader (#13546)
c418bbd2d1 is described below

commit c418bbd2d1c52776becd793562a2ca2ec4b74b96
Author: Tiewei Fang <[email protected]>
AuthorDate: Wed Oct 26 12:52:21 2022 +0800

    [feature-wip](new-scan) support Json reader (#13546)
    
    Issue Number: close #12574
    This pr adds `NewJsonReader` which implements GenericReader interface to 
support read json format file.
    
    TODO:
    1. modify `_scann_eof` later.
    2. Rename `NewJsonReader` to `JsonReader` when `JsonReader` is deleted.
---
 be/src/vec/CMakeLists.txt                          |   1 +
 be/src/vec/exec/format/json/new_json_reader.cpp    | 754 +++++++++++++++++++++
 be/src/vec/exec/format/json/new_json_reader.h      | 150 ++++
 be/src/vec/exec/scan/vfile_scanner.cpp             |   7 +
 .../docker-compose/hive/scripts/hive-metastore.sh  |   2 +
 .../scripts/json_format_test/multi_line_json.json  |   2 +
 .../multi_line_json_lack_column.json               |   2 +
 .../json_format_test/multi_line_json_unorder.json  |   2 +
 .../hive/scripts/json_format_test/nest_json.json   |   5 +
 .../json_format_test/simple_object_json.json       |  12 +
 .../org/apache/doris/analysis/DataDescription.java |   9 +-
 .../main/java/org/apache/doris/catalog/Env.java    |   2 +-
 .../org/apache/doris/load/BrokerFileGroup.java     |   6 +-
 .../apache/doris/planner/StreamLoadPlanner.java    |   5 -
 regression-test/conf/regression-conf.groovy        |   1 +
 .../data/load_p0/broker_load/test_array_load.out   |  40 ++
 .../stream_load/load_json_null_to_nullable.out     |  12 +
 .../stream_load/load_json_with_jsonpath.out        |  10 +
 .../data/load_p0/stream_load/nest_json.json        |   3 +
 .../data/load_p0/stream_load/nest_json_array.json  |  74 ++
 .../data/load_p0/stream_load/simple_json2.json     |  52 ++
 .../stream_load/simple_json2_lack_one_column.json  |  48 ++
 .../load_p0/stream_load/test_hdfs_json_load.out    | 305 +++++++++
 .../data/load_p0/stream_load/test_json_load.out    | 236 ++++++-
 .../load_p0/broker_load/test_array_load.groovy     | 136 ++--
 ...n_column_exclude_schema_without_jsonpath.groovy |  19 +-
 .../stream_load/load_json_null_to_nullable.groovy  |  38 +-
 .../stream_load/load_json_with_jsonpath.groovy     |  36 +-
 .../load_p0/stream_load/test_hdfs_json_load.groovy | 554 +++++++++++++++
 .../load_p0/stream_load/test_json_load.groovy      | 404 ++++++++++-
 .../hive_catalog.groovy                            |  33 +-
 31 files changed, 2813 insertions(+), 147 deletions(-)

diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index d3ae2849de..2ba617295c 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -258,6 +258,7 @@ set(VEC_FILES
   exec/scan/new_es_scan_node.cpp
   exec/format/csv/csv_reader.cpp
   exec/format/orc/vorc_reader.cpp
+  exec/format/json/new_json_reader.cpp
   )
 
 add_library(Vec STATIC
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
new file mode 100644
index 0000000000..add32361e3
--- /dev/null
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -0,0 +1,754 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/json/new_json_reader.h"
+
+#include "common/compiler_util.h"
+#include "exec/plain_text_line_reader.h"
+#include "exprs/json_functions.h"
+#include "io/file_factory.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/exec/scan/vscanner.h"
+namespace doris::vectorized {
+
+NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, 
ScannerCounter* counter,
+                             const TFileScanRangeParams& params, const 
TFileRangeDesc& range,
+                             const std::vector<SlotDescriptor*>& 
file_slot_descs, bool* scanner_eof)
+        : _vhandle_json_callback(nullptr),
+          _state(state),
+          _profile(profile),
+          _counter(counter),
+          _params(params),
+          _range(range),
+          _file_slot_descs(file_slot_descs),
+          _file_reader(nullptr),
+          _file_reader_s(nullptr),
+          _real_file_reader(nullptr),
+          _line_reader(nullptr),
+          _reader_eof(false),
+          _skip_first_line(false),
+          _next_row(0),
+          _total_rows(0),
+          _value_allocator(_value_buffer, sizeof(_value_buffer)),
+          _parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
+          _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), 
&_parse_allocator),
+          _scanner_eof(scanner_eof) {
+    _file_format_type = _params.format_type;
+
+    _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
+    _read_timer = ADD_TIMER(_profile, "ReadTime");
+    _file_read_timer = ADD_TIMER(_profile, "FileReadTime");
+}
+
+Status NewJsonReader::init_reader() {
+    RETURN_IF_ERROR(_get_range_params());
+
+    RETURN_IF_ERROR(_open_file_reader());
+    if (_read_json_by_line) {
+        RETURN_IF_ERROR(_open_line_reader());
+    }
+
+    // generate _parsed_jsonpaths and _parsed_json_root
+    RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+    //improve performance
+    if (_parsed_jsonpaths.empty()) { // input is a simple json-string
+        _vhandle_json_callback = &NewJsonReader::_vhandle_simple_json;
+    } else { // input is a complex json-string and a json-path
+        if (_strip_outer_array) {
+            _vhandle_json_callback = 
&NewJsonReader::_vhandle_flat_array_complex_json;
+        } else {
+            _vhandle_json_callback = 
&NewJsonReader::_vhandle_nested_complex_json;
+        }
+    }
+    return Status::OK();
+}
+
+Status NewJsonReader::get_next_block(Block* block, size_t* read_rows, bool* 
eof) {
+    if (_reader_eof == true) {
+        *eof = true;
+        return Status::OK();
+    }
+
+    const int batch_size = _state->batch_size();
+    auto columns = block->mutate_columns();
+
+    while (columns[0]->size() < batch_size && !_reader_eof) {
+        if (UNLIKELY(_read_json_by_line && _skip_first_line)) {
+            size_t size = 0;
+            const uint8_t* line_ptr = nullptr;
+            RETURN_IF_ERROR(_line_reader->read_line(&line_ptr, &size, 
&_reader_eof));
+            _skip_first_line = false;
+            continue;
+        }
+
+        bool is_empty_row = false;
+
+        RETURN_IF_ERROR(_read_json_column(columns, _file_slot_descs, 
&is_empty_row, &_reader_eof));
+        ++(*read_rows);
+        if (is_empty_row) {
+            // Read empty row, just continue
+            continue;
+        }
+    }
+
+    columns.clear();
+    return Status::OK();
+}
+
+Status NewJsonReader::get_columns(std::unordered_map<std::string, 
TypeDescriptor>* name_to_type,
+                                  std::unordered_set<std::string>* 
missing_cols) {
+    for (auto& slot : _file_slot_descs) {
+        name_to_type->emplace(slot->col_name(), slot->type());
+    }
+    return Status::OK();
+}
+
+Status NewJsonReader::_get_range_params() {
+    if (!_params.__isset.file_attributes) {
+        return Status::InternalError("BE cat get file_attributes");
+    }
+
+    // get line_delimiter
+    if (_params.file_attributes.__isset.text_params &&
+        _params.file_attributes.text_params.__isset.line_delimiter) {
+        _line_delimiter = _params.file_attributes.text_params.line_delimiter;
+        _line_delimiter_length = _line_delimiter.size();
+    }
+
+    if (_params.file_attributes.__isset.jsonpaths) {
+        _jsonpaths = _params.file_attributes.jsonpaths;
+    }
+    if (_params.file_attributes.__isset.json_root) {
+        _json_root = _params.file_attributes.json_root;
+    }
+    if (_params.file_attributes.__isset.read_json_by_line) {
+        _read_json_by_line = _params.file_attributes.read_json_by_line;
+    }
+    if (_params.file_attributes.__isset.strip_outer_array) {
+        _strip_outer_array = _params.file_attributes.strip_outer_array;
+    }
+    if (_params.file_attributes.__isset.num_as_string) {
+        _num_as_string = _params.file_attributes.num_as_string;
+    }
+    if (_params.file_attributes.__isset.fuzzy_parse) {
+        _fuzzy_parse = _params.file_attributes.fuzzy_parse;
+    }
+    return Status::OK();
+}
+
+Status NewJsonReader::_open_file_reader() {
+    int64_t start_offset = _range.start_offset;
+    if (start_offset != 0) {
+        start_offset -= 1;
+    }
+
+    if (_params.file_type == TFileType::FILE_STREAM) {
+        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, 
_file_reader_s));
+        _real_file_reader = _file_reader_s.get();
+    } else {
+        RETURN_IF_ERROR(FileFactory::create_file_reader(
+                _profile, _params, _range.path, start_offset, 
_range.file_size, 0, _file_reader));
+        _real_file_reader = _file_reader.get();
+    }
+    return _real_file_reader->open();
+}
+
+Status NewJsonReader::_open_line_reader() {
+    int64_t size = _range.size;
+    if (_range.start_offset != 0) {
+        // When we fetch range doesn't start from 0, size will += 1.
+
+        // TODO(ftw): check what if file_reader is stream_pipe? Is `size+=1` 
is correct?
+        size += 1;
+        _skip_first_line = true;
+    } else {
+        _skip_first_line = false;
+    }
+    _line_reader.reset(new PlainTextLineReader(_profile, _real_file_reader, 
nullptr, size,
+                                               _line_delimiter, 
_line_delimiter_length));
+    return Status::OK();
+}
+
+Status NewJsonReader::_parse_jsonpath_and_json_root() {
+    // parse jsonpaths
+    if (!_jsonpaths.empty()) {
+        rapidjson::Document jsonpaths_doc;
+        if (!jsonpaths_doc.Parse(_jsonpaths.c_str(), 
_jsonpaths.length()).HasParseError()) {
+            if (!jsonpaths_doc.IsArray()) {
+                return Status::InvalidArgument("Invalid json path: {}", 
_jsonpaths);
+            } else {
+                for (int i = 0; i < jsonpaths_doc.Size(); i++) {
+                    const rapidjson::Value& path = jsonpaths_doc[i];
+                    if (!path.IsString()) {
+                        return Status::InvalidArgument("Invalid json path: 
{}", _jsonpaths);
+                    }
+                    std::vector<JsonPath> parsed_paths;
+                    JsonFunctions::parse_json_paths(path.GetString(), 
&parsed_paths);
+                    _parsed_jsonpaths.push_back(std::move(parsed_paths));
+                }
+            }
+        } else {
+            return Status::InvalidArgument("Invalid json path: {}", 
_jsonpaths);
+        }
+    }
+
+    // parse jsonroot
+    if (!_json_root.empty()) {
+        JsonFunctions::parse_json_paths(_json_root, &_parsed_json_root);
+    }
+    return Status::OK();
+}
+
+Status NewJsonReader::_read_json_column(std::vector<MutableColumnPtr>& columns,
+                                        const std::vector<SlotDescriptor*>& 
slot_descs,
+                                        bool* is_empty_row, bool* eof) {
+    return (this->*_vhandle_json_callback)(columns, slot_descs, is_empty_row, 
eof);
+}
+
+Status NewJsonReader::_vhandle_simple_json(std::vector<MutableColumnPtr>& 
columns,
+                                           const std::vector<SlotDescriptor*>& 
slot_descs,
+                                           bool* is_empty_row, bool* eof) {
+    do {
+        bool valid = false;
+        if (_next_row >= _total_rows) { // parse json and generic document
+            Status st = _parse_json(is_empty_row, eof);
+            if (st.is_data_quality_error()) {
+                continue; // continue to read next
+            }
+            RETURN_IF_ERROR(st);
+            if (*is_empty_row == true) {
+                return Status::OK();
+            }
+            _name_map.clear();
+            rapidjson::Value* objectValue = nullptr;
+            if (_json_doc->IsArray()) {
+                _total_rows = _json_doc->Size();
+                if (_total_rows == 0) {
+                    // may be passing an empty json, such as "[]"
+                    RETURN_IF_ERROR(_append_error_msg(*_json_doc, "Empty json 
line", "", nullptr));
+
+                    // TODO(ftw): check _reader_eof??
+                    if (_reader_eof) {
+                        *is_empty_row = true;
+                        return Status::OK();
+                    }
+                    continue;
+                }
+                objectValue = &(*_json_doc)[0];
+            } else {
+                _total_rows = 1; // only one row
+                objectValue = _json_doc;
+            }
+            _next_row = 0;
+            if (_fuzzy_parse) {
+                for (auto v : slot_descs) {
+                    for (int i = 0; i < objectValue->MemberCount(); ++i) {
+                        auto it = objectValue->MemberBegin() + i;
+                        if (v->col_name() == it->name.GetString()) {
+                            _name_map[v->col_name()] = i;
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        if (_json_doc->IsArray()) {                                  // handle 
case 1
+            rapidjson::Value& objectValue = (*_json_doc)[_next_row]; // json 
object
+            RETURN_IF_ERROR(_set_column_value(objectValue, columns, 
slot_descs, &valid));
+        } else { // handle case 2
+            RETURN_IF_ERROR(_set_column_value(*_json_doc, columns, slot_descs, 
&valid));
+        }
+        _next_row++;
+        if (!valid) {
+            if (*_scanner_eof) {
+                // When _scanner_eof is true and valid is false, it means that 
we have encountered
+                // unqualified data and decided to stop the scan.
+                *is_empty_row = true;
+                // TODO(ftw): check *eof=true?
+                *eof = true;
+                return Status::OK();
+            }
+            continue;
+        }
+        *is_empty_row = false;
+        break; // get a valid row, then break
+    } while (_next_row <= _total_rows);
+    return Status::OK();
+}
+
+Status NewJsonReader::_vhandle_flat_array_complex_json(
+        std::vector<MutableColumnPtr>& columns, const 
std::vector<SlotDescriptor*>& slot_descs,
+        bool* is_empty_row, bool* eof) {
+    do {
+        if (_next_row >= _total_rows) {
+            Status st = _parse_json(is_empty_row, eof);
+            if (st.is_data_quality_error()) {
+                continue; // continue to read next
+            }
+            RETURN_IF_ERROR(st);
+            if (*is_empty_row == true) {
+                if (st == Status::OK()) {
+                    return Status::OK();
+                }
+                if (_total_rows == 0) {
+                    continue;
+                }
+            }
+        }
+        rapidjson::Value& objectValue = (*_json_doc)[_next_row++];
+        bool valid = true;
+        RETURN_IF_ERROR(_write_columns_by_jsonpath(objectValue, slot_descs, 
columns, &valid));
+        if (!valid) {
+            continue; // process next line
+        }
+        *is_empty_row = false;
+        break; // get a valid row, then break
+    } while (_next_row <= _total_rows);
+    return Status::OK();
+}
+
+Status 
NewJsonReader::_vhandle_nested_complex_json(std::vector<MutableColumnPtr>& 
columns,
+                                                   const 
std::vector<SlotDescriptor*>& slot_descs,
+                                                   bool* is_empty_row, bool* 
eof) {
+    while (true) {
+        Status st = _parse_json(is_empty_row, eof);
+        if (st.is_data_quality_error()) {
+            continue; // continue to read next
+        }
+        RETURN_IF_ERROR(st);
+        if (*is_empty_row == true) {
+            return Status::OK();
+        }
+        *is_empty_row = false;
+        break; // read a valid row
+    }
+    bool valid = true;
+    RETURN_IF_ERROR(_write_columns_by_jsonpath(*_json_doc, slot_descs, 
columns, &valid));
+    if (!valid) {
+        // there is only one line in this case, so if it return false, just 
set is_empty_row true
+        // so that the caller will continue reading next line.
+        *is_empty_row = true;
+    }
+    return Status::OK();
+}
+
+Status NewJsonReader::_parse_json(bool* is_empty_row, bool* eof) {
+    size_t size = 0;
+    RETURN_IF_ERROR(_parse_json_doc(&size, eof));
+
+    // read all data, then return
+    if (size == 0 || *eof) {
+        *is_empty_row = true;
+        return Status::OK();
+    }
+
+    if (!_parsed_jsonpaths.empty() && _strip_outer_array) {
+        _total_rows = _json_doc->Size();
+        _next_row = 0;
+
+        if (_total_rows == 0) {
+            // meet an empty json array.
+            *is_empty_row = true;
+        }
+    }
+    return Status::OK();
+}
+
+// read one json string from line reader or file reader and parse it to json 
doc.
+// return Status::DataQualityError() if data has quality error.
+// return other error if encounter other problemes.
+// return Status::OK() if parse succeed or reach EOF.
+Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) {
+    // read a whole message
+    SCOPED_TIMER(_file_read_timer);
+    const uint8_t* json_str = nullptr;
+    std::unique_ptr<uint8_t[]> json_str_ptr;
+    if (_line_reader != nullptr) {
+        RETURN_IF_ERROR(_line_reader->read_line(&json_str, size, eof));
+    } else {
+        int64_t length = 0;
+        RETURN_IF_ERROR(_real_file_reader->read_one_message(&json_str_ptr, 
&length));
+        json_str = json_str_ptr.get();
+        *size = length;
+        if (length == 0) {
+            *eof = true;
+        }
+    }
+
+    _bytes_read_counter += *size;
+    if (*eof) {
+        return Status::OK();
+    }
+
+    // clear memory here.
+    _value_allocator.Clear();
+    _parse_allocator.Clear();
+    bool has_parse_error = false;
+    // parse jsondata to JsonDoc
+
+    // As the issue: https://github.com/Tencent/rapidjson/issues/1458
+    // Now, rapidjson only support uint64_t, So lagreint load cause bug. We 
use kParseNumbersAsStringsFlag.
+    if (_num_as_string) {
+        has_parse_error =
+                _origin_json_doc
+                        
.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str, *size)
+                        .HasParseError();
+    } else {
+        has_parse_error = _origin_json_doc.Parse((char*)json_str, 
*size).HasParseError();
+    }
+
+    if (has_parse_error) {
+        fmt::memory_buffer error_msg;
+        fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: 
{}, error info: {}",
+                       _origin_json_doc.GetParseError(),
+                       
rapidjson::GetParseError_En(_origin_json_doc.GetParseError()));
+        RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                [&]() -> std::string { return std::string((char*)json_str, 
*size); },
+                [&]() -> std::string { return fmt::to_string(error_msg); }, 
_scanner_eof));
+        _counter->num_rows_filtered++;
+        if (*_scanner_eof) {
+            // Case A: if _scanner_eof is set to true in 
"append_error_msg_to_file", which means
+            // we meet enough invalid rows and the scanner should be stopped.
+            // So we set eof to true and return OK, the caller will stop the 
process as we meet the end of file.
+            *eof = true;
+            return Status::OK();
+        }
+        return Status::DataQualityError(fmt::to_string(error_msg));
+    }
+
+    // set json root
+    if (_parsed_json_root.size() != 0) {
+        _json_doc = JsonFunctions::get_json_object_from_parsed_json(
+                _parsed_json_root, &_origin_json_doc, 
_origin_json_doc.GetAllocator());
+        if (_json_doc == nullptr) {
+            fmt::memory_buffer error_msg;
+            fmt::format_to(error_msg, "{}", "JSON Root not found.");
+            RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                    [&]() -> std::string { return 
_print_json_value(_origin_json_doc); },
+                    [&]() -> std::string { return fmt::to_string(error_msg); 
}, _scanner_eof));
+            _counter->num_rows_filtered++;
+            if (*_scanner_eof) {
+                // Same as Case A
+                *eof = true;
+                return Status::OK();
+            }
+            return Status::DataQualityError(fmt::to_string(error_msg));
+        }
+    } else {
+        _json_doc = &_origin_json_doc;
+    }
+
+    if (_json_doc->IsArray() && !_strip_outer_array) {
+        fmt::memory_buffer error_msg;
+        fmt::format_to(error_msg, "{}",
+                       "JSON data is array-object, `strip_outer_array` must be 
TRUE.");
+        RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                [&]() -> std::string { return 
_print_json_value(_origin_json_doc); },
+                [&]() -> std::string { return fmt::to_string(error_msg); }, 
_scanner_eof));
+        _counter->num_rows_filtered++;
+        if (*_scanner_eof) {
+            // Same as Case A
+            *eof = true;
+            return Status::OK();
+        }
+        return Status::DataQualityError(fmt::to_string(error_msg));
+    }
+
+    if (!_json_doc->IsArray() && _strip_outer_array) {
+        fmt::memory_buffer error_msg;
+        fmt::format_to(error_msg, "{}",
+                       "JSON data is not an array-object, `strip_outer_array` 
must be FALSE.");
+        RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                [&]() -> std::string { return 
_print_json_value(_origin_json_doc); },
+                [&]() -> std::string { return fmt::to_string(error_msg); }, 
_scanner_eof));
+        _counter->num_rows_filtered++;
+        if (*_scanner_eof) {
+            // Same as Case A
+            *eof = true;
+            return Status::OK();
+        }
+        return Status::DataQualityError(fmt::to_string(error_msg));
+    }
+
+    return Status::OK();
+}
+
+// for simple format json
+// set valid to true and return OK if succeed.
+// set valid to false and return OK if we met an invalid row.
+// return other status if encounter other problmes.
+Status NewJsonReader::_set_column_value(rapidjson::Value& objectValue,
+                                        std::vector<MutableColumnPtr>& columns,
+                                        const std::vector<SlotDescriptor*>& 
slot_descs,
+                                        bool* valid) {
+    if (!objectValue.IsObject()) {
+        // Here we expect the incoming `objectValue` to be a Json Object, such 
as {"key" : "value"},
+        // not other type of Json format.
+        RETURN_IF_ERROR(_append_error_msg(objectValue, "Expect json object 
value", "", valid));
+        return Status::OK();
+    }
+
+    int ctx_idx = 0;
+    bool has_valid_value = false;
+    size_t cur_row_count = columns[0]->size();
+    for (auto slot_desc : slot_descs) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+
+        int dest_index = ctx_idx++;
+        auto* column_ptr = columns[dest_index].get();
+        rapidjson::Value::ConstMemberIterator it = objectValue.MemberEnd();
+
+        if (_fuzzy_parse) {
+            auto idx_it = _name_map.find(slot_desc->col_name());
+            if (idx_it != _name_map.end() && idx_it->second < 
objectValue.MemberCount()) {
+                it = objectValue.MemberBegin() + idx_it->second;
+            }
+        } else {
+            it = objectValue.FindMember(
+                    rapidjson::Value(slot_desc->col_name().c_str(), 
slot_desc->col_name().size()));
+        }
+
+        if (it != objectValue.MemberEnd()) {
+            const rapidjson::Value& value = it->value;
+            RETURN_IF_ERROR(_write_data_to_column(&value, slot_desc, 
column_ptr, valid));
+            if (!(*valid)) {
+                return Status::OK();
+            }
+            has_valid_value = true;
+        } else { // not found
+            // When the entire row has no valid value, this row should be 
filtered,
+            // so the default value cannot be directly inserted here
+            if (!slot_desc->is_nullable()) {
+                RETURN_IF_ERROR(_append_error_msg(
+                        objectValue,
+                        "The column `{}` is not nullable, but it's not found 
in jsondata.",
+                        slot_desc->col_name(), valid));
+                break;
+            }
+        }
+    }
+    if (!has_valid_value) {
+        RETURN_IF_ERROR(_append_error_msg(objectValue, "All fields is null, 
this is a invalid row.",
+                                          "", valid));
+        return Status::OK();
+    }
+    ctx_idx = 0;
+    int nullcount = 0;
+    // fill missing slot
+    for (auto slot_desc : slot_descs) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+        int dest_index = ctx_idx++;
+        auto* column_ptr = columns[dest_index].get();
+        if (column_ptr->size() < cur_row_count + 1) {
+            DCHECK(column_ptr->size() == cur_row_count);
+            column_ptr->assume_mutable()->insert_default();
+            ++nullcount;
+        }
+        DCHECK(column_ptr->size() == cur_row_count + 1);
+    }
+    // There is at least one valid value here
+    DCHECK(nullcount < columns.size());
+    *valid = true;
+    return Status::OK();
+}
+
+Status 
NewJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator value,
+                                            SlotDescriptor* slot_desc,
+                                            vectorized::IColumn* column_ptr, 
bool* valid) {
+    const char* str_value = nullptr;
+    char tmp_buf[128] = {0};
+    int32_t wbytes = 0;
+    std::string json_str;
+
+    vectorized::ColumnNullable* nullable_column = nullptr;
+    if (slot_desc->is_nullable()) {
+        nullable_column = 
reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
+        // kNullType will put 1 into the Null map, so there is no need to push 
0 for kNullType.
+        if (value->GetType() != rapidjson::Type::kNullType) {
+            nullable_column->get_null_map_data().push_back(0);
+        } else {
+            nullable_column->insert_default();
+        }
+        column_ptr = &nullable_column->get_nested_column();
+    }
+
+    switch (value->GetType()) {
+    case rapidjson::Type::kStringType:
+        str_value = value->GetString();
+        wbytes = strlen(str_value);
+        break;
+    case rapidjson::Type::kNumberType:
+        if (value->IsUint()) {
+            wbytes = sprintf(tmp_buf, "%u", value->GetUint());
+        } else if (value->IsInt()) {
+            wbytes = sprintf(tmp_buf, "%d", value->GetInt());
+        } else if (value->IsUint64()) {
+            wbytes = sprintf(tmp_buf, "%" PRIu64, value->GetUint64());
+        } else if (value->IsInt64()) {
+            wbytes = sprintf(tmp_buf, "%" PRId64, value->GetInt64());
+        } else {
+            wbytes = sprintf(tmp_buf, "%f", value->GetDouble());
+        }
+        str_value = tmp_buf;
+        break;
+    case rapidjson::Type::kFalseType:
+        wbytes = 1;
+        str_value = (char*)"0";
+        break;
+    case rapidjson::Type::kTrueType:
+        wbytes = 1;
+        str_value = (char*)"1";
+        break;
+    case rapidjson::Type::kNullType:
+        if (!slot_desc->is_nullable()) {
+            RETURN_IF_ERROR(_append_error_msg(
+                    *value, "Json value is null, but the column `{}` is not 
nullable.",
+                    slot_desc->col_name(), valid));
+            return Status::OK();
+        }
+        // return immediately to prevent from repeatedly insert_data
+        *valid = true;
+        return Status::OK();
+    default:
+        // for other type like array or object. we convert it to string to save
+        json_str = NewJsonReader::_print_json_value(*value);
+        wbytes = json_str.size();
+        str_value = json_str.c_str();
+        break;
+    }
+
+    // TODO: if the vexpr can support another 'slot_desc type' than 
'TYPE_VARCHAR',
+    // we need use a function to support these types to insert data in columns.
+    DCHECK(slot_desc->type().type == TYPE_VARCHAR);
+    assert_cast<ColumnString*>(column_ptr)->insert_data(str_value, wbytes);
+
+    *valid = true;
+    return Status::OK();
+}
+
+Status NewJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue,
+                                                 const 
std::vector<SlotDescriptor*>& slot_descs,
+                                                 
std::vector<MutableColumnPtr>& columns,
+                                                 bool* valid) {
+    int ctx_idx = 0;
+    bool has_valid_value = false;
+    size_t cur_row_count = columns[0]->size();
+    for (auto slot_desc : slot_descs) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+        int i = ctx_idx++;
+        auto* column_ptr = columns[i].get();
+        rapidjson::Value* json_values = nullptr;
+        bool wrap_explicitly = false;
+        if (LIKELY(i < _parsed_jsonpaths.size())) {
+            json_values = JsonFunctions::get_json_array_from_parsed_json(
+                    _parsed_jsonpaths[i], &objectValue, 
_origin_json_doc.GetAllocator(),
+                    &wrap_explicitly);
+        }
+
+        if (json_values == nullptr) {
+            // not match in jsondata.
+            if (!slot_descs[i]->is_nullable()) {
+                RETURN_IF_ERROR(_append_error_msg(
+                        objectValue,
+                        "The column `{}` is not nullable, but it's not found 
in jsondata.",
+                        slot_descs[i]->col_name(), valid));
+                return Status::OK();
+            }
+        } else {
+            CHECK(json_values->IsArray());
+            if (json_values->Size() == 1 && wrap_explicitly) {
+                // NOTICE1: JsonFunctions::get_json_array_from_parsed_json() 
will wrap the single json object with an array.
+                // so here we unwrap the array to get the real element.
+                // if json_values' size > 1, it means we just match an array, 
not a wrapped one, so no need to unwrap.
+                json_values = &((*json_values)[0]);
+            }
+            RETURN_IF_ERROR(_write_data_to_column(json_values, slot_descs[i], 
column_ptr, valid));
+            if (!(*valid)) {
+                return Status::OK();
+            }
+            has_valid_value = true;
+        }
+    }
+    if (!has_valid_value) {
+        RETURN_IF_ERROR(_append_error_msg(
+                objectValue, "All fields is null or not matched, this is a 
invalid row.", "",
+                valid));
+        return Status::OK();
+    }
+    ctx_idx = 0;
+    for (auto slot_desc : slot_descs) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+        int dest_index = ctx_idx++;
+        auto* column_ptr = columns[dest_index].get();
+        if (column_ptr->size() < cur_row_count + 1) {
+            DCHECK(column_ptr->size() == cur_row_count);
+            column_ptr->assume_mutable()->insert_default();
+        }
+        DCHECK(column_ptr->size() == cur_row_count + 1);
+    }
+    return Status::OK();
+}
+
+Status NewJsonReader::_append_error_msg(const rapidjson::Value& objectValue, 
std::string error_msg,
+                                        std::string col_name, bool* valid) {
+    std::string err_msg;
+    if (!col_name.empty()) {
+        fmt::memory_buffer error_buf;
+        fmt::format_to(error_buf, error_msg, col_name);
+        err_msg = fmt::to_string(error_buf);
+    } else {
+        err_msg = error_msg;
+    }
+
+    RETURN_IF_ERROR(_state->append_error_msg_to_file(
+            [&]() -> std::string { return 
NewJsonReader::_print_json_value(objectValue); },
+            [&]() -> std::string { return err_msg; }, _scanner_eof));
+
+    // TODO(ftw): check here?
+    if (*_scanner_eof == true) {
+        _reader_eof = true;
+    }
+
+    _counter->num_rows_filtered++;
+    if (valid != nullptr) {
+        // current row is invalid
+        *valid = false;
+    }
+    return Status::OK();
+}
+
+std::string NewJsonReader::_print_json_value(const rapidjson::Value& value) {
+    rapidjson::StringBuffer buffer;
+    buffer.Clear();
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    value.Accept(writer);
+    return std::string(buffer.GetString());
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/json/new_json_reader.h 
b/be/src/vec/exec/format/json/new_json_reader.h
new file mode 100644
index 0000000000..aee11535fb
--- /dev/null
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <rapidjson/document.h>
+#include <rapidjson/error/en.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/writer.h>
+
+#include "vec/exec/format/generic_reader.h"
+namespace doris {
+
+class FileReader;
+struct JsonPath;
+class LineReader;
+class SlotDescriptor;
+
+namespace vectorized {
+
+struct ScannerCounter;
+
+class NewJsonReader : public GenericReader {
+public:
+    NewJsonReader(RuntimeState* state, RuntimeProfile* profile, 
ScannerCounter* counter,
+                  const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
+                  const std::vector<SlotDescriptor*>& file_slot_descs, bool* 
scanner_eof);
+    ~NewJsonReader() override = default;
+
+    Status init_reader();
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+private:
+    Status _get_range_params();
+    Status _open_file_reader();
+    Status _open_line_reader();
+    Status _parse_jsonpath_and_json_root();
+
+    Status _read_json_column(std::vector<MutableColumnPtr>& columns,
+                             const std::vector<SlotDescriptor*>& slot_descs, 
bool* is_empty_row,
+                             bool* eof);
+
+    Status _vhandle_simple_json(std::vector<MutableColumnPtr>& columns,
+                                const std::vector<SlotDescriptor*>& 
slot_descs, bool* is_empty_row,
+                                bool* eof);
+
+    Status _vhandle_flat_array_complex_json(std::vector<MutableColumnPtr>& 
columns,
+                                            const 
std::vector<SlotDescriptor*>& slot_descs,
+                                            bool* is_empty_row, bool* eof);
+
+    Status _vhandle_nested_complex_json(std::vector<MutableColumnPtr>& columns,
+                                        const std::vector<SlotDescriptor*>& 
slot_descs,
+                                        bool* is_empty_row, bool* eof);
+
+    Status _parse_json(bool* is_empty_row, bool* eof);
+    Status _parse_json_doc(size_t* size, bool* eof);
+
+    Status _set_column_value(rapidjson::Value& objectValue, 
std::vector<MutableColumnPtr>& columns,
+                             const std::vector<SlotDescriptor*>& slot_descs, 
bool* valid);
+
+    Status _write_data_to_column(rapidjson::Value::ConstValueIterator value,
+                                 SlotDescriptor* slot_desc, 
vectorized::IColumn* column_ptr,
+                                 bool* valid);
+
+    Status _write_columns_by_jsonpath(rapidjson::Value& objectValue,
+                                      const std::vector<SlotDescriptor*>& 
slot_descs,
+                                      std::vector<MutableColumnPtr>& columns, 
bool* valid);
+
+    Status _append_error_msg(const rapidjson::Value& objectValue, std::string 
error_msg,
+                             std::string col_name, bool* valid);
+
+    std::string _print_json_value(const rapidjson::Value& value);
+
+private:
+    Status (NewJsonReader::*_vhandle_json_callback)(
+            std::vector<vectorized::MutableColumnPtr>& columns,
+            const std::vector<SlotDescriptor*>& slot_descs, bool* 
is_empty_row, bool* eof);
+    RuntimeState* _state;
+    RuntimeProfile* _profile;
+    ScannerCounter* _counter;
+    const TFileScanRangeParams& _params;
+    const TFileRangeDesc& _range;
+    const std::vector<SlotDescriptor*>& _file_slot_descs;
+
+    // _file_reader_s is for stream load pipe reader,
+    // and _file_reader is for other file reader.
+    // TODO: refactor this to use only shared_ptr or unique_ptr
+    std::unique_ptr<FileReader> _file_reader;
+    std::shared_ptr<FileReader> _file_reader_s;
+    FileReader* _real_file_reader;
+    std::unique_ptr<LineReader> _line_reader;
+    bool _reader_eof;
+
+    TFileFormatType::type _file_format_type;
+
+    // When we fetch range doesn't start from 0 will always skip the first line
+    bool _skip_first_line;
+
+    std::string _line_delimiter;
+    int _line_delimiter_length;
+
+    int _next_row;
+    int _total_rows;
+
+    std::string _jsonpaths;
+    std::string _json_root;
+    bool _read_json_by_line;
+    bool _strip_outer_array;
+    bool _num_as_string;
+    bool _fuzzy_parse;
+
+    std::vector<std::vector<JsonPath>> _parsed_jsonpaths;
+    std::vector<JsonPath> _parsed_json_root;
+
+    char _value_buffer[4 * 1024 * 1024]; // 4MB
+    char _parse_buffer[512 * 1024];      // 512KB
+
+    typedef rapidjson::GenericDocument<rapidjson::UTF8<>, 
rapidjson::MemoryPoolAllocator<>,
+                                       rapidjson::MemoryPoolAllocator<>>
+            Document;
+    rapidjson::MemoryPoolAllocator<> _value_allocator;
+    rapidjson::MemoryPoolAllocator<> _parse_allocator;
+    Document _origin_json_doc;   // origin json document object from parsed 
json string
+    rapidjson::Value* _json_doc; // _json_doc equals _final_json_doc iff not 
set `json_root`
+    std::unordered_map<std::string, int> _name_map;
+
+    bool* _scanner_eof;
+
+    RuntimeProfile::Counter* _bytes_read_counter;
+    RuntimeProfile::Counter* _read_timer;
+    RuntimeProfile::Counter* _file_read_timer;
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 3503e8c468..9df3722b95 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -31,6 +31,7 @@
 #include "runtime/raw_value.h"
 #include "runtime/runtime_state.h"
 #include "vec/exec/format/csv/csv_reader.h"
+#include "vec/exec/format/json/new_json_reader.h"
 #include "vec/exec/format/orc/vorc_reader.h"
 #include "vec/exec/format/parquet/vparquet_reader.h"
 #include "vec/exec/scan/new_file_scan_node.h"
@@ -494,6 +495,12 @@ Status VFileScanner::_get_next_reader() {
             init_status = 
((CsvReader*)(_cur_reader.get()))->init_reader(_is_load);
             break;
         }
+        case TFileFormatType::FORMAT_JSON: {
+            _cur_reader.reset(new NewJsonReader(_state, _profile, &_counter, 
_params, range,
+                                                _file_slot_descs, 
&_scanner_eof));
+            init_status = ((NewJsonReader*)(_cur_reader.get()))->init_reader();
+            break;
+        }
         default:
             return Status::InternalError("Not supported file format: {}", 
_params.format_type);
         }
diff --git a/docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh 
b/docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh
index 1fe80f6e84..0d07bf5e36 100755
--- a/docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh
+++ b/docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh
@@ -27,6 +27,8 @@ echo "hadoop fs -mkdir /user/doris/"
 hadoop fs -mkdir -p /user/doris/
 echo "hadoop fs -put /mnt/scripts/tpch1.db /user/doris/"
 hadoop fs -put /mnt/scripts/tpch1.db /user/doris/
+echo "hadoop fs -put /mnt/scripts/json_format_test.db /user/doris/"
+hadoop fs -put /mnt/scripts/json_format_test /user/doris/
 echo "hive -f /mnt/scripts/create.hql"
 hive -f /mnt/scripts/create.hql
 
diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json.json
 
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json.json
new file mode 100644
index 0000000000..b5079899fc
--- /dev/null
+++ 
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json.json
@@ -0,0 +1,2 @@
+[{"id": 1, "city": "beijing", "code": 1454547},{"id": 2, "city": "shanghai", 
"code": 1244264}, {"id": 3, "city": "guangzhou", "code": 528369},{"id": 4, 
"city": "shenzhen", "code": 594201},{"id": 5, "city": "hangzhou", "code": 
594201}]
+[{"id": 6, "city": "nanjing", "code": 2345672},{"id": 7, "city": "wuhan", 
"code": 2345673}, {"id": 8, "city": "chengdu", "code": 2345674},{"id": 9, 
"city": "xian", "code": 2345675},{"id": 10, "city": "hefei", "code": 2345676}]
\ No newline at end of file
diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_lack_column.json
 
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_lack_column.json
new file mode 100644
index 0000000000..338eaa726f
--- /dev/null
+++ 
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_lack_column.json
@@ -0,0 +1,2 @@
+[{"id": 1, "code": 1454547},{"city": "shanghai", "id": 2, "code": 1244264}, 
{"id": 3, "code": 528369},{"id": 4, "code": 594202},{"city": "hangzhou", "id": 
5, "code": 594201}]
+[{"city": "nanjing", "id": 6, "code": 2345672},{"id": 7, "code": 2345673}, 
{"id": 8, "code": 2345674},{"city": "xian", "id": 9, "code": 2345675},{"id": 
10, "code": 2345676}]
diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_unorder.json
 
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_unorder.json
new file mode 100644
index 0000000000..539d66df8d
--- /dev/null
+++ 
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_unorder.json
@@ -0,0 +1,2 @@
+[{"city": "beijing", "id": 1, "code": 1454547},{"city": "shanghai", "id": 2, 
"code": 1244264}, {"city": "guangzhou", "id": 3, "code": 528369},{"city": 
"shenzhen", "id": 4, "code": 594202},{"city": "hangzhou", "id": 5, "code": 
594201}]
+[{"city": "nanjing", "id": 6, "code": 2345672},{"city": "wuhan", "id": 7, 
"code": 2345673}, {"city": "chengdu", "id": 8, "code": 2345674},{"city": 
"xian", "id": 9, "code": 2345675},{"city": "hefei", "id": 10, "code": 2345676}]
diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/json_format_test/nest_json.json
 
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/nest_json.json
new file mode 100644
index 0000000000..b28159b2a0
--- /dev/null
+++ 
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/nest_json.json
@@ -0,0 +1,5 @@
+{"no": 1, "item": {"id": 1, "city": "beijing", "code": 2345671}}
+{"no": 2, "item": {"id": 2, "city": "shanghai", "code": 2345672}}
+{"no": 3, "item": {"id": 3, "city": "hangzhou", "code": 2345673}}
+{"no": 4, "item": {"id": 4, "city": "shenzhen", "code": 2345674}}
+{"no": 5, "item": {"id": 5, "city": "guangzhou", "code": 2345675}}
diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/json_format_test/simple_object_json.json
 
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/simple_object_json.json
new file mode 100644
index 0000000000..a7912466fd
--- /dev/null
+++ 
b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/simple_object_json.json
@@ -0,0 +1,12 @@
+{"id": 1, "city": "beijing", "code": 2345671}
+{"id": 2, "city": "shanghai", "code": 2345672}
+{"id": 3, "city": "guangzhou", "code": 2345673}
+{"id": 4, "city": "shenzhen", "code": 2345674}
+{"id": 5, "city": "hangzhou", "code": 2345675}
+{"id": 6, "city": "nanjing", "code": 2345676}
+{"id": 7, "city": "wuhan", "code": 2345677}
+{"id": 8, "city": "chengdu", "code": 2345678}
+{"id": 9, "city": "xian", "code": 2345679}
+{"id": 10, "city": "hefei", "code": 23456710}
+{"id": 10, "city": null, "code": 23456711}
+{"id": 10, "city": "hefei", "code": null}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index 32aed58211..5854be7970 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -121,7 +121,10 @@ public class DataDescription {
     private String jsonPaths = "";
     private String jsonRoot = "";
     private boolean fuzzyParse = false;
-    private boolean readJsonByLine = false;
+    // the default must be true.
+    // So that for broker load, this is always true,
+    // and for stream load, it will set on demand.
+    private boolean readJsonByLine = true;
     private boolean numAsString = false;
 
     private String sequenceCol;
@@ -616,6 +619,10 @@ public class DataDescription {
         return !Strings.isNullOrEmpty(srcTableName);
     }
 
+    public boolean isReadJsonByLine() {
+        return readJsonByLine;
+    }
+
     /*
      * Analyze parsedExprMap and columnToHadoopFunction from columns, columns 
from path and columnMappingList
      * Example:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index f62f81ea54..aa341ad283 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -4031,7 +4031,7 @@ public class Env {
         }
     }
 
-    public void renameColumn(Database db, OlapTable table, String colName,
+    private void renameColumn(Database db, OlapTable table, String colName,
             String newColName, boolean isReplay) throws DdlException {
         if (table.getState() != OlapTableState.NORMAL) {
             throw new DdlException("Table[" + table.getName() + "] is under " 
+ table.getState());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 2762ca89d3..a89f48e52b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -256,9 +256,9 @@ public class BrokerFileGroup implements Writable {
             jsonPaths = dataDescription.getJsonPaths();
             jsonRoot = dataDescription.getJsonRoot();
             fuzzyParse = dataDescription.isFuzzyParse();
-            // For broker load, we only support reading json format data line 
by line,
-            // so we set readJsonByLine to true here.
-            readJsonByLine = true;
+            // ATTN: for broker load, we only support reading json format data 
line by line,
+            // so if this is set to false, it must be stream load.
+            readJsonByLine = dataDescription.isReadJsonByLine();
             numAsString = dataDescription.isNumAsString();
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 5d68f6edef..c47699142b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -42,7 +42,6 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.Util;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.LoadErrorHub;
 import org.apache.doris.load.loadv2.LoadTask;
@@ -174,10 +173,6 @@ public class StreamLoadPlanner {
         // create scan node
         if (Config.enable_new_load_scan_node && Config.enable_vectorized_load) 
{
             ExternalFileScanNode fileScanNode = new ExternalFileScanNode(new 
PlanNodeId(0), scanTupleDesc);
-            if (!Util.isCsvFormat(taskInfo.getFormatType())) {
-                throw new AnalysisException(
-                        "New stream load scan load not support non-csv type 
now: " + taskInfo.getFormatType());
-            }
             // 1. create file group
             DataDescription dataDescription = new 
DataDescription(destTable.getName(), taskInfo);
             dataDescription.analyzeWithoutCheckPriv(db.getFullName());
diff --git a/regression-test/conf/regression-conf.groovy 
b/regression-test/conf/regression-conf.groovy
index 00872d275c..6791f7cacd 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -78,3 +78,4 @@ pg_14_port=5442
 // See `docker/thirdparties/start-thirdparties-docker.sh`
 enableHiveTest=false
 hms_port=9183
+hdfs_port=8120
diff --git a/regression-test/data/load_p0/broker_load/test_array_load.out 
b/regression-test/data/load_p0/broker_load/test_array_load.out
index baff23e6f2..6c568b6ef1 100644
--- a/regression-test/data/load_p0/broker_load/test_array_load.out
+++ b/regression-test/data/load_p0/broker_load/test_array_load.out
@@ -39,3 +39,43 @@
 5      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N
 100    [1, 2, 3]       [32767, 32768, 32769]   [65534, 65535, 65536]   ['a', 
'b', 'c'] ['hello', 'world']      [2022-07-13]    [2022-07-13 12:30:00]   
[0.33, 0.67]    [3.1415926, 0.878787878]        [4.000000, 5.500000, 6.670000]
 
+-- !select --
+1      [1, 2, 3, 4, 5] [32767, 32768, 32769]   [65534, 65535, 65536]   ['a', 
'b', 'c', 'd', 'e']       ['hello', 'world']      [1991-01-01]    [1991-01-01 
00:00:00]   [0.33, 0.67]    [3.1415926, 0.878787878]        [1, 1.2, 1.3]
+2      [6, 7, 8, 9, 10]        [32767, 32768, 32769]   [65534, 65535, 65536]   
['a', 'b', 'c', 'd', 'e']       ['hello', 'world']      [1991-01-01]    
[1991-01-01 00:00:00]   [0.33, 0.67]    [3.1415926, 0.878787878]        [1, 
1.2, 1.3]
+3      []      [32767, 32768, 32769]   [NULL, NULL, 65536]     ['a', 'b', 'c', 
'd', 'e']       ['happy', 'birthday']   [1991-01-01]    [1991-01-01 00:00:00]   
[0.33, 0.67]    [3.1415926, 0.878787878]        [1, 1.2, 1.3]
+4      [NULL]  [32767, 32768, 32769]   [NULL, NULL, 65536]     ['a', 'b', 'c', 
'd', 'e']       ['hello', 'world']      [1991-01-01]    [1991-01-01 00:00:00]   
[0.33, 0.67]    [3.1415926, 0.878787878]        [1, 1.2, 1.3]
+5      [NULL, NULL]    [32767, 32768, NULL]    [65534, NULL, 65536]    ['a', 
'b', 'c', 'd', 'e']       ['hello', 'world']      [1991-01-01]    [1991-01-01 
00:00:00]   [0.33, 0.67]    [3.1415926, 0.878787878]        [1, 1.2, 1.3]
+100    [1, 2, 3]       [32767, 32768, 32769]   [65534, 65535, 65536]   ['a', 
'b', 'c'] ['hello', 'world']      [2022-07-13]    [2022-07-13 12:30:00]   
[0.33, 0.67]    [3.1415926, 0.878787878]        [4, 5.5, 6.67]
+
+-- !select --
+1      [1, 2, 3, 4, 5] [32767, 32768, 32769]   [65534, 65535, 65536]   ['a', 
'b', 'c', 'd', 'e']       ['hello', 'world']      [1991-01-01]    [1991-01-01 
00:00:00]   [0.33, 0.67]    [3.1415926, 0.878787878]        [1.000000, 
1.200000, 1.300000]
+2      [6, 7, 8, 9, 10]        [32767, 32768, 32769]   [65534, 65535, 65536]   
['a', 'b', 'c', 'd', 'e']       ['hello', 'world']      [1991-01-01]    
[1991-01-01 00:00:00]   [0.33, 0.67]    [3.1415926, 0.878787878]        
[1.000000, 1.200000, 1.300000]
+3      []      [32767, 32768, 32769]   [NULL, NULL, 65536]     ['a', 'b', 'c', 
'd', 'e']       ['happy', 'birthday']   [1991-01-01]    [1991-01-01 00:00:00]   
[0.33, 0.67]    [3.1415926, 0.878787878]        [1.000000, 1.200000, 1.300000]
+4      [NULL]  [32767, 32768, 32769]   [NULL, NULL, 65536]     ['a', 'b', 'c', 
'd', 'e']       ['hello', 'world']      [1991-01-01]    [1991-01-01 00:00:00]   
[0.33, 0.67]    [3.1415926, 0.878787878]        [1.000000, 1.200000, 1.300000]
+5      [NULL, NULL]    [32767, 32768, NULL]    [65534, NULL, 65536]    ['a', 
'b', 'c', 'd', 'e']       ['hello', 'world']      [1991-01-01]    [1991-01-01 
00:00:00]   [0.33, 0.67]    [3.1415926, 0.878787878]        [1.000000, 
1.200000, 1.300000]
+100    [1, 2, 3]       [32767, 32768, 32769]   [65534, 65535, 65536]   ['a', 
'b', 'c'] ['hello', 'world']      [2022-07-13]    [2022-07-13 12:30:00]   
[0.33, 0.67]    [3.1415926, 0.878787878]        [4.000000, 5.500000, 6.670000]
+
+-- !select --
+1      [1, 2, 3, 4, 5] [32767, 32768, 32769]   [65534, 65535, 65536]   ['a', 
'b', 'c', 'd', 'e']       ['hello', 'world']      [1991-01-01]    [1991-01-01 
00:00:00]   [0.33, 0.67]    [3.1415926, 0.878787878]        [1, 1.2, 1.3]
+2      [6, 7, 8, 9, 10]        [32767, 32768, 32769]   [65534, 65535, 65536]   
['a', 'b', 'c', 'd', 'e']       ['hello', 'world']      [1991-01-01]    
[1991-01-01 00:00:00]   [0.33, 0.67]    [3.1415926, 0.878787878]        [1, 
1.2, 1.3]
+3      []      [32767, 32768, 32769]   [NULL, NULL, 65536]     ['a', 'b', 'c', 
'd', 'e']       ['happy', 'birthday']   [1991-01-01]    [1991-01-01 00:00:00]   
[0.33, 0.67]    [3.1415926, 0.878787878]        [1, 1.2, 1.3]
+4      [NULL]  [32767, 32768, 32769]   [NULL, NULL, 65536]     ['a', 'b', 'c', 
'd', 'e']       ['hello', 'world']      [1991-01-01]    [1991-01-01 00:00:00]   
[0.33, 0.67]    [3.1415926, 0.878787878]        [1, 1.2, 1.3]
+5      [NULL, NULL]    [32767, 32768, NULL]    [65534, NULL, 65536]    ['a', 
'b', 'c', 'd', 'e']       ['hello', 'world']      [1991-01-01]    [1991-01-01 
00:00:00]   [0.33, 0.67]    [3.1415926, 0.878787878]        [1, 1.2, 1.3]
+100    [1, 2, 3]       [32767, 32768, 32769]   [65534, 65535, 65536]   ['a', 
'b', 'c'] ['hello', 'world']      [2022-07-13]    [2022-07-13 12:30:00]   
[0.33, 0.67]    [3.1415926, 0.878787878]        [4, 5.5, 6.67]
+
+-- !select --
+1      [1, 2, 3, 4, 5] [32767, 32768, 32769]   [65534, 65535, 65536]   ['a', 
'b', 'c', 'd', 'e']       ['hello', 'world']      [1991-01-01]    [1991-01-01 
00:00:00]   [0.33, 0.67]    [3.1415926, 0.878787878]        [1.000000, 
1.200000, 1.300000]
+2      [6, 7, 8, 9, 10]        [32767, 32768, 32769]   [65534, 65535, 65536]   
['a', 'b', 'c', 'd', 'e']       ['hello', 'world']      [1991-01-01]    
[1991-01-01 00:00:00]   [0.33, 0.67]    [3.1415926, 0.878787878]        
[1.000000, 1.200000, 1.300000]
+3      []      [32767, 32768, 32769]   [NULL, NULL, 65536]     ['a', 'b', 'c', 
'd', 'e']       ['happy', 'birthday']   [1991-01-01]    [1991-01-01 00:00:00]   
[0.33, 0.67]    [3.1415926, 0.878787878]        [1.000000, 1.200000, 1.300000]
+4      [NULL]  [32767, 32768, 32769]   [NULL, NULL, 65536]     ['a', 'b', 'c', 
'd', 'e']       ['hello', 'world']      [1991-01-01]    [1991-01-01 00:00:00]   
[0.33, 0.67]    [3.1415926, 0.878787878]        [1.000000, 1.200000, 1.300000]
+5      [NULL, NULL]    [32767, 32768, NULL]    [65534, NULL, 65536]    ['a', 
'b', 'c', 'd', 'e']       ['hello', 'world']      [1991-01-01]    [1991-01-01 
00:00:00]   [0.33, 0.67]    [3.1415926, 0.878787878]        [1.000000, 
1.200000, 1.300000]
+100    [1, 2, 3]       [32767, 32768, 32769]   [65534, 65535, 65536]   ['a', 
'b', 'c'] ['hello', 'world']      [2022-07-13]    [2022-07-13 12:30:00]   
[0.33, 0.67]    [3.1415926, 0.878787878]        [4.000000, 5.500000, 6.670000]
+
+-- !select --
+1      [1, 2, 3, 4, 5] [32767, 32768, 32769]   [65534, 65535, 65536]   ['a', 
'b', 'c', 'd', 'e']       ['hello', 'world']      [1991-01-01, 1992-02-02, 
1993-03-03]    [1991-01-01 00:00:00]   [0.33, 0.67]    [3.1415926, 0.878787878] 
       [1.000000, 1.200000, 1.300000]
+2      [1, 2, 3, 4, 5] [32767, 32768, 32769]   [65534, 65535, 65536]   ['a', 
'b', 'c', 'd', 'e']       ['hello', 'world']      [1991-01-01, 1992-02-02, 
1993-03-03]    \N      \N      \N      [1.000000, NULL, 1.300000]
+3      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N
+4      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N
+5      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N
+100    [1, 2, 3]       [32767, 32768, 32769]   [65534, 65535, 65536]   ['a', 
'b', 'c'] ['hello', 'world']      [2022-07-13]    [2022-07-13 12:30:00]   
[0.33, 0.67]    [3.1415926, 0.878787878]        [4.000000, 5.500000, 6.670000]
+
diff --git 
a/regression-test/data/load_p0/stream_load/load_json_null_to_nullable.out 
b/regression-test/data/load_p0/stream_load/load_json_null_to_nullable.out
index d564b5b99a..8bbc4be012 100644
--- a/regression-test/data/load_p0/stream_load/load_json_null_to_nullable.out
+++ b/regression-test/data/load_p0/stream_load/load_json_null_to_nullable.out
@@ -11,3 +11,15 @@ h    h
 H      H
 h      h
 
+-- !select --
+\N     \N
+       
+H      H
+h      h
+
+-- !select --
+\N     \N
+       
+H      H
+h      h
+
diff --git 
a/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out 
b/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out
index 43037b624d..4918c59dd8 100644
--- a/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out
+++ b/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out
@@ -9,3 +9,13 @@
 1000   7395.231067
 2000   \N
 
+-- !select --
+22     \N
+1000   7395.231067
+2000   \N
+
+-- !select --
+22     \N
+1000   7395.231067
+2000   \N
+
diff --git a/regression-test/data/load_p0/stream_load/nest_json.json 
b/regression-test/data/load_p0/stream_load/nest_json.json
index 90647c490b..b28159b2a0 100644
--- a/regression-test/data/load_p0/stream_load/nest_json.json
+++ b/regression-test/data/load_p0/stream_load/nest_json.json
@@ -1,2 +1,5 @@
 {"no": 1, "item": {"id": 1, "city": "beijing", "code": 2345671}}
 {"no": 2, "item": {"id": 2, "city": "shanghai", "code": 2345672}}
+{"no": 3, "item": {"id": 3, "city": "hangzhou", "code": 2345673}}
+{"no": 4, "item": {"id": 4, "city": "shenzhen", "code": 2345674}}
+{"no": 5, "item": {"id": 5, "city": "guangzhou", "code": 2345675}}
diff --git a/regression-test/data/load_p0/stream_load/nest_json_array.json 
b/regression-test/data/load_p0/stream_load/nest_json_array.json
new file mode 100644
index 0000000000..b8b3cf917d
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/nest_json_array.json
@@ -0,0 +1,74 @@
+[
+    {
+        "no": 1,
+        "item": {
+            "id": 1,
+            "city": [
+                "zhejiang",
+                "hangzhou",
+                "xihu"
+            ],
+            "code": 2345671
+        }
+    },
+    {
+        "no": 2,
+        "item": {
+            "id": 2,
+            "city": [
+                "zhejiang",
+                "hangzhou",
+                "xiaoshan"
+            ],
+            "code": 2345672
+        }
+    },
+    {
+        "no": 3,
+        "item": {
+            "id": 3,
+            "city": [
+                "zhejiang",
+                "hangzhou",
+                "binjiang"
+            ],
+            "code": 2345673
+        }
+    },
+    {
+        "no": 4,
+        "item": {
+            "id": 4,
+            "city": [
+                "zhejiang",
+                "hangzhou",
+                "shangcheng"
+            ],
+            "code": 2345674
+        }
+    },
+    {
+        "no": 5,
+        "item": {
+            "id": 5,
+            "city": [
+                "zhejiang",
+                "hangzhou",
+                "tonglu"
+            ],
+            "code": 2345675
+        }
+    },
+    {
+        "no": 6,
+        "item": {
+            "id": 6,
+            "city": [
+                "zhejiang",
+                "hangzhou",
+                "fuyang"
+            ],
+            "code": 2345676
+        }
+    }
+]
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/simple_json2.json 
b/regression-test/data/load_p0/stream_load/simple_json2.json
new file mode 100644
index 0000000000..eb698453de
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/simple_json2.json
@@ -0,0 +1,52 @@
+[
+    {
+        "code": 2345671,
+        "id": 1,
+        "city": "beijing"
+    },
+    {
+        "code": 2345672,
+        "id": 2,
+        "city": "shanghai"
+    },
+    {
+        "code": 2345673,
+        "id": 3,
+        "city": "guangzhou"
+    },
+    {
+        "code": 2345674,
+        "id": 4,
+        "city": "shenzhen"
+    },
+    {
+        "code": 2345675,
+        "id": 5,
+        "city": "hangzhou"
+    },
+    {
+        "code": 2345676,
+        "id": 6,
+        "city": "nanjing"
+    },
+    {
+        "code": 2345677,
+        "id": 7,
+        "city": "wuhan"
+    },
+    {
+        "code": 2345678,
+        "id": 8,
+        "city": "chengdu"
+    },
+    {
+        "code": 2345679,
+        "id": 9,
+        "city": "xian"
+    },
+    {
+        "code": 23456710,
+        "id": 10,
+        "city": "hefei"
+    }
+]
\ No newline at end of file
diff --git 
a/regression-test/data/load_p0/stream_load/simple_json2_lack_one_column.json 
b/regression-test/data/load_p0/stream_load/simple_json2_lack_one_column.json
new file mode 100644
index 0000000000..7b6b4ad800
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/simple_json2_lack_one_column.json
@@ -0,0 +1,48 @@
+[
+    {
+        "code": 2345671,
+        "id": 1
+    },
+    {
+        "code": 2345672,
+        "id": 2,
+        "city": "shanghai"
+    },
+    {
+        "code": 2345673,
+        "id": 3,
+        "city": "beijing"
+    },
+    {
+        "code": 2345674,
+        "id": 4,
+        "city": "shenzhen"
+    },
+    {
+        "code": 2345675,
+        "id": 5,
+        "city": "hangzhou"
+    },
+    {
+        "code": 2345676,
+        "id": 6,
+        "city": "nanjing"
+    },
+    {
+        "code": 2345677,
+        "id": 7
+    },
+    {
+        "code": 2345678,
+        "id": 8,
+        "city": "chengdu"
+    },
+    {
+        "code": 2345679,
+        "id": 9
+    },
+    {
+        "code": 23456710,
+        "id": 10
+    }
+]
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/test_hdfs_json_load.out 
b/regression-test/data/load_p0/stream_load/test_hdfs_json_load.out
new file mode 100644
index 0000000000..594d2ec60a
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_hdfs_json_load.out
@@ -0,0 +1,305 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select1 --
+1      beijing 2345671
+2      shanghai        2345672
+3      guangzhou       2345673
+4      shenzhen        2345674
+5      hangzhou        2345675
+6      nanjing 2345676
+7      wuhan   2345677
+8      chengdu 2345678
+9      xian    2345679
+10     \N      23456711
+10     hefei   23456710
+200    changsha        3456789
+
+-- !select1 --
+1      beijing 2345671
+2      shanghai        2345672
+3      guangzhou       2345673
+4      shenzhen        2345674
+5      hangzhou        2345675
+6      nanjing 2345676
+7      wuhan   2345677
+8      chengdu 2345678
+9      xian    2345679
+10     \N      23456711
+10     hefei   23456710
+200    changsha        3456789
+
+-- !select2 --
+10     beijing 2345671
+20     shanghai        2345672
+30     guangzhou       2345673
+40     shenzhen        2345674
+50     hangzhou        2345675
+60     nanjing 2345676
+70     wuhan   2345677
+80     chengdu 2345678
+90     xian    2345679
+100    \N      23456711
+100    hefei   23456710
+200    changsha        3456789
+
+-- !select2 --
+10     beijing 2345671
+20     shanghai        2345672
+30     guangzhou       2345673
+40     shenzhen        2345674
+50     hangzhou        2345675
+60     nanjing 2345676
+70     wuhan   2345677
+80     chengdu 2345678
+90     xian    2345679
+100    \N      23456711
+100    hefei   23456710
+200    changsha        3456789
+
+-- !select3 --
+1      2345671 \N
+2      2345672 \N
+3      2345673 \N
+4      2345674 \N
+5      2345675 \N
+6      2345676 \N
+7      2345677 \N
+8      2345678 \N
+9      2345679 \N
+10     \N      \N
+10     23456710        \N
+10     23456711        \N
+200    changsha        3456789
+
+-- !select3 --
+1      2345671 \N
+2      2345672 \N
+3      2345673 \N
+4      2345674 \N
+5      2345675 \N
+6      2345676 \N
+7      2345677 \N
+8      2345678 \N
+9      2345679 \N
+10     \N      \N
+10     23456710        \N
+10     23456711        \N
+200    changsha        3456789
+
+-- !select4 --
+1      \N      210
+2      \N      220
+3      \N      230
+4      \N      240
+5      \N      250
+6      \N      260
+7      \N      270
+8      \N      280
+9      \N      290
+10     \N      900
+200    changsha        3456789
+
+-- !select4 --
+1      \N      210
+2      \N      220
+3      \N      230
+4      \N      240
+5      \N      250
+6      \N      260
+7      \N      270
+8      \N      280
+9      \N      290
+10     \N      900
+200    changsha        3456789
+
+-- !select5 --
+1      beijing 1454547
+2      shanghai        1244264
+3      guangzhou       528369
+4      shenzhen        594201
+5      hangzhou        594201
+6      nanjing 2345672
+7      wuhan   2345673
+8      chengdu 2345674
+9      xian    2345675
+10     hefei   2345676
+200    changsha        3456789
+
+-- !select5 --
+1      beijing 1454547
+2      shanghai        1244264
+3      guangzhou       528369
+4      shenzhen        594201
+5      hangzhou        594201
+6      nanjing 2345672
+7      wuhan   2345673
+8      chengdu 2345674
+9      xian    2345675
+10     hefei   2345676
+200    changsha        3456789
+
+-- !select6 --
+10     1454547 \N
+20     1244264 \N
+30     528369  \N
+40     594201  \N
+50     594201  \N
+60     2345672 \N
+70     2345673 \N
+80     2345674 \N
+90     2345675 \N
+100    2345676 \N
+200    changsha        3456789
+
+-- !select6 --
+10     1454547 \N
+20     1244264 \N
+30     528369  \N
+40     594201  \N
+50     594201  \N
+60     2345672 \N
+70     2345673 \N
+80     2345674 \N
+90     2345675 \N
+100    2345676 \N
+200    changsha        3456789
+
+-- !select7 --
+60     2345672 \N
+70     2345673 \N
+80     2345674 \N
+90     2345675 \N
+100    2345676 \N
+200    changsha        3456789
+
+-- !select7 --
+60     2345672 \N
+70     2345673 \N
+80     2345674 \N
+90     2345675 \N
+100    2345676 \N
+200    changsha        3456789
+
+-- !select8 --
+60     nanjing \N
+70     wuhan   \N
+80     chengdu \N
+90     xian    \N
+100    hefei   \N
+200    changsha        3456789
+
+-- !select8 --
+60     nanjing \N
+70     wuhan   \N
+80     chengdu \N
+90     xian    \N
+100    hefei   \N
+200    changsha        3456789
+
+-- !select9 --
+10     beijing 2345671
+20     shanghai        2345672
+30     hangzhou        2345673
+40     shenzhen        2345674
+50     guangzhou       2345675
+200    changsha        3456789
+
+-- !select9 --
+10     beijing 2345671
+20     shanghai        2345672
+30     hangzhou        2345673
+40     shenzhen        2345674
+50     guangzhou       2345675
+200    changsha        3456789
+
+-- !select10 --
+1      beijing 1454547
+2      shanghai        1244264
+3      guangzhou       528369
+4      shenzhen        594202
+5      hangzhou        594201
+6      nanjing 2345672
+7      wuhan   2345673
+8      chengdu 2345674
+9      xian    2345675
+10     hefei   2345676
+200    changsha        3456789
+
+-- !select10 --
+1      beijing 1454547
+2      shanghai        1244264
+3      guangzhou       528369
+4      shenzhen        594202
+5      hangzhou        594201
+6      nanjing 2345672
+7      wuhan   2345673
+8      chengdu 2345674
+9      xian    2345675
+10     hefei   2345676
+200    changsha        3456789
+
+-- !select11 --
+1      \N      1454547
+2      shanghai        1244264
+3      \N      528369
+4      \N      594202
+5      hangzhou        594201
+6      nanjing 2345672
+7      \N      2345673
+8      \N      2345674
+9      xian    2345675
+10     \N      2345676
+200    changsha        3456789
+
+-- !select11 --
+1      \N      1454547
+2      shanghai        1244264
+3      \N      528369
+4      \N      594202
+5      hangzhou        594201
+6      nanjing 2345672
+7      \N      2345673
+8      \N      2345674
+9      xian    2345675
+10     \N      2345676
+200    changsha        3456789
+
+-- !select12 --
+10     beijing \N
+20     shanghai        \N
+30     hangzhou        \N
+40     shenzhen        \N
+50     guangzhou       \N
+200    changsha        3456789
+
+-- !select12 --
+10     beijing \N
+20     shanghai        \N
+30     hangzhou        \N
+40     shenzhen        \N
+50     guangzhou       \N
+200    changsha        3456789
+
+-- !select13 --
+30     hangzhou        \N
+40     shenzhen        \N
+50     guangzhou       \N
+200    changsha        3456789
+
+-- !select13 --
+30     hangzhou        \N
+40     shenzhen        \N
+50     guangzhou       \N
+200    changsha        3456789
+
+-- !select14 --
+30     hangzhou        2345673
+40     shenzhen        2345674
+50     guangzhou       2345675
+200    changsha        3456789
+
+-- !select14 --
+30     hangzhou        2345673
+40     shenzhen        2345674
+50     guangzhou       2345675
+200    changsha        3456789
+
diff --git a/regression-test/data/load_p0/stream_load/test_json_load.out 
b/regression-test/data/load_p0/stream_load/test_json_load.out
index b6df264df1..ebf706f594 100644
--- a/regression-test/data/load_p0/stream_load/test_json_load.out
+++ b/regression-test/data/load_p0/stream_load/test_json_load.out
@@ -1,5 +1,5 @@
 -- This file is automatically generated. You should know what you did if you 
want to edit this
--- !select --
+-- !select1 --
 1      beijing 2345671
 2      shanghai        2345672
 3      guangzhou       2345673
@@ -12,7 +12,20 @@
 10     hefei   23456710
 200    changsha        3456789
 
--- !select --
+-- !select1 --
+1      beijing 2345671
+2      shanghai        2345672
+3      guangzhou       2345673
+4      shenzhen        2345674
+5      hangzhou        2345675
+6      nanjing 2345676
+7      wuhan   2345677
+8      chengdu 2345678
+9      xian    2345679
+10     hefei   23456710
+200    changsha        3456789
+
+-- !select2 --
 10     beijing 2345671
 20     shanghai        2345672
 30     guangzhou       2345673
@@ -25,7 +38,33 @@
 100    hefei   23456710
 200    changsha        3456789
 
--- !select --
+-- !select2 --
+10     beijing 2345671
+20     shanghai        2345672
+30     guangzhou       2345673
+40     shenzhen        2345674
+50     hangzhou        2345675
+60     nanjing 2345676
+70     wuhan   2345677
+80     chengdu 2345678
+90     xian    2345679
+100    hefei   23456710
+200    changsha        3456789
+
+-- !select3 --
+1      2345671
+2      2345672
+3      2345673
+4      2345674
+5      2345675
+6      2345676
+7      2345677
+8      2345678
+9      2345679
+10     23456710
+200    755
+
+-- !select3 --
 1      2345671
 2      2345672
 3      2345673
@@ -38,7 +77,20 @@
 10     23456710
 200    755
 
--- !select --
+-- !select4 --
+1      210
+2      220
+3      230
+4      240
+5      250
+6      260
+7      270
+8      280
+9      290
+10     300
+200    755
+
+-- !select4 --
 1      210
 2      220
 3      230
@@ -51,7 +103,7 @@
 10     300
 200    755
 
--- !select --
+-- !select5 --
 1      1454547
 2      1244264
 3      528369
@@ -64,7 +116,33 @@
 10     2345676
 200    755
 
--- !select --
+-- !select5 --
+1      1454547
+2      1244264
+3      528369
+4      594201
+5      594201
+6      2345672
+7      2345673
+8      2345674
+9      2345675
+10     2345676
+200    755
+
+-- !select6 --
+10     1454547
+20     1244264
+30     528369
+40     594201
+50     594201
+60     2345672
+70     2345673
+80     2345674
+90     2345675
+100    2345676
+200    755
+
+-- !select6 --
 10     1454547
 20     1244264
 30     528369
@@ -77,7 +155,7 @@
 100    2345676
 200    755
 
--- !select --
+-- !select7 --
 60     2345672
 70     2345673
 80     2345674
@@ -85,7 +163,7 @@
 100    2345676
 200    755
 
--- !select --
+-- !select7 --
 60     2345672
 70     2345673
 80     2345674
@@ -93,15 +171,46 @@
 100    2345676
 200    755
 
--- !select --
+-- !select8 --
+60     2345672
+70     2345673
+80     2345674
+90     2345675
+100    2345676
+200    755
+
+-- !select8 --
+60     2345672
+70     2345673
+80     2345674
+90     2345675
+100    2345676
+200    755
+
+-- !select9 --
 10     beijing 2345671
 20     shanghai        2345672
+30     hangzhou        2345673
+40     shenzhen        2345674
+50     guangzhou       2345675
 200    changsha        3456789
 
--- !select --
+-- !select9 --
+10     beijing 2345671
+20     shanghai        2345672
+30     hangzhou        2345673
+40     shenzhen        2345674
+50     guangzhou       2345675
+200    changsha        3456789
+
+-- !select10 --
 200    changsha        3456789
 
--- !select --
+-- !select10 --
+200    changsha        3456789
+
+-- !select11 --
+1      beijing 2345671
 2      shanghai        2345672
 3      guangzhou       2345673
 4      shenzhen        2345674
@@ -110,12 +219,113 @@
 7      wuhan   2345677
 8      chengdu 2345678
 9      xian    2345679
+10     hefei   23456710
 200    changsha        3456789
 
--- !select --
+-- !select11 --
+1      beijing 2345671
 2      shanghai        2345672
 3      guangzhou       2345673
 4      shenzhen        2345674
+5      hangzhou        2345675
+6      nanjing 2345676
+7      wuhan   2345677
+8      chengdu 2345678
+9      xian    2345679
+10     hefei   23456710
+200    changsha        3456789
+
+-- !select12 --
+1      \N      2345671
+2      shanghai        2345672
+3      beijing 2345673
+4      shenzhen        2345674
+5      hangzhou        2345675
+6      nanjing 2345676
+7      \N      2345677
+8      chengdu 2345678
+9      \N      2345679
+10     \N      23456710
+200    changsha        3456789
+
+-- !select12 --
+1      \N      2345671
+2      shanghai        2345672
+3      beijing 2345673
+4      shenzhen        2345674
+5      hangzhou        2345675
+6      nanjing 2345676
+7      \N      2345677
+8      chengdu 2345678
+9      \N      2345679
+10     \N      23456710
+200    changsha        3456789
+
+-- !select13 --
+2      shanghai        2345672
+3      beijing 2345673
+4      shenzhen        2345674
+5      hangzhou        2345675
+6      nanjing 2345676
+8      chengdu 2345678
+200    hangzhou        12345
+
+-- !select13 --
+2      shanghai        2345672
+3      beijing 2345673
+4      shenzhen        2345674
+5      hangzhou        2345675
+6      nanjing 2345676
+8      chengdu 2345678
+200    hangzhou        12345
+
+-- !select14 --
+10     2345671 \N
+20     2345672 \N
+30     2345673 \N
+40     2345674 \N
+50     2345675 \N
+200    changsha        3456789
+
+-- !select14 --
+10     2345671 \N
+20     2345672 \N
+30     2345673 \N
+40     2345674 \N
+50     2345675 \N
+200    changsha        3456789
+
+-- !select15 --
+10     beijing 2345671
+20     shanghai        2345672
+30     hangzhou        2345673
+40     shenzhen        2345674
+50     guangzhou       2345675
+200    changsha        3456789
+
+-- !select15 --
+10     beijing 2345671
+20     shanghai        2345672
+30     hangzhou        2345673
+40     shenzhen        2345674
+50     guangzhou       2345675
+200    changsha        3456789
+
+-- !select16 --
+1      xihu    2345671
+2      xiaoshan        2345672
+3      binjiang        2345673
+4      shangcheng      2345674
+5      tonglu  2345675
+6      fuyang  2345676
+200    changsha        3456789
+
+-- !select16 --
+1      xihu    2345671
+2      xiaoshan        2345672
+3      binjiang        2345673
+4      shangcheng      2345674
+5      tonglu  2345675
+6      fuyang  2345676
 200    changsha        3456789
 
--- !select --
diff --git a/regression-test/suites/load_p0/broker_load/test_array_load.groovy 
b/regression-test/suites/load_p0/broker_load/test_array_load.groovy
index 5581a4928e..b462d3c00e 100644
--- a/regression-test/suites/load_p0/broker_load/test_array_load.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_array_load.groovy
@@ -194,86 +194,96 @@ suite("test_array_load", "p0") {
         }
     }
 
-    // case1: import array data in json format and enable vectorized engine
-    try {
-        sql "DROP TABLE IF EXISTS ${testTable}"
-        
-        create_test_table.call(testTable, true)
-
-        load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', 
'', '', 'simple_array.json')
-        
-        // select the table and check whether the data is correct
+    def check_data_correct = {table_name ->
         sql "sync"
-        qt_select "select * from ${testTable} order by k1"
-
-    } finally {
-        try_sql("DROP TABLE IF EXISTS ${testTable}")
+        // select the table and check whether the data is correct
+        qt_select "select * from ${table_name} order by k1" 
     }
 
-    // case2: import array data in json format and disable vectorized engine
     try {
-        sql "DROP TABLE IF EXISTS ${testTable}"
-        
-        create_test_table.call(testTable, false)
+        for ( i in 0..1 ) {
+            // should be deleted after new_load_scan is ready
+            if (i == 1) {
+                sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" 
= "false");"""
+            } else {
+                sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" 
= "true");"""
+            }
 
-        load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', 
'', '', 'simple_array.json')
-        
-        // select the table and check whether the data is correct
-        sql "sync"
-        qt_select "select * from ${testTable} order by k1"
+            // case1: import array data in json format and enable vectorized 
engine
+            try {
+                sql "DROP TABLE IF EXISTS ${testTable}"
+                
+                create_test_table.call(testTable, true)
 
-    } finally {
-        try_sql("DROP TABLE IF EXISTS ${testTable}")
-    }
-    
-    // case3: import array data in csv format and enable vectorized engine
-    try {
-        sql "DROP TABLE IF EXISTS ${testTable}"
-        
-        create_test_table.call(testTable, true)
+                load_array_data.call(testTable, 'true', '', 'json', '', '', 
'', '', '', '', 'simple_array.json')
+                
+                check_data_correct(testTable)
 
-        load_array_data.call(testTable, 'true', '', 'csv', '', '', '', '', '', 
'/', 'simple_array.csv')
-        
-        // select the table and check whether the data is correct
-        sql "sync"
-        qt_select "select * from ${testTable} order by k1"
+            } finally {
+                try_sql("DROP TABLE IF EXISTS ${testTable}")
+            }
 
-    } finally {
-        try_sql("DROP TABLE IF EXISTS ${testTable}")
-    }
+            // case2: import array data in json format and disable vectorized 
engine
+            try {
+                sql "DROP TABLE IF EXISTS ${testTable}"
+                
+                create_test_table.call(testTable, false)
 
-    // case4: import array data in csv format and disable vectorized engine
-    try {
-        sql "DROP TABLE IF EXISTS ${testTable}"
-        
-        create_test_table.call(testTable, false)
+                load_array_data.call(testTable, 'true', '', 'json', '', '', 
'', '', '', '', 'simple_array.json')
+                
+                check_data_correct(testTable)
 
-        load_array_data.call(testTable, 'true', '', 'csv', '', '', '', '', '', 
'/', 'simple_array.csv')
-        
-        // select the table and check whether the data is correct
-        sql "sync"
-        qt_select "select * from ${testTable} order by k1"
+            } finally {
+                try_sql("DROP TABLE IF EXISTS ${testTable}")
+            }
+            
+            // case3: import array data in csv format and enable vectorized 
engine
+            try {
+                sql "DROP TABLE IF EXISTS ${testTable}"
+                
+                create_test_table.call(testTable, true)
+
+                load_array_data.call(testTable, 'true', '', 'csv', '', '', '', 
'', '', '/', 'simple_array.csv')
+                
+                check_data_correct(testTable)
+
+            } finally {
+                try_sql("DROP TABLE IF EXISTS ${testTable}")
+            }
 
-    } finally {
-        try_sql("DROP TABLE IF EXISTS ${testTable}")
-    }
+            // case4: import array data in csv format and disable vectorized 
engine
+            try {
+                sql "DROP TABLE IF EXISTS ${testTable}"
+                
+                create_test_table.call(testTable, false)
 
-    // case5: import array data not specify the format
-    try {
-        sql "DROP TABLE IF EXISTS ${testTable01}"
-        
-        create_test_table01.call(testTable01)
+                load_array_data.call(testTable, 'true', '', 'csv', '', '', '', 
'', '', '/', 'simple_array.csv')
+                
+                check_data_correct(testTable)
 
-        load_array_data.call(testTable01, '', '', '', '', '', '', '', '', '/', 
'simple_array.data')
-        
-        // select the table and check whether the data is correct
-        sql "sync"
-        qt_select "select * from ${testTable01} order by k1"
+            } finally {
+                try_sql("DROP TABLE IF EXISTS ${testTable}")
+            }
 
+            // case5: import array data not specify the format
+            try {
+                sql "DROP TABLE IF EXISTS ${testTable01}"
+                
+                create_test_table01.call(testTable01)
+
+                load_array_data.call(testTable01, '', '', '', '', '', '', '', 
'', '/', 'simple_array.data')
+                
+                check_data_correct(testTable01)
+
+            } finally {
+                // try_sql("DROP TABLE IF EXISTS ${testTable01}")
+            }
+        }
     } finally {
-        // try_sql("DROP TABLE IF EXISTS ${testTable01}")
+        try_sql("""ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = 
"false");""")
     }
 
+
     // if 'enableHdfs' in regression-conf.groovy has been set to true,
     // the test will run these case as below.
     if (enableHdfs()) {
diff --git 
a/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy
 
b/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy
index f3edfd225c..760af3344e 100644
--- 
a/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy
+++ 
b/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy
@@ -49,8 +49,11 @@ 
suite("test_load_json_column_exclude_schema_without_jsonpath", "p0") {
             """
     }
 
-    def load_array_data = {table_name, strip_flag, read_flag, format_flag, 
exprs, json_paths, 
+    def load_array_data = {new_json_reader_flag, table_name, strip_flag, 
read_flag, format_flag, exprs, json_paths, 
                             json_root, where_expr, fuzzy_flag, column_sep, 
file_name ->
+        // should be deleted after new_load_scan is ready
+        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = 
"${new_json_reader_flag}");"""
+
         // load the json data
         streamLoad {
             table table_name
@@ -91,7 +94,12 @@ 
suite("test_load_json_column_exclude_schema_without_jsonpath", "p0") {
         
         create_test_table.call(true)
 
-        load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', 
'', '', 'json_column_match.json')
+        load_array_data.call('false', testTable, 'true', '', 'json', '', '', 
'', '', '', '', 'json_column_match.json')
+
+        // test new json load, should be deleted after new_load_scan ready
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        create_test_table.call(true)
+        load_array_data.call('true', testTable, 'true', '', 'json', '', '', 
'', '', '', '', 'json_column_match.json')
 
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -103,7 +111,12 @@ 
suite("test_load_json_column_exclude_schema_without_jsonpath", "p0") {
         
         create_test_table.call(false)
 
-        load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', 
'', '', 'json_column_match.json')
+        load_array_data.call('false', testTable, 'true', '', 'json', '', '', 
'', '', '', '', 'json_column_match.json')
+
+        // test new json load, should be deleted after new_load_scan ready
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        create_test_table.call(false)
+        load_array_data.call('true', testTable, 'true', '', 'json', '', '', 
'', '', '', '', 'json_column_match.json')
 
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
diff --git 
a/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy 
b/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy
index 4dfc5a9fcb..f934c038a2 100644
--- 
a/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy
+++ 
b/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy
@@ -40,8 +40,11 @@ suite("test_load_json_null_to_nullable", "p0") {
             """
     }
 
-    def load_array_data = {table_name, strip_flag, read_flag, format_flag, 
exprs, json_paths, 
+    def load_array_data = {new_json_reader_flag, table_name, strip_flag, 
read_flag, format_flag, exprs, json_paths, 
                             json_root, where_expr, fuzzy_flag, column_sep, 
file_name ->
+        // should be deleted after new_load_scan is ready
+        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = 
"${new_json_reader_flag}");"""
+
         // load the json data
         streamLoad {
             table table_name
@@ -74,6 +77,15 @@ suite("test_load_json_null_to_nullable", "p0") {
                 assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
             }
         }
+
+        // should be deleted after new_load_scan is ready
+        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = 
"false");"""
+    }
+
+    def check_data_correct = {table_name ->
+        sql "sync"
+        // select the table and check whether the data is correct
+        qt_select "select * from ${table_name} order by k1" 
     }
 
     // case1: import array data in json format and enable vectorized engine
@@ -82,11 +94,15 @@ suite("test_load_json_null_to_nullable", "p0") {
         
         create_test_table.call(true)
 
-        load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', 
'', '', 'test_char.json')
+        load_array_data.call('false', testTable, 'true', '', 'json', '', '', 
'', '', '', '', 'test_char.json')
         
-        sql "sync"
-        // select the table and check whether the data is correct
-        qt_select "select * from ${testTable} order by k1"
+        check_data_correct(testTable)
+
+        // test new json load, should be deleted after new_load_scan ready
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        create_test_table.call(true)
+        load_array_data.call('true', testTable, 'true', '', 'json', '', '', 
'', '', '', '', 'test_char.json')
+        check_data_correct(testTable)
 
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -98,11 +114,15 @@ suite("test_load_json_null_to_nullable", "p0") {
         
         create_test_table.call(false)
 
-        load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', 
'', '', 'test_char.json')
+        load_array_data.call('false', testTable, 'true', '', 'json', '', '', 
'', '', '', '', 'test_char.json')
         
-        sql "sync"
-        // select the table and check whether the data is correct
-        qt_select "select * from ${testTable} order by k1"
+        check_data_correct(testTable)
+
+        // test new json load, should be deleted after new_load_scan ready
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        create_test_table.call(false)
+        load_array_data.call('true', testTable, 'true', '', 'json', '', '', 
'', '', '', '', 'test_char.json')
+        check_data_correct(testTable)
 
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
diff --git 
a/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy 
b/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy
index b5fc9ea096..02ffd808e2 100644
--- a/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy
+++ b/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy
@@ -40,8 +40,12 @@ suite("test_load_json_with_jsonpath", "p0") {
             """
     }
 
-    def load_array_data = {table_name, strip_flag, read_flag, format_flag, 
exprs, json_paths,
+    def load_array_data = {new_json_reader_flag, table_name, strip_flag, 
read_flag, format_flag, exprs, json_paths,
                             json_root, where_expr, fuzzy_flag, column_sep, 
file_name ->
+        
+        // should be deleted after new_load_scan is ready
+        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = 
"${new_json_reader_flag}");"""
+
         // load the json data
         streamLoad {
             table table_name
@@ -74,6 +78,15 @@ suite("test_load_json_with_jsonpath", "p0") {
                 assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
             }
         }
+
+        // should be deleted after new_load_scan is ready
+        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = 
"false");"""
+    }
+
+    def check_data_correct = {table_name ->
+        sql "sync"
+        // select the table and check whether the data is correct
+        qt_select "select * from ${table_name} order by k1" 
     }
 
     // case1: import array data in json format and enable vectorized engine
@@ -82,11 +95,15 @@ suite("test_load_json_with_jsonpath", "p0") {
 
         create_test_table.call(true)
 
-        load_array_data.call(testTable, 'true', '', 'json', '', '["$.k1", 
"$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
+        load_array_data.call('false', testTable, 'true', '', 'json', '', 
'["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
 
-        // select the table and check whether the data is correct
-        sql "sync"
-        qt_select "select * from ${testTable} order by k1"
+        check_data_correct(testTable)
+
+        // test new json load, should be deleted after new_load_scan ready
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        create_test_table.call(true)
+        load_array_data.call('true', testTable, 'true', '', 'json', '', 
'["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
+        check_data_correct(testTable)
 
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -98,12 +115,19 @@ suite("test_load_json_with_jsonpath", "p0") {
 
         create_test_table.call(false)
 
-        load_array_data.call(testTable, 'true', '', 'json', '', '["$.k1", 
"$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
+        load_array_data.call('false', testTable, 'true', '', 'json', '', 
'["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
 
         sql "sync"
         // select the table and check whether the data is correct
         qt_select "select * from ${testTable} order by k1"
 
+
+        // test new json load, should be deleted after new_load_scan ready
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        create_test_table.call(false)
+        load_array_data.call('true', testTable, 'true', '', 'json', '', 
'["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
+        check_data_correct(testTable)
+
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
     }
diff --git 
a/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy 
b/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy
new file mode 100644
index 0000000000..4c7a1c162b
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy
@@ -0,0 +1,554 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hdfs_json_load", "p0") {
+    // define a sql table
+    def testTable = "test_hdfs_json_load"
+    
+    def create_test_table1 = {testTablex ->
+        // multi-line sql
+        def result1 = sql """
+                        CREATE TABLE IF NOT EXISTS ${testTablex} (
+                        id INT DEFAULT '10',
+                        city VARCHAR(32) DEFAULT '',
+                        code BIGINT SUM DEFAULT '0')
+                        DISTRIBUTED BY HASH(id) BUCKETS 10
+                        PROPERTIES("replication_num" = "1");
+                        """
+        
+        // DDL/DML return 1 row and 3 column, the only value is update row 
count
+        assertTrue(result1.size() == 1)
+        assertTrue(result1[0].size() == 1)
+        assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
+        
+        // insert 1 row to check whether the table is ok
+        def result2 = sql "INSERT INTO ${testTablex} (id, city, code) VALUES 
(200, 'changsha', 3456789)"
+        assertTrue(result2.size() == 1)
+        assertTrue(result2[0].size() == 1)
+        assertTrue(result2[0][0] == 1, "Insert should update 1 rows")
+    }
+
+    def load_from_hdfs1 = {new_json_reader_flag, strip_flag, fuzzy_flag, 
testTablex, label, fileName,
+                            fsPath, hdfsUser, exprs, jsonpaths, json_root, 
columns_parameter, where ->
+        // should be delete after new_load_scan is ready
+        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = 
"${new_json_reader_flag}");"""
+        
+        def hdfsFilePath = "${fsPath}/user/doris/json_format_test/${fileName}"
+        def result1= sql """
+                        LOAD LABEL ${label} (
+                            DATA INFILE("${hdfsFilePath}")
+                            INTO TABLE ${testTablex} 
+                            FORMAT as "json"
+                            ${columns_parameter}
+                            ${exprs}
+                            ${where}
+                            properties(
+                                "json_root" = "${json_root}",
+                                "jsonpaths" = "${jsonpaths}",
+                                "strip_outer_array" = "${strip_flag}",
+                                "fuzzy_parse" = "${fuzzy_flag}"
+                                )
+                            )
+                        with HDFS (
+                            "fs.defaultFS"="${fsPath}",
+                            "hadoop.username" = "${hdfsUser}"
+                            )
+                        PROPERTIES (
+                            "timeout"="1200",
+                            "max_filter_ratio"="0"
+                            );
+                        """
+        
+        assertTrue(result1.size() == 1)
+        assertTrue(result1[0].size() == 1)
+        assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected")
+
+        // should be delete after new_load_scan is ready
+        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = 
"false");"""
+    }
+
+    def check_load_result = {checklabel, testTablex ->
+        max_try_milli_secs = 10000
+        while(max_try_milli_secs) {
+            result = sql "show load where label = '${checklabel}'"
+            if(result[0][2] == "FINISHED") {
+                log.info("LOAD FINISHED!")
+                break
+            } else {
+                sleep(1000) // wait 1 second every time
+                max_try_milli_secs -= 1000
+                if(max_try_milli_secs <= 0) {
+                    log.info("Broker load result: ${result}".toString())
+                    assertEquals(1, 2)
+                }
+            }
+        }
+    }
+    
+
+
+    String hdfs_port = context.config.otherConfigs.get("hdfs_port")
+    def fsPath = "hdfs://127.0.0.1:${hdfs_port}"
+    // It's okay to use random `hdfsUser`, but can not be empty.
+    def hdfsUser = "doris"
+
+
+    // case1: import simple json
+    def q1 = {
+        try {
+            def test_load_label1 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+        
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("false", "false", "false", testTable, 
test_load_label1, "simple_object_json.json",
+                                fsPath, hdfsUser, '', '', '', '', '')
+
+            check_load_result(test_load_label1, testTable)
+            sql "sync"
+            qt_select1 "select * from ${testTable} order by id"
+
+            // test new json reader
+            def test_load_label2 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("true", "false", "false", testTable, 
test_load_label2, "simple_object_json.json",
+                                fsPath, hdfsUser, '', '', '', '', '')
+
+            check_load_result(test_load_label2, testTable)
+            sql "sync"
+            qt_select1 "select * from ${testTable} order by id"
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+    }
+
+    // case2: import json and apply exprs
+    def q2 = {
+        try {
+            def test_load_label1 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+        
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("false", "false", "false", testTable, 
test_load_label1, "simple_object_json.json",
+                                fsPath, hdfsUser, "SET(id= id * 10)", '', '', 
'', '')
+
+            check_load_result(test_load_label1, testTable)
+            sql "sync"
+            qt_select2 "select * from ${testTable} order by id"
+
+            // test new json reader
+            def test_load_label2 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("true", "false", "false", testTable, 
test_load_label2, "simple_object_json.json",
+                                fsPath, hdfsUser, "SET(id= id * 10)", '', '', 
'', '')
+
+            check_load_result(test_load_label2, testTable)
+            sql "sync"
+            qt_select2 "select * from ${testTable} order by id"
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+    }
+
+    // case3: import json and apply jsonpaths
+    def q3 = {
+        try {
+            def test_load_label1 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+        
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("false", "false", "false", testTable, 
test_load_label1, "simple_object_json.json",
+                                fsPath, hdfsUser, '', """[\\"\$.id\\", 
\\"\$.code\\"]""", '', '', '')
+
+            check_load_result(test_load_label1, testTable)
+            sql "sync"
+            qt_select3 "select * from ${testTable} order by id"
+
+            // test new json reader
+            def test_load_label2 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("true", "false", "false", testTable, 
test_load_label2, "simple_object_json.json",
+                                fsPath, hdfsUser, '', """[\\"\$.id\\", 
\\"\$.code\\"]""", '', '', '')
+
+            check_load_result(test_load_label2, testTable)
+            sql "sync"
+            qt_select3 "select * from ${testTable} order by id"
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+    }
+
+    // case4: import json and apply jsonpaths & exprs
+    def q4 = {
+        try {
+            def test_load_label1 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+        
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("false", "false", "false", testTable, 
test_load_label1, "simple_object_json.json",
+                                fsPath, hdfsUser, "SET(code = id * 10 + 200)", 
"""[\\"\$.id\\"]""", '', '', '')
+
+            check_load_result(test_load_label1, testTable)
+            sql "sync"
+            qt_select4 "select * from ${testTable} order by id"
+
+            // test new json reader
+            def test_load_label2 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("true", "false", "false", testTable, 
test_load_label2, "simple_object_json.json",
+                                fsPath, hdfsUser, "SET(code = id * 10 + 200)", 
"""[\\"\$.id\\"]""", '', '', '')
+
+            check_load_result(test_load_label2, testTable)
+            sql "sync"
+            qt_select4 "select * from ${testTable} order by id"
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+    }
+
+    // case5: import json with line reader
+    def q5 = {
+        try {
+            def test_load_label1 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+        
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("false", "true", "false", testTable, 
test_load_label1, "multi_line_json.json",
+                                fsPath, hdfsUser, '', '', '', '', '')
+
+            check_load_result(test_load_label1, testTable)
+            sql "sync"
+            qt_select5 "select * from ${testTable} order by id"
+
+            // test new json reader
+            def test_load_label2 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("true", "true", "false", testTable, 
test_load_label2, "multi_line_json.json",
+                                fsPath, hdfsUser, '', '', '', '', '')
+
+            check_load_result(test_load_label2, testTable)
+            sql "sync"
+            qt_select5 "select * from ${testTable} order by id"
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+    }
+
+    // case6: import json use exprs and jsonpaths
+    def q6 = {
+        try {
+            def test_load_label1 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+        
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("false", "true", "false", testTable, 
test_load_label1, "multi_line_json.json",
+                                fsPath, hdfsUser, "SET(id = id * 10)", 
"""[\\"\$.id\\", \\"\$.code\\"]""", '', '', '')
+
+            check_load_result(test_load_label1, testTable)
+            sql "sync"
+            qt_select6 "select * from ${testTable} order by id"
+
+            // test new json reader
+            def test_load_label2 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("true", "true", "false", testTable, 
test_load_label2, "multi_line_json.json",
+                                fsPath, hdfsUser, "SET(id = id * 10)", 
"""[\\"\$.id\\", \\"\$.code\\"]""", '', '', '')
+
+            check_load_result(test_load_label2, testTable)
+            sql "sync"
+            qt_select6 "select * from ${testTable} order by id"
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+    }
+
+    // case7: import json use where
+    def q7 = {
+        try {
+            def test_load_label1 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+        
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("false", "true", "false", testTable, 
test_load_label1, "multi_line_json.json",
+                                fsPath, hdfsUser, "SET(id = id * 10)", 
"""[\\"\$.id\\", \\"\$.code\\"]""", '', '', 'WHERE id>50')
+
+            check_load_result(test_load_label1, testTable)
+            sql "sync"
+            qt_select7 "select * from ${testTable} order by id"
+
+            // test new json reader
+            def test_load_label2 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("true", "true", "false", testTable, 
test_load_label2, "multi_line_json.json",
+                                fsPath, hdfsUser, "SET(id = id * 10)", 
"""[\\"\$.id\\", \\"\$.code\\"]""", '', '', 'WHERE id>50')
+
+            check_load_result(test_load_label2, testTable)
+            sql "sync"
+            qt_select7 "select * from ${testTable} order by id"
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+    }
+
+
+    // case8: import json use fuzzy_parse
+    def q8 = {
+        try {
+            def test_load_label1 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+        
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("false", "true", "true", testTable, 
test_load_label1, "multi_line_json.json",
+                                fsPath, hdfsUser, "SET(id = id * 10)", 
"""[\\"\$.id\\", \\"\$.city\\"]""", '', '', 'WHERE id>50')
+
+            check_load_result(test_load_label1, testTable)
+            sql "sync"
+            qt_select8 "select * from ${testTable} order by id"
+
+            // test new json reader
+            def test_load_label2 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("true", "true", "true", testTable, 
test_load_label2, "multi_line_json.json",
+                                fsPath, hdfsUser, "SET(id = id * 10)", 
"""[\\"\$.id\\", \\"\$.city\\"]""", '', '', 'WHERE id>50')
+
+            check_load_result(test_load_label2, testTable)
+            sql "sync"
+            qt_select8 "select * from ${testTable} order by id"
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+    }
+
+    // case9: import json use json_root
+    def q9 = {
+        try {
+            def test_load_label1 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+        
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("false", "false", "true", testTable, 
test_load_label1, "nest_json.json",
+                                fsPath, hdfsUser, "SET(id = id * 10)", '', 
'$.item', '', '')
+
+            check_load_result(test_load_label1, testTable)
+            sql "sync"
+            qt_select9 "select * from ${testTable} order by id"
+
+            // test new json reader
+            def test_load_label2 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("true", "false", "true", testTable, 
test_load_label2, "nest_json.json",
+                                fsPath, hdfsUser, "SET(id = id * 10)", '', 
'$.item', '', '')
+
+            check_load_result(test_load_label2, testTable)
+            sql "sync"
+            qt_select9 "select * from ${testTable} order by id"
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+    }
+
+    // case10: test json file which is unordered and no use json_path
+    def q10 = {
+        try {
+            def test_load_label1 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+        
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("false", "true", "false", testTable, 
test_load_label1, "multi_line_json_unorder.json",
+                                fsPath, hdfsUser, '', '', '', '', '')
+
+            check_load_result(test_load_label1, testTable)
+            sql "sync"
+            qt_select10 "select * from ${testTable} order by id"
+
+            // test new json reader
+            def test_load_label2 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("true", "true", "false", testTable, 
test_load_label2, "multi_line_json_unorder.json",
+                                fsPath, hdfsUser, '', '', '', '', '')
+
+            check_load_result(test_load_label2, testTable)
+            sql "sync"
+            qt_select10 "select * from ${testTable} order by id"
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+    }
+
+    // case11: test json file which is unordered and lack one column which is 
nullable
+    def q11 = {
+        try {
+            def test_load_label1 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+        
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("false", "true", "false", testTable, 
test_load_label1, "multi_line_json_lack_column.json",
+                                fsPath, hdfsUser, '', '', '', '', '')
+
+            check_load_result(test_load_label1, testTable)
+            sql "sync"
+            qt_select11 "select * from ${testTable} order by id"
+
+            // test new json reader
+            def test_load_label2 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("true", "true", "false", testTable, 
test_load_label2, "multi_line_json_lack_column.json",
+                                fsPath, hdfsUser, '', '', '', '', '')
+
+            check_load_result(test_load_label2, testTable)
+            sql "sync"
+            qt_select11 "select * from ${testTable} order by id"
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+    }
+
+
+    // case12: use json_path and json_root
+    def q12 = {
+        try {
+            def test_load_label1 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+        
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("false", "false", "false", testTable, 
test_load_label1, "nest_json.json", fsPath, hdfsUser,
+                                "SET(id = id * 10)", """[\\"\$.id\\", 
\\"\$.city\\"]""", '$.item', '', '')
+
+            check_load_result(test_load_label1, testTable)
+            sql "sync"
+            qt_select12 "select * from ${testTable} order by id"
+
+            // test new json reader
+            def test_load_label2 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("true", "false", "false", testTable, 
test_load_label2, "nest_json.json", fsPath, hdfsUser,
+                                "SET(id = id * 10)", """[\\"\$.id\\", 
\\"\$.city\\"]""", '$.item', '', '')
+
+            check_load_result(test_load_label2, testTable)
+            sql "sync"
+            qt_select12 "select * from ${testTable} order by id"
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+    }
+
+    // case13: use json_path & json_root & where
+    def q13 = {
+        try {
+            def test_load_label1 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+        
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("false", "false", "false", testTable, 
test_load_label1, "nest_json.json", fsPath, hdfsUser,
+                                "SET(id = id * 10)", """[\\"\$.id\\", 
\\"\$.city\\"]""", '$.item', '', 'WHERE id>20')
+
+            check_load_result(test_load_label1, testTable)
+            sql "sync"
+            qt_select13 "select * from ${testTable} order by id"
+
+            // test new json reader
+            def test_load_label2 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("true", "false", "false", testTable, 
test_load_label2, "nest_json.json", fsPath, hdfsUser,
+                                "SET(id = id * 10)", """[\\"\$.id\\", 
\\"\$.city\\"]""", '$.item', '', 'WHERE id>20')
+
+            check_load_result(test_load_label2, testTable)
+            sql "sync"
+            qt_select13 "select * from ${testTable} order by id"
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+    }
+
+    // case14: use jsonpaths & json_root & where & columns
+    def q14 = {
+        try {
+            def test_load_label1 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+        
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("false", "false", "false", testTable, 
test_load_label1, "nest_json.json", fsPath, hdfsUser,
+                                "SET(id = id * 10)", """[\\"\$.id\\", 
\\"\$.code\\",\\"\$.city\\"]""", '$.item',
+                                '(id, code, city)', 'WHERE id>20')
+
+            check_load_result(test_load_label1, testTable)
+            sql "sync"
+            qt_select14 "select * from ${testTable} order by id"
+
+            // test new json reader
+            def test_load_label2 = 
UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table1.call(testTable)
+            load_from_hdfs1.call("true", "false", "false", testTable, 
test_load_label2, "nest_json.json", fsPath, hdfsUser,
+                                "SET(id = id * 10)", """[\\"\$.id\\", 
\\"\$.code\\", \\"\$.city\\"]""", '$.item',
+                                '(id, code, city)', 'WHERE id>20')
+
+            check_load_result(test_load_label2, testTable)
+            sql "sync"
+            qt_select14 "select * from ${testTable} order by id"
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+    }
+
+
+
+    
+    String enabled = context.config.otherConfigs.get("enableHiveTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        log.info("Begin Test q1:")
+        q1()
+        log.info("Begin Test q2:")
+        q2()
+        log.info("Begin Test q3:")
+        q3()
+        log.info("Begin Test q4:")
+        q4()
+        log.info("Begin Test q5:")
+        q5()
+        log.info("Begin Test q6:")
+        q6()
+        log.info("Begin Test q7:")
+        q7()
+        log.info("Begin Test q8:")
+        q8()
+        log.info("Begin Test q9:")
+        q9()
+        log.info("Begin Test q10:")
+        q10()
+        log.info("Begin Test q11:")
+        q11()
+        log.info("Begin Test q12:")
+        q12()
+        log.info("Begin Test q13:")
+        q13()
+        log.info("Begin Test q14:")
+        q14()
+    }
+}
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy 
b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
index 8dc594c660..e066467e3e 100644
--- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_json_load", "p0") {
+suite("test_json_load", "p0") { 
     // define a sql table
     def testTable = "test_json_load"
     
@@ -64,6 +64,30 @@ suite("test_json_load", "p0") {
         assertTrue(result2[0][0] == 1, "Insert should update 1 rows")
     }
 
+    // city is NOT NULL
+    def create_test_table3 = {testTablex ->
+        // multi-line sql
+        def result1 = sql """
+                        CREATE TABLE IF NOT EXISTS ${testTablex} (
+                        id INT DEFAULT '10',
+                        city VARCHAR(32) NOT NULL,
+                        code BIGINT SUM DEFAULT '0')
+                        DISTRIBUTED BY HASH(id) BUCKETS 10
+                        PROPERTIES("replication_num" = "1");
+                        """
+        
+        // DDL/DML return 1 row and 3 column, the only value is update row 
count
+        assertTrue(result1.size() == 1)
+        assertTrue(result1[0].size() == 1)
+        assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
+        
+        // insert 1 row to check whether the table is ok
+        def result2 = sql "INSERT INTO ${testTablex} (id, city, code) VALUES 
(200, 'hangzhou', 12345)"
+        assertTrue(result2.size() == 1)
+        assertTrue(result2[0].size() == 1)
+        assertTrue(result2[0][0] == 1, "Insert should update 1 rows")
+    }
+
     def test_invalid_json_array_table = { testTablex ->
         // multi-line sql
         def result1 = sql """
@@ -90,8 +114,11 @@ suite("test_json_load", "p0") {
         assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
     }
     
-    def load_json_data = {label, strip_flag, read_flag, format_flag, exprs, 
json_paths, 
-                            json_root, where_expr, fuzzy_flag, file_name, 
ignore_failure=false ->
+    def load_json_data = {new_json_reader_flag, label, strip_flag, read_flag, 
format_flag, exprs, json_paths, 
+                        json_root, where_expr, fuzzy_flag, file_name, 
ignore_failure=false ->
+        // should be delete after new_load_scan is ready
+        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = 
"${new_json_reader_flag}");"""
+        
         // load the json data
         streamLoad {
             table "test_json_load"
@@ -123,6 +150,9 @@ suite("test_json_load", "p0") {
                 assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
             }
         }
+
+        // should be deleted after new_load_scan is ready
+        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = 
"false");"""
     }
     
     def load_from_hdfs1 = {testTablex, label, hdfsFilePath, format, 
brokerName, hdfsUser, hdfsPasswd ->
@@ -182,10 +212,20 @@ suite("test_json_load", "p0") {
         
         create_test_table1.call(testTable)
 
-        load_json_data.call('test_json_load_case1', 'true', '', 'json', '', 
'', '', '', '', 'simple_json.json')
+        load_json_data.call('false', 'test_json_load_case1', 'true', '', 
'json', '', '', '', '', '', 'simple_json.json')
 
         sql "sync"
-        qt_select "select * from ${testTable} order by id"
+        qt_select1 "select * from ${testTable} order by id"
+
+        // test new json reader
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        
+        create_test_table1.call(testTable)
+
+        load_json_data.call('true', 'test_json_load_case1_2', 'true', '', 
'json', '', '', '', '', '', 'simple_json.json')
+
+        sql "sync"
+        qt_select1 "select * from ${testTable} order by id"
 
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -197,10 +237,20 @@ suite("test_json_load", "p0") {
 
         create_test_table1.call(testTable)
 
-        load_json_data.call('test_json_load_case2', 'true', '', 'json', 'id= 
id * 10', '', '', '', '', 'simple_json.json')
+        load_json_data.call('false', 'test_json_load_case2', 'true', '', 
'json', 'id= id * 10', '', '', '', '', 'simple_json.json')
+
+        sql "sync" 
+        qt_select2 "select * from ${testTable} order by id"
+
+        // test new json reader
+        sql "DROP TABLE IF EXISTS ${testTable}"
+
+        create_test_table1.call(testTable)
+
+        load_json_data.call('true', 'test_json_load_case2_2', 'true', '', 
'json', 'id= id * 10', '', '', '', '', 'simple_json.json')
 
         sql "sync" 
-        qt_select "select * from ${testTable} order by id"
+        qt_select2 "select * from ${testTable} order by id"
 
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -212,11 +262,22 @@ suite("test_json_load", "p0") {
         
         create_test_table2.call(testTable)
         
-        load_json_data.call('test_json_load_case3', 'true', '', 'json', '', 
'[\"$.id\", \"$.code\"]',
+        load_json_data.call('false', 'test_json_load_case3', 'true', '', 
'json', '', '[\"$.id\", \"$.code\"]',
+                            '', '', '', 'simple_json.json')
+
+        sql "sync"
+        qt_select3 "select * from ${testTable} order by id"
+
+        // test new json reader
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        
+        create_test_table2.call(testTable)
+        
+        load_json_data.call('true', 'test_json_load_case3_2', 'true', '', 
'json', '', '[\"$.id\", \"$.code\"]',
                             '', '', '', 'simple_json.json')
 
         sql "sync"
-        qt_select "select * from ${testTable} order by id"
+        qt_select3 "select * from ${testTable} order by id"
 
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -228,11 +289,22 @@ suite("test_json_load", "p0") {
         
         create_test_table2.call(testTable)
         
-        load_json_data.call('test_json_load_case4', 'true', '', 'json', 'code 
= id * 10 + 200', '[\"$.id\"]',
+        load_json_data.call('false', 'test_json_load_case4', 'true', '', 
'json', 'code = id * 10 + 200', '[\"$.id\"]',
                             '', '', '', 'simple_json.json')
 
         sql "sync"
-        qt_select "select * from ${testTable} order by id"
+        qt_select4 "select * from ${testTable} order by id"
+
+        // test new json reader
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        
+        create_test_table2.call(testTable)
+        
+        load_json_data.call('true', 'test_json_load_case4_2', 'true', '', 
'json', 'code = id * 10 + 200', '[\"$.id\"]',
+                            '', '', '', 'simple_json.json')
+
+        sql "sync"
+        qt_select4 "select * from ${testTable} order by id"
 
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -244,11 +316,22 @@ suite("test_json_load", "p0") {
         
         create_test_table2.call(testTable)
         
-        load_json_data.call('test_json_load_case5', 'true', 'true', 'json', 
'', '[\"$.id\", \"$.code\"]',
+        load_json_data.call('false', 'test_json_load_case5', 'true', 'true', 
'json', '', '[\"$.id\", \"$.code\"]',
                             '', '', '', 'multi_line_json.json')
         
         sql "sync"
-        qt_select "select * from ${testTable} order by id"
+        qt_select5 "select * from ${testTable} order by id"
+
+        // test new json reader
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        
+        create_test_table2.call(testTable)
+        
+        load_json_data.call('true', 'test_json_load_case5_2', 'true', 'true', 
'json', '', '[\"$.id\", \"$.code\"]',
+                            '', '', '', 'multi_line_json.json')
+        
+        sql "sync"
+        qt_select5 "select * from ${testTable} order by id"
 
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -260,11 +343,23 @@ suite("test_json_load", "p0") {
 
         create_test_table2.call(testTable)
         
-        load_json_data.call('test_json_load_case6', 'true', 'true', 'json', 
'id= id * 10', '[\"$.id\", \"$.code\"]',
+        load_json_data.call('false', 'test_json_load_case6', 'true', 'true', 
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+                            '', '', '', 'multi_line_json.json')
+
+        sql "sync"
+        qt_select6 "select * from ${testTable} order by id"
+
+
+        // test new json reader
+        sql "DROP TABLE IF EXISTS ${testTable}"
+
+        create_test_table2.call(testTable)
+        
+        load_json_data.call('true', 'test_json_load_case6_2', 'true', 'true', 
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
                             '', '', '', 'multi_line_json.json')
 
         sql "sync"
-        qt_select "select * from ${testTable} order by id"
+        qt_select6 "select * from ${testTable} order by id"
 
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -276,11 +371,22 @@ suite("test_json_load", "p0") {
 
         create_test_table2.call(testTable)
         
-        load_json_data.call('test_json_load_case7', 'true', 'true', 'json', 
'id= id * 10', '[\"$.id\", \"$.code\"]',
+        load_json_data.call('false', 'test_json_load_case7', 'true', 'true', 
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+                            '', 'id > 50', '', 'multi_line_json.json')
+
+        sql "sync"
+        qt_select7 "select * from ${testTable} order by id"
+
+        // test new json reader
+        sql "DROP TABLE IF EXISTS ${testTable}"
+
+        create_test_table2.call(testTable)
+        
+        load_json_data.call('true', 'test_json_load_case7_2', 'true', 'true', 
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
                             '', 'id > 50', '', 'multi_line_json.json')
 
         sql "sync"
-        qt_select "select * from ${testTable} order by id"
+        qt_select7 "select * from ${testTable} order by id"
 
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -292,11 +398,23 @@ suite("test_json_load", "p0") {
 
         create_test_table2.call(testTable)
         
-        load_json_data.call('test_json_load_case8', 'true', 'true', 'json', 
'id= id * 10', '[\"$.id\", \"$.code\"]',
+        load_json_data.call('false', 'test_json_load_case8', 'true', 'true', 
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
                             '', 'id > 50', 'true', 'multi_line_json.json')
 
         sql "sync"
-        qt_select "select * from ${testTable} order by id"
+        qt_select8 "select * from ${testTable} order by id"
+
+
+        // test new json reader
+        sql "DROP TABLE IF EXISTS ${testTable}"
+
+        create_test_table2.call(testTable)
+        
+        load_json_data.call('true', 'test_json_load_case8_2', 'true', 'true', 
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+                            '', 'id > 50', 'true', 'multi_line_json.json')
+
+        sql "sync"
+        qt_select8 "select * from ${testTable} order by id"
 
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -308,26 +426,258 @@ suite("test_json_load", "p0") {
 
         create_test_table1.call(testTable)
         
-        load_json_data.call('test_json_load_case9', '', 'true', 'json', 'id= 
id * 10', '',
+        load_json_data.call('false', 'test_json_load_case9', '', 'true', 
'json', 'id= id * 10', '',
                             '$.item', '', 'true', 'nest_json.json')
 
         sql "sync"
-        qt_select "select * from ${testTable} order by id"
+        qt_select9 "select * from ${testTable} order by id"
+
+        // test new json reader
+         sql "DROP TABLE IF EXISTS ${testTable}"
+
+        create_test_table1.call(testTable)
+        
+        load_json_data.call('true', 'test_json_load_case9_2', '', 'true', 
'json', 'id= id * 10', '',
+                            '$.item', '', 'true', 'nest_json.json')
+
+        sql "sync"
+        qt_select9 "select * from ${testTable} order by id"
 
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
     }
+
     // case10: invalid json
     try {
         sql "DROP TABLE IF EXISTS ${testTable}"
 
         create_test_table1.call(testTable)
         
-        load_json_data.call('test_json_load_case10', '', 'true', 'json', 'id= 
id * 10', '',
+        load_json_data.call('false', 'test_json_load_case10', '', 'true', 
'json', 'id= id * 10', '',
                             '$.item', '', 'true', 'invalid_json.json', true)
 
         sql "sync"
-        qt_select "select * from ${testTable} order by id"
+        qt_select10 "select * from ${testTable} order by id"
+
+
+        // test new json reader
+        sql "DROP TABLE IF EXISTS ${testTable}"
+
+        create_test_table1.call(testTable)
+        
+        load_json_data.call('true', 'test_json_load_case10_2', '', 'true', 
'json', 'id= id * 10', '',
+                            '$.item', '', 'true', 'invalid_json.json', true)
+
+        sql "sync"
+        qt_select10 "select * from ${testTable} order by id"
+
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${testTable}")
+    }
+
+    // case11: test json file which is unordered and no use json_path
+    try {
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        
+        create_test_table1.call(testTable)
+
+        load_json_data.call('false', 'test_json_load_case11', 'true', '', 
'json', '', '', '', '', '', 'simple_json2.json')
+
+        sql "sync"
+        qt_select11 "select * from ${testTable} order by id"
+
+        // test new json reader
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        
+        create_test_table1.call(testTable)
+
+        load_json_data.call('true', 'test_json_load_case11_2', 'true', '', 
'json', '', '', '', '', '', 'simple_json2.json')
+
+        sql "sync"
+        qt_select11 "select * from ${testTable} order by id"
+
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${testTable}")
+    }
+
+    // case12: test json file which is unordered and lack one column which is 
nullable
+    try {
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        
+        create_test_table1.call(testTable)
+
+        load_json_data.call('false', 'test_json_load_case12', 'true', '', 
'json', '', '', '', '', '', 'simple_json2_lack_one_column.json')
+
+        sql "sync"
+        qt_select12 "select * from ${testTable} order by id"
+
+        // test new json reader
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        
+        create_test_table1.call(testTable)
+
+        load_json_data.call('true', 'test_json_load_case12_2', 'true', '', 
'json', '', '', '', '', '', 'simple_json2_lack_one_column.json')
+
+        sql "sync"
+        qt_select12 "select * from ${testTable} order by id"
+
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${testTable}")
+    }
+
+    // case13: test json file which is unordered and lack one column which is 
not nullable
+    try {
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        create_test_table3.call(testTable)
+        // should be delete after new_load_scan is ready
+        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = 
"false");"""
+        // load the json data
+        streamLoad {
+            table "${testTable}"
+            
+            // set http request header params
+            set 'strip_outer_array', "true"
+            set 'format', "json"
+            set 'max_filter_ratio', '1'
+            file "simple_json2_lack_one_column.json" // import json file
+            time 10000 // limit inflight 10s
+
+            // if declared a check callback, the default check condition will 
ignore.
+            // So you must check all condition
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(json.NumberTotalRows, json.NumberLoadedRows + 
json.NumberUnselectedRows + json.NumberFilteredRows)
+                assertEquals(json.NumberFilteredRows, 4)
+                assertEquals(json.NumberLoadedRows, 6)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+        // should be deleted after new_load_scan is ready
+        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = 
"false");"""
+        sql "sync"
+        qt_select13 "select * from ${testTable} order by id"
+
+
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        create_test_table3.call(testTable)
+        // should be delete after new_load_scan is ready
+        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = 
"true");"""
+        // load the json data
+        streamLoad {
+            table "${testTable}"
+            
+            // set http request header params
+            set 'strip_outer_array', "true"
+            set 'format', "json"
+            set 'max_filter_ratio', '1'
+            file "simple_json2_lack_one_column.json" // import json file
+            time 10000 // limit inflight 10s
+
+            // if declared a check callback, the default check condition will 
ignore.
+            // So you must check all condition
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(json.NumberTotalRows, json.NumberLoadedRows + 
json.NumberUnselectedRows + json.NumberFilteredRows)
+                assertEquals(json.NumberFilteredRows, 4)
+                assertEquals(json.NumberLoadedRows, 6)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+        // should be deleted after new_load_scan is ready
+        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = 
"false");"""
+        sql "sync"
+        qt_select13 "select * from ${testTable} order by id"
+
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${testTable}")
+    }
+
+    // case14: use json_path and json_root
+    try {
+        sql "DROP TABLE IF EXISTS ${testTable}"
+
+        create_test_table1.call(testTable)
+        
+        load_json_data.call('false', 'test_json_load_case14', '', 'true', 
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+                            '$.item', '', 'true', 'nest_json.json')
+
+        sql "sync"
+        qt_select14 "select * from ${testTable} order by id"
+
+        // test new json reader
+        sql "DROP TABLE IF EXISTS ${testTable}"
+
+        create_test_table1.call(testTable)
+        
+        load_json_data.call('true', 'test_json_load_case14_2', '', 'true', 
'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+                            '$.item', '', 'true', 'nest_json.json')
+
+        sql "sync"
+        qt_select14 "select * from ${testTable} order by id"
+
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${testTable}")
+    }
+
+    // case15: apply jsonpaths & exprs & json_root
+    try {
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        
+        create_test_table1.call(testTable)
+        
+        load_json_data.call('false', 'test_json_load_case15', '', 'true', 
'json', 'id, code, city, id= id * 10',
+                            '[\"$.id\", \"$.code\", \"$.city\"]', '$.item', 
'', 'true', 'nest_json.json')
+
+        sql "sync"
+        qt_select15 "select * from ${testTable} order by id"
+
+        // test new json reader
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        
+        create_test_table1.call(testTable)
+        
+        load_json_data.call('true', 'test_json_load_case15_2', '', 'true', 
'json', 'id, code, city,id= id * 10',
+                            '[\"$.id\", \"$.code\", \"$.city\"]', '$.item', 
'', 'true', 'nest_json.json')
+
+        sql "sync"
+        qt_select15 "select * from ${testTable} order by id"
+
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${testTable}")
+    }
+
+    // case16: apply jsonpaths & exprs & json_root
+    try {
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        
+        create_test_table1.call(testTable)
+        
+        load_json_data.call('false', 'test_json_load_case16', 'true', '', 
'json', 'id, code, city',
+                            '[\"$.id\", \"$.code\", \"$.city[2]\"]', '$.item', 
'', 'true', 'nest_json_array.json')
+
+        sql "sync"
+        qt_select16 "select * from ${testTable} order by id"
+
+        // test new json reader
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        
+        create_test_table1.call(testTable)
+        
+        load_json_data.call('true', 'test_json_load_case16_2', 'true', '', 
'json', 'id, code, city',
+                            '[\"$.id\", \"$.code\", \"$.city[2]\"]', '$.item', 
'', 'true', 'nest_json_array.json')
+
+        sql "sync"
+        qt_select16 "select * from ${testTable} order by id"
 
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
@@ -342,7 +692,7 @@ suite("test_json_load", "p0") {
         def hdfs_file_path = uploadToHdfs "stream_load/simple_object_json.json"
         def format = "json" 
 
-        // case11: import json use pre-filter exprs
+        // case17: import json use pre-filter exprs
         try {
             sql "DROP TABLE IF EXISTS ${testTable}"
             
@@ -357,7 +707,7 @@ suite("test_json_load", "p0") {
             try_sql("DROP TABLE IF EXISTS ${testTable}")
         }
 
-        // case12: import json use pre-filter and where exprs
+        // case18: import json use pre-filter and where exprs
         try {
             sql "DROP TABLE IF EXISTS ${testTable}"
             
@@ -372,12 +722,12 @@ suite("test_json_load", "p0") {
             try_sql("DROP TABLE IF EXISTS ${testTable}")
         }
 
-        // case13: invalid json
+        // case19: invalid json
         try {
             sql "DROP TABLE IF EXISTS ${testTable}"
 
             test_invalid_json_array_table.call(testTable)
-            load_json_data.call('test_json_load_case11', 'true', '', 'json', 
'', '',
+            load_json_data.call('false', 'test_json_load_case19', 'true', '', 
'json', '', '',
                     '', '', '', 'invalid_json_array.json', true)
 
             sql "sync"
diff --git 
a/regression-test/suites/tpch_sf1_p0/multi_catalog_query_parquet/hive_catalog.groovy
 
b/regression-test/suites/tpch_sf1_p0/multi_catalog_query_parquet/hive_catalog.groovy
index 9db1324d4e..30d2798ef2 100644
--- 
a/regression-test/suites/tpch_sf1_p0/multi_catalog_query_parquet/hive_catalog.groovy
+++ 
b/regression-test/suites/tpch_sf1_p0/multi_catalog_query_parquet/hive_catalog.groovy
@@ -801,22 +801,23 @@ order by
         String[][] backends = sql """ show backends; """
         assertTrue(backends.size() > 0)
         for (String[] backend in backends) {
-            StringBuilder setConfigCommand = new StringBuilder();
-            setConfigCommand.append("curl -X POST http://";)
-            setConfigCommand.append(backend[2])
-            setConfigCommand.append(":")
-            setConfigCommand.append(backend[5])
-            setConfigCommand.append("/api/update_config?")
-            String command1 = setConfigCommand.toString() + 
"enable_new_load_scan_node=true"
-            logger.info(command1)
-            String command2 = setConfigCommand.toString() + 
"enable_new_file_scanner=true"
-            logger.info(command2)
-            def process1 = command1.execute()
-            int code = process1.waitFor()
-            assertEquals(code, 0)
-            def process2 = command2.execute()
-            code = process1.waitFor()
-            assertEquals(code, 0)
+            // No need to set this config anymore, but leave this code sample 
here
+            // StringBuilder setConfigCommand = new StringBuilder();
+            // setConfigCommand.append("curl -X POST http://";)
+            // setConfigCommand.append(backend[2])
+            // setConfigCommand.append(":")
+            // setConfigCommand.append(backend[5])
+            // setConfigCommand.append("/api/update_config?")
+            // String command1 = setConfigCommand.toString() + 
"enable_new_load_scan_node=true"
+            // logger.info(command1)
+            // String command2 = setConfigCommand.toString() + 
"enable_new_file_scanner=true"
+            // logger.info(command2)
+            // def process1 = command1.execute()
+            // int code = process1.waitFor()
+            // assertEquals(code, 0)
+            // def process2 = command2.execute()
+            // code = process1.waitFor()
+            // assertEquals(code, 0)
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to