This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cb9a6f63ab8 [refactor](simd_json_reader) refactor simd json parse to
adapt stream parse (#27972)
cb9a6f63ab8 is described below
commit cb9a6f63ab87055bfa3633390c187404a10754e2
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu Dec 7 14:45:15 2023 +0800
[refactor](simd_json_reader) refactor simd json parse to adapt stream parse
(#27972)
---
be/src/vec/exec/format/json/new_json_reader.cpp | 310 ++++++++++++------------
be/src/vec/exec/format/json/new_json_reader.h | 11 +
2 files changed, 164 insertions(+), 157 deletions(-)
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 83b78946486..5c62f284f96 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -1009,158 +1009,189 @@ Status NewJsonReader::_simdjson_init_reader() {
return Status::OK();
}
+Status NewJsonReader::_handle_simdjson_error(simdjson::simdjson_error& error,
Block& block,
+ size_t num_rows, bool* eof) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error info:
{}", error.error(),
+ error.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string {
+ return std::string(_simdjson_ondemand_padding_buffer.data(),
_original_doc_size);
+ },
+ [&]() -> std::string { return fmt::to_string(error_msg); }, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean the
fail parsed row.
+ for (int i = 0; i < block.columns(); ++i) {
+ auto column = block.get_by_position(i).column->assume_mutable();
+ if (column->size() > num_rows) {
+ column->pop_back(column->size() - num_rows);
+ }
+ }
+
+ return Status::OK();
+}
+
Status NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/,
Block& block,
const
std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool*
eof) {
// simple json
+ size_t size = 0;
+ simdjson::error_code error;
+ size_t num_rows = block.rows();
+ try {
+ // step1: get and parse buf to get json doc
+ RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof,
&error));
+ if (size == 0 || *eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+
+ // step2: get json value by json doc
+ Status st = _get_json_value(&size, eof, &error, is_empty_row);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ return Status::OK();
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row) {
+ return Status::OK();
+ }
+
+ // step 3: write columns by json value
+ RETURN_IF_ERROR(
+ _simdjson_handle_simple_json_write_columns(block, slot_descs,
is_empty_row, eof));
+ } catch (simdjson::simdjson_error& e) {
+ RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that we
have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ }
+
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_simple_json_write_columns(
+ Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool*
is_empty_row,
+ bool* eof) {
simdjson::ondemand::object objectValue;
size_t num_rows = block.rows();
- do {
- bool valid = false;
- size_t size = 0;
- simdjson::error_code error;
- try {
- if (_next_row >= _total_rows) { // parse json and generic document
- RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof,
&error));
- if (size == 0 || *eof) {
+ bool valid = false;
+ try {
+ if (_json_value.type() == simdjson::ondemand::json_type::array) {
+ _array = _json_value.get_array();
+ if (_array.count_elements() == 0) {
+ // may be passing an empty json, such as "[]"
+ RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json line",
"", nullptr));
+ if (*_scanner_eof) {
*is_empty_row = true;
return Status::OK();
}
-
- Status st = _get_json_value(&size, eof, &error, is_empty_row);
- if (st.is<DATA_QUALITY_ERROR>()) {
- continue; // continue to read next
- }
- RETURN_IF_ERROR(st);
- if (*is_empty_row) {
- return Status::OK();
- }
- if (_json_value.type() ==
simdjson::ondemand::json_type::array) {
- _array = _json_value.get_array();
- _array_iter = _array.begin();
-
- _total_rows = _array.count_elements();
- if (_total_rows == 0) {
- // may be passing an empty json, such as "[]"
- RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json
line", "", nullptr));
- if (*_scanner_eof) {
- *is_empty_row = true;
- return Status::OK();
- }
- continue;
- }
- } else {
- _total_rows = 1; // only one row
- objectValue = _json_value;
- }
- _next_row = 0;
+ return Status::OK();
}
- if (_json_value.type() == simdjson::ondemand::json_type::array) {
// handle case 1
+ _array_iter = _array.begin();
+ while (true) {
objectValue = *_array_iter;
RETURN_IF_ERROR(
_simdjson_set_column_value(&objectValue, block,
slot_descs, &valid));
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it
means that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ }
+ ++_array_iter;
if (_array_iter == _array.end()) {
// Hint to read next json doc
- _next_row = _total_rows + 1;
break;
}
- ++_array_iter;
- } else { // handle case 2
- // objectValue = _json_value.get_object();
- RETURN_IF_ERROR(
- _simdjson_set_column_value(&objectValue, block,
slot_descs, &valid));
}
- _next_row++;
+ } else {
+ objectValue = _json_value;
+ RETURN_IF_ERROR(_simdjson_set_column_value(&objectValue, block,
slot_descs, &valid));
if (!valid) {
if (*_scanner_eof) {
- // When _scanner_eof is true and valid is false, it means
that we have encountered
- // unqualified data and decided to stop the scan.
*is_empty_row = true;
return Status::OK();
}
- continue;
}
*is_empty_row = false;
- break; // get a valid row, then break
- } catch (simdjson::simdjson_error& e) {
- // prevent from endless loop
- _next_row = _total_rows + 1;
- fmt::memory_buffer error_msg;
- fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
- e.what());
- RETURN_IF_ERROR(_state->append_error_msg_to_file(
- [&]() -> std::string {
- return
std::string(_simdjson_ondemand_padding_buffer.data(),
- _original_doc_size);
- },
- [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
- _counter->num_rows_filtered++;
- // Before continuing to process other rows, we need to first clean
the fail parsed row.
- for (int i = 0; i < block.columns(); ++i) {
- auto column =
block.get_by_position(i).column->assume_mutable();
- if (column->size() > num_rows) {
- column->pop_back(column->size() - num_rows);
- }
- }
- if (!valid) {
- if (*_scanner_eof) {
- // When _scanner_eof is true and valid is false, it means
that we have encountered
- // unqualified data and decided to stop the scan.
- *is_empty_row = true;
- return Status::OK();
- }
- continue;
+ }
+ } catch (simdjson::simdjson_error& e) {
+ RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
+ if (!valid) {
+ if (*_scanner_eof) {
+ *is_empty_row = true;
+ return Status::OK();
}
- continue;
}
- } while (_next_row <= _total_rows);
+ }
return Status::OK();
}
Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
RuntimeState* /*state*/, Block& block, const
std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool* eof) {
+ // array complex json
+ size_t size = 0;
+ simdjson::error_code error;
+ size_t num_rows = block.rows();
+ try {
+ // step1: get and parse buf to get json doc
+ RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof,
&error));
+ if (size == 0 || *eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+
+ // step2: get json value by json doc
+ Status st = _get_json_value(&size, eof, &error, is_empty_row);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ return Status::OK();
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row) {
+ return Status::OK();
+ }
+
+ // step 3: write columns by json value
+
RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(block,
slot_descs,
+
is_empty_row, eof));
+ } catch (simdjson::simdjson_error& e) {
+ RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that we
have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ }
+
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_flat_array_complex_json_write_columns(
+ Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool*
is_empty_row,
+ bool* eof) {
// Advance one row in array list, if it is the endpoint, stop advance and
break the loop
#define ADVANCE_ROW() \
+ ++_array_iter; \
if (_array_iter == _array.end()) { \
- _next_row = _total_rows + 1; \
break; \
- } \
- ++_array_iter; \
- ++_next_row;
+ }
- // array complex json
- size_t num_rows = block.rows();
simdjson::ondemand::object cur;
- do {
- size_t size = 0;
- simdjson::error_code error;
- try {
- if (_next_row >= _total_rows) {
- RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof,
&error));
- if (size == 0 || *eof) {
- *is_empty_row = true;
- return Status::OK();
- }
- Status st = _get_json_value(&size, eof, &error, is_empty_row);
- if (st.is<DATA_QUALITY_ERROR>()) {
- continue; // continue to read next
- }
- RETURN_IF_ERROR(st);
- if (*is_empty_row) {
- if (st.ok()) {
- return st;
- }
- if (_total_rows == 0) {
- continue;
- }
- }
- _array = _json_value.get_array();
- _array_iter = _array.begin();
- }
+ size_t num_rows = block.rows();
+ try {
+ bool valid = true;
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
- bool valid = true;
+ while (true) {
cur = (*_array_iter).get_object();
// extract root
if (!_parsed_json_root.empty()) {
@@ -1187,36 +1218,17 @@ Status
NewJsonReader::_simdjson_handle_flat_array_complex_json(
continue; // process next line
}
*is_empty_row = false;
- break; // get a valid row, then break
- } catch (simdjson::simdjson_error& e) {
- // prevent from endless loop
- _next_row = _total_rows + 1;
- fmt::memory_buffer error_msg;
- fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
- e.what());
- RETURN_IF_ERROR(_state->append_error_msg_to_file(
- [&]() -> std::string {
- return
std::string(_simdjson_ondemand_padding_buffer.data(),
- _original_doc_size);
- },
- [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
- _counter->num_rows_filtered++;
- // Before continuing to process other rows, we need to first clean
the fail parsed row.
- for (int i = 0; i < block.columns(); ++i) {
- auto column =
block.get_by_position(i).column->assume_mutable();
- if (column->size() > num_rows) {
- column->pop_back(column->size() - num_rows);
- }
- }
- if (*_scanner_eof) {
- // When _scanner_eof is true and valid is false, it means that
we have encountered
- // unqualified data and decided to stop the scan.
- *is_empty_row = true;
- return Status::OK();
- }
- continue;
}
- } while (_next_row <= _total_rows);
+ } catch (simdjson::simdjson_error& e) {
+ RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that we
have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ }
+
return Status::OK();
}
@@ -1269,23 +1281,7 @@ Status
NewJsonReader::_simdjson_handle_nested_complex_json(
}
break; // read a valid row
} catch (simdjson::simdjson_error& e) {
- fmt::memory_buffer error_msg;
- fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
- e.what());
- RETURN_IF_ERROR(_state->append_error_msg_to_file(
- [&]() -> std::string {
- return
std::string(_simdjson_ondemand_padding_buffer.data(),
- _original_doc_size);
- },
- [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
- _counter->num_rows_filtered++;
- // Before continuing to process other rows, we need to first clean
the fail parsed row.
- for (int i = 0; i < block.columns(); ++i) {
- auto column =
block.get_by_position(i).column->assume_mutable();
- if (column->size() > num_rows) {
- column->pop_back(column->size() - num_rows);
- }
- }
+ RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
if (*_scanner_eof) {
// When _scanner_eof is true and valid is false, it means that
we have encountered
// unqualified data and decided to stop the scan.
diff --git a/be/src/vec/exec/format/json/new_json_reader.h
b/be/src/vec/exec/format/json/new_json_reader.h
index 33a0ae0ba58..92c36c3b283 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -147,14 +147,25 @@ private:
bool* is_empty_row);
Status _judge_empty_row(size_t size, bool eof, bool* is_empty_row);
+ Status _handle_simdjson_error(simdjson::simdjson_error& error, Block&
block, size_t num_rows,
+ bool* eof);
+
Status _simdjson_handle_simple_json(RuntimeState* state, Block& block,
const std::vector<SlotDescriptor*>&
slot_descs,
bool* is_empty_row, bool* eof);
+ Status _simdjson_handle_simple_json_write_columns(
+ Block& block, const std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row,
+ bool* eof);
+
Status _simdjson_handle_flat_array_complex_json(RuntimeState* state,
Block& block,
const
std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool*
eof);
+ Status _simdjson_handle_flat_array_complex_json_write_columns(
+ Block& block, const std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row,
+ bool* eof);
+
Status _simdjson_handle_nested_complex_json(RuntimeState* state, Block&
block,
const
std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool* eof);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org