This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fe060b79ed [fix](streamload) fix http_stream retry mechanism (#24978)
6fe060b79ed is described below

commit 6fe060b79ed77e8549bd2e32e668b0599483ffaf
Author: zzzzzzzs <[email protected]>
AuthorDate: Sun Oct 8 11:16:21 2023 +0800

    [fix](streamload) fix http_stream retry mechanism (#24978)
    
    
    If a failure occurs, Doris may retry. Because ctx->is_read_schema is a
    global variable that is not reset in a timely manner, the retry may
    cause exceptions.
    
    
    ---------
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/http/action/http_stream.cpp               |  7 ++++---
 be/src/io/file_factory.cpp                       |  6 +++---
 be/src/io/file_factory.h                         |  2 +-
 be/src/runtime/stream_load/stream_load_context.h |  1 -
 be/src/service/internal_service.cpp              |  8 --------
 be/src/vec/exec/format/csv/csv_reader.cpp        |  7 +++++--
 be/src/vec/exec/format/json/new_json_reader.cpp  | 12 +++++++-----
 be/src/vec/exec/format/json/new_json_reader.h    |  2 +-
 8 files changed, 21 insertions(+), 24 deletions(-)

diff --git a/be/src/http/action/http_stream.cpp 
b/be/src/http/action/http_stream.cpp
index 1b59bf0b6bb..e215543e1ed 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -245,13 +245,13 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
             if (ctx->schema_buffer->pos + remove_bytes < 
config::stream_tvf_buffer_size) {
                 ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes);
             } else {
-                ctx->need_schema = true;
+                LOG(INFO) << "use a portion of data to request fe to obtain 
column information";
                 ctx->is_read_schema = false;
                 ctx->status = _process_put(req, ctx);
             }
         }
 
-        if (!st.ok()) {
+        if (!st.ok() && !ctx->status.ok()) {
             LOG(WARNING) << "append body content failed. errmsg=" << st << ", 
" << ctx->brief();
             ctx->status = st;
             return;
@@ -260,7 +260,8 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
     }
     // after all the data has been read and it has not reached 1M, it will 
execute here
     if (ctx->is_read_schema) {
-        ctx->need_schema = true;
+        LOG(INFO) << "after all the data has been read and it has not reached 
1M, it will execute "
+                  << "here";
         ctx->is_read_schema = false;
         ctx->status = _process_put(req, ctx);
     }
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 12037c1b62e..afbf18c189a 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -141,12 +141,13 @@ Status FileFactory::create_file_reader(const 
io::FileSystemProperties& system_pr
 
 // file scan node/stream load pipe
 Status FileFactory::create_pipe_reader(const TUniqueId& load_id, 
io::FileReaderSPtr* file_reader,
-                                       RuntimeState* runtime_state) {
+                                       RuntimeState* runtime_state, bool 
need_schema) {
     auto stream_load_ctx = 
ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
     if (!stream_load_ctx) {
         return Status::InternalError("unknown stream load id: {}", 
UniqueId(load_id).to_string());
     }
-    if (stream_load_ctx->need_schema == true) {
+    if (need_schema == true) {
+        // Here, a portion of the data is processed to parse column information
         auto pipe = std::make_shared<io::StreamLoadPipe>(
                 io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 
/* min_chunk_size */,
                 stream_load_ctx->schema_buffer->pos /* total_length */);
@@ -154,7 +155,6 @@ Status FileFactory::create_pipe_reader(const TUniqueId& 
load_id, io::FileReaderS
         static_cast<void>(pipe->append(stream_load_ctx->schema_buffer));
         static_cast<void>(pipe->finish());
         *file_reader = std::move(pipe);
-        stream_load_ctx->need_schema = false;
     } else {
         *file_reader = stream_load_ctx->pipe;
     }
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 06bf8e0cc4c..7c51118fc5b 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -85,7 +85,7 @@ public:
 
     // Create FileReader for stream load pipe
     static Status create_pipe_reader(const TUniqueId& load_id, 
io::FileReaderSPtr* file_reader,
-                                     RuntimeState* runtime_state);
+                                     RuntimeState* runtime_state, bool 
need_schema);
 
     static Status create_hdfs_reader(const THdfsParams& hdfs_params, const 
io::FileDescription& fd,
                                      const io::FileReaderOptions& 
reader_options,
diff --git a/be/src/runtime/stream_load/stream_load_context.h 
b/be/src/runtime/stream_load/stream_load_context.h
index ab8cc6be044..ffbed37fe3a 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -165,7 +165,6 @@ public:
     int64_t txn_id = default_txn_id;
 
     // http stream
-    bool need_schema = false;
     bool is_read_schema = true;
 
     std::string txn_operation = "";
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 4e770cd9b8e..02ab1b4450a 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -682,14 +682,6 @@ void 
PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
             st.to_protobuf(result->mutable_status());
             return;
         }
-        if (params.file_type == TFileType::FILE_STREAM) {
-            auto stream_load_ctx =
-                    
ExecEnv::GetInstance()->new_load_stream_mgr()->get(params.load_id);
-            if (!stream_load_ctx) {
-                st = Status::InternalError("unknown stream load id: {}",
-                                           
UniqueId(params.load_id).to_string());
-            }
-        }
         result->set_column_nums(col_names.size());
         for (size_t idx = 0; idx < col_names.size(); ++idx) {
             result->add_column_names(col_names[idx]);
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index bdfc685a3a3..2382cb92de4 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -294,7 +294,8 @@ Status CsvReader::init_reader(bool is_load) {
     }
 
     if (_params.file_type == TFileType::FILE_STREAM) {
-        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, 
&_file_reader, _state));
+        RETURN_IF_ERROR(
+                FileFactory::create_pipe_reader(_range.load_id, &_file_reader, 
_state, false));
     } else {
         _file_description.mtime = _range.__isset.modification_time ? 
_range.modification_time : 0;
         io::FileReaderOptions reader_options =
@@ -824,7 +825,9 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* 
is_parse_name) {
     io::FileReaderOptions reader_options =
             FileFactory::get_reader_options(_state, _file_description);
     if (_params.file_type == TFileType::FILE_STREAM) {
-        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_params.load_id, 
&_file_reader, _state));
+        // Due to http_stream needs to pre read a portion of the data to parse 
column information, so it is set to true here
+        RETURN_IF_ERROR(
+                FileFactory::create_pipe_reader(_params.load_id, 
&_file_reader, _state, true));
     } else {
         RETURN_IF_ERROR(FileFactory::create_file_reader(_system_properties, 
_file_description,
                                                         reader_options, 
&_file_system,
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 6ad72de22ab..86b58fdc0ac 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -170,7 +170,7 @@ Status NewJsonReader::init_reader(
     }
     RETURN_IF_ERROR(_get_range_params());
 
-    RETURN_IF_ERROR(_open_file_reader());
+    RETURN_IF_ERROR(_open_file_reader(false));
     if (_read_json_by_line) {
         RETURN_IF_ERROR(_open_line_reader());
     }
@@ -237,7 +237,7 @@ Status 
NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
                                         std::vector<TypeDescriptor>* 
col_types) {
     RETURN_IF_ERROR(_get_range_params());
 
-    RETURN_IF_ERROR(_open_file_reader());
+    RETURN_IF_ERROR(_open_file_reader(true));
     if (_read_json_by_line) {
         RETURN_IF_ERROR(_open_line_reader());
     }
@@ -373,7 +373,7 @@ Status NewJsonReader::_get_range_params() {
     return Status::OK();
 }
 
-Status NewJsonReader::_open_file_reader() {
+Status NewJsonReader::_open_file_reader(bool need_schema) {
     int64_t start_offset = _range.start_offset;
     if (start_offset != 0) {
         start_offset -= 1;
@@ -382,7 +382,9 @@ Status NewJsonReader::_open_file_reader() {
     _current_offset = start_offset;
 
     if (_params.file_type == TFileType::FILE_STREAM) {
-        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, 
&_file_reader, _state));
+        // Due to http_stream needs to pre read a portion of the data to parse 
column information, so it is set to true here
+        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, 
&_file_reader, _state,
+                                                        need_schema));
     } else {
         _file_description.mtime = _range.__isset.modification_time ? 
_range.modification_time : 0;
         io::FileReaderOptions reader_options =
@@ -978,7 +980,7 @@ Status 
NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
 Status NewJsonReader::_simdjson_init_reader() {
     RETURN_IF_ERROR(_get_range_params());
 
-    RETURN_IF_ERROR(_open_file_reader());
+    RETURN_IF_ERROR(_open_file_reader(false));
     if (_read_json_by_line) {
         RETURN_IF_ERROR(_open_line_reader());
     }
diff --git a/be/src/vec/exec/format/json/new_json_reader.h 
b/be/src/vec/exec/format/json/new_json_reader.h
index 2f4acf91c66..f40fe4c9f92 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -98,7 +98,7 @@ private:
     Status _get_range_params();
     void _init_system_properties();
     void _init_file_description();
-    Status _open_file_reader();
+    Status _open_file_reader(bool need_schema);
     Status _open_line_reader();
     Status _parse_jsonpath_and_json_root();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to