This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6fe060b79ed [fix](streamload) fix http_stream retry mechanism (#24978)
6fe060b79ed is described below
commit 6fe060b79ed77e8549bd2e32e668b0599483ffaf
Author: zzzzzzzs <[email protected]>
AuthorDate: Sun Oct 8 11:16:21 2023 +0800
[fix](streamload) fix http_stream retry mechanism (#24978)
If a failure occurs, Doris may retry the load. Because ctx->is_read_schema is a
shared flag that was not reset in a timely manner, a retry could observe stale
state and cause exceptions.
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/http/action/http_stream.cpp | 7 ++++---
be/src/io/file_factory.cpp | 6 +++---
be/src/io/file_factory.h | 2 +-
be/src/runtime/stream_load/stream_load_context.h | 1 -
be/src/service/internal_service.cpp | 8 --------
be/src/vec/exec/format/csv/csv_reader.cpp | 7 +++++--
be/src/vec/exec/format/json/new_json_reader.cpp | 12 +++++++-----
be/src/vec/exec/format/json/new_json_reader.h | 2 +-
8 files changed, 21 insertions(+), 24 deletions(-)
diff --git a/be/src/http/action/http_stream.cpp
b/be/src/http/action/http_stream.cpp
index 1b59bf0b6bb..e215543e1ed 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -245,13 +245,13 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
if (ctx->schema_buffer->pos + remove_bytes <
config::stream_tvf_buffer_size) {
ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes);
} else {
- ctx->need_schema = true;
+ LOG(INFO) << "use a portion of data to request fe to obtain
column information";
ctx->is_read_schema = false;
ctx->status = _process_put(req, ctx);
}
}
- if (!st.ok()) {
+ if (!st.ok() && !ctx->status.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ",
" << ctx->brief();
ctx->status = st;
return;
@@ -260,7 +260,8 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
}
// after all the data has been read and it has not reached 1M, it will
execute here
if (ctx->is_read_schema) {
- ctx->need_schema = true;
+ LOG(INFO) << "after all the data has been read and it has not reached
1M, it will execute "
+ << "here";
ctx->is_read_schema = false;
ctx->status = _process_put(req, ctx);
}
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 12037c1b62e..afbf18c189a 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -141,12 +141,13 @@ Status FileFactory::create_file_reader(const
io::FileSystemProperties& system_pr
// file scan node/stream load pipe
Status FileFactory::create_pipe_reader(const TUniqueId& load_id,
io::FileReaderSPtr* file_reader,
- RuntimeState* runtime_state) {
+ RuntimeState* runtime_state, bool
need_schema) {
auto stream_load_ctx =
ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
if (!stream_load_ctx) {
return Status::InternalError("unknown stream load id: {}",
UniqueId(load_id).to_string());
}
- if (stream_load_ctx->need_schema == true) {
+ if (need_schema == true) {
+ // Here, a portion of the data is processed to parse column information
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024
/* min_chunk_size */,
stream_load_ctx->schema_buffer->pos /* total_length */);
@@ -154,7 +155,6 @@ Status FileFactory::create_pipe_reader(const TUniqueId&
load_id, io::FileReaderS
static_cast<void>(pipe->append(stream_load_ctx->schema_buffer));
static_cast<void>(pipe->finish());
*file_reader = std::move(pipe);
- stream_load_ctx->need_schema = false;
} else {
*file_reader = stream_load_ctx->pipe;
}
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 06bf8e0cc4c..7c51118fc5b 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -85,7 +85,7 @@ public:
// Create FileReader for stream load pipe
static Status create_pipe_reader(const TUniqueId& load_id,
io::FileReaderSPtr* file_reader,
- RuntimeState* runtime_state);
+ RuntimeState* runtime_state, bool
need_schema);
static Status create_hdfs_reader(const THdfsParams& hdfs_params, const
io::FileDescription& fd,
const io::FileReaderOptions&
reader_options,
diff --git a/be/src/runtime/stream_load/stream_load_context.h
b/be/src/runtime/stream_load/stream_load_context.h
index ab8cc6be044..ffbed37fe3a 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -165,7 +165,6 @@ public:
int64_t txn_id = default_txn_id;
// http stream
- bool need_schema = false;
bool is_read_schema = true;
std::string txn_operation = "";
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 4e770cd9b8e..02ab1b4450a 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -682,14 +682,6 @@ void
PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
st.to_protobuf(result->mutable_status());
return;
}
- if (params.file_type == TFileType::FILE_STREAM) {
- auto stream_load_ctx =
-
ExecEnv::GetInstance()->new_load_stream_mgr()->get(params.load_id);
- if (!stream_load_ctx) {
- st = Status::InternalError("unknown stream load id: {}",
-
UniqueId(params.load_id).to_string());
- }
- }
result->set_column_nums(col_names.size());
for (size_t idx = 0; idx < col_names.size(); ++idx) {
result->add_column_names(col_names[idx]);
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp
b/be/src/vec/exec/format/csv/csv_reader.cpp
index bdfc685a3a3..2382cb92de4 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -294,7 +294,8 @@ Status CsvReader::init_reader(bool is_load) {
}
if (_params.file_type == TFileType::FILE_STREAM) {
- RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id,
&_file_reader, _state));
+ RETURN_IF_ERROR(
+ FileFactory::create_pipe_reader(_range.load_id, &_file_reader,
_state, false));
} else {
_file_description.mtime = _range.__isset.modification_time ?
_range.modification_time : 0;
io::FileReaderOptions reader_options =
@@ -824,7 +825,9 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool*
is_parse_name) {
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state, _file_description);
if (_params.file_type == TFileType::FILE_STREAM) {
- RETURN_IF_ERROR(FileFactory::create_pipe_reader(_params.load_id,
&_file_reader, _state));
+ // Due to http_stream needs to pre read a portion of the data to parse
column information, so it is set to true here
+ RETURN_IF_ERROR(
+ FileFactory::create_pipe_reader(_params.load_id,
&_file_reader, _state, true));
} else {
RETURN_IF_ERROR(FileFactory::create_file_reader(_system_properties,
_file_description,
reader_options,
&_file_system,
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 6ad72de22ab..86b58fdc0ac 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -170,7 +170,7 @@ Status NewJsonReader::init_reader(
}
RETURN_IF_ERROR(_get_range_params());
- RETURN_IF_ERROR(_open_file_reader());
+ RETURN_IF_ERROR(_open_file_reader(false));
if (_read_json_by_line) {
RETURN_IF_ERROR(_open_line_reader());
}
@@ -237,7 +237,7 @@ Status
NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>*
col_types) {
RETURN_IF_ERROR(_get_range_params());
- RETURN_IF_ERROR(_open_file_reader());
+ RETURN_IF_ERROR(_open_file_reader(true));
if (_read_json_by_line) {
RETURN_IF_ERROR(_open_line_reader());
}
@@ -373,7 +373,7 @@ Status NewJsonReader::_get_range_params() {
return Status::OK();
}
-Status NewJsonReader::_open_file_reader() {
+Status NewJsonReader::_open_file_reader(bool need_schema) {
int64_t start_offset = _range.start_offset;
if (start_offset != 0) {
start_offset -= 1;
@@ -382,7 +382,9 @@ Status NewJsonReader::_open_file_reader() {
_current_offset = start_offset;
if (_params.file_type == TFileType::FILE_STREAM) {
- RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id,
&_file_reader, _state));
+ // Due to http_stream needs to pre read a portion of the data to parse
column information, so it is set to true here
+ RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id,
&_file_reader, _state,
+ need_schema));
} else {
_file_description.mtime = _range.__isset.modification_time ?
_range.modification_time : 0;
io::FileReaderOptions reader_options =
@@ -978,7 +980,7 @@ Status
NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
Status NewJsonReader::_simdjson_init_reader() {
RETURN_IF_ERROR(_get_range_params());
- RETURN_IF_ERROR(_open_file_reader());
+ RETURN_IF_ERROR(_open_file_reader(false));
if (_read_json_by_line) {
RETURN_IF_ERROR(_open_line_reader());
}
diff --git a/be/src/vec/exec/format/json/new_json_reader.h
b/be/src/vec/exec/format/json/new_json_reader.h
index 2f4acf91c66..f40fe4c9f92 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -98,7 +98,7 @@ private:
Status _get_range_params();
void _init_system_properties();
void _init_file_description();
- Status _open_file_reader();
+ Status _open_file_reader(bool need_schema);
Status _open_line_reader();
Status _parse_jsonpath_and_json_root();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]