This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b19f275714f [improvement](insert) refactor group commit insert into (#25795)
b19f275714f is described below

commit b19f275714f270c598589fbb039e0f8a682b21f0
Author: meiyi <[email protected]>
AuthorDate: Fri Nov 3 12:02:40 2023 +0800

    [improvement](insert) refactor group commit insert into (#25795)
---
 be/src/pipeline/pipeline_task.cpp                  |  31 +++--
 be/src/runtime/group_commit_mgr.cpp                | 142 +--------------------
 be/src/runtime/group_commit_mgr.h                  |  22 ----
 be/src/runtime/plan_fragment_executor.cpp          |  32 +++--
 be/src/service/internal_service.cpp                | 119 ++++++++++-------
 be/src/service/internal_service.h                  |   5 +-
 be/src/vec/sink/group_commit_block_sink.cpp        |   5 +-
 .../apache/doris/analysis/NativeInsertStmt.java    |  66 ++++++----
 .../java/org/apache/doris/qe/StmtExecutor.java     |   4 +-
 gensrc/proto/internal_service.proto                |   8 +-
 .../insert_group_commit_into_duplicate.groovy      |  18 +++
 11 files changed, 181 insertions(+), 271 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 111259062cf..72552b48468 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -256,6 +256,18 @@ Status PipelineTask::execute(bool* eos) {
         }
     }
 
+    auto status = Status::OK();
+    auto handle_group_commit = [&]() {
+        if (UNLIKELY(_fragment_context->is_group_commit() && !status.ok() && 
_block != nullptr)) {
+            auto* future_block = 
dynamic_cast<vectorized::FutureBlock*>(_block.get());
+            std::unique_lock<doris::Mutex> l(*(future_block->lock));
+            if (!future_block->is_handled()) {
+                future_block->set_result(status, 0, 0);
+                future_block->cv->notify_all();
+            }
+        }
+    };
+
     this->set_begin_execute_time();
     while (!_fragment_context->is_canceled()) {
         if (_data_state != SourceState::MORE_DATA && !source_can_read()) {
@@ -279,7 +291,11 @@ Status PipelineTask::execute(bool* eos) {
         {
             SCOPED_TIMER(_get_block_timer);
             _get_block_counter->update(1);
-            RETURN_IF_ERROR(_root->get_block(_state, block, _data_state));
+            status = _root->get_block(_state, block, _data_state);
+            if (UNLIKELY(!status.ok())) {
+                handle_group_commit();
+                return status;
+            }
         }
         *eos = _data_state == SourceState::FINISHED;
 
@@ -289,17 +305,8 @@ Status PipelineTask::execute(bool* eos) {
                 _collect_query_statistics_with_every_batch) {
                 RETURN_IF_ERROR(_collect_query_statistics());
             }
-            auto status = _sink->sink(_state, block, _data_state);
-            if (UNLIKELY(!status.ok() || block->rows() == 0)) {
-                if (_fragment_context->is_group_commit()) {
-                    auto* future_block = 
dynamic_cast<vectorized::FutureBlock*>(block);
-                    std::unique_lock<doris::Mutex> l(*(future_block->lock));
-                    if (!future_block->is_handled()) {
-                        future_block->set_result(status, 0, 0);
-                        future_block->cv->notify_all();
-                    }
-                }
-            }
+            status = _sink->sink(_state, block, _data_state);
+            handle_group_commit();
             if (!status.is<ErrorCode::END_OF_FILE>()) {
                 RETURN_IF_ERROR(status);
             }
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 718d7f19aa4..02e484fd778 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -17,35 +17,18 @@
 
 #include "runtime/group_commit_mgr.h"
 
-#include <gen_cpp/Descriptors_types.h>
-#include <gen_cpp/FrontendService.h>
-#include <gen_cpp/HeartbeatService.h>
-#include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/Types_types.h>
 
-#include <memory>
-#include <numeric>
-
 #include "client_cache.h"
 #include "common/config.h"
-#include "common/object_pool.h"
-#include "exec/data_sink.h"
-#include "io/fs/stream_load_pipe.h"
 #include "olap/wal_manager.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime/runtime_state.h"
-#include "runtime/stream_load/new_load_stream_mgr.h"
-#include "runtime/stream_load/stream_load_context.h"
 #include "util/thrift_rpc_helper.h"
-#include "vec/core/future_block.h"
-#include "vec/exec/scan/new_file_scan_node.h"
-#include "vec/sink/group_commit_block_sink.h"
 
 namespace doris {
 
-class TPlan;
-
 Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> 
block) {
     DCHECK(block->get_schema_version() == schema_version);
     std::unique_lock l(*_mutex);
@@ -203,7 +186,7 @@ Status GroupCommitTable::get_first_block_load_queue(
 Status GroupCommitTable::_create_group_commit_load(
         std::shared_ptr<LoadBlockQueue>& load_block_queue) {
     Status st = Status::OK();
-    std::unique_ptr<int, std::function<void(int*)>> 
remove_pipe_func((int*)0x01, [&](int*) {
+    std::unique_ptr<int, std::function<void(int*)>> 
finish_plan_func((int*)0x01, [&](int*) {
         if (!st.ok()) {
             std::unique_lock l(_lock);
             _need_plan_fragment = false;
@@ -408,10 +391,6 @@ Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id,
 }
 
 GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : _exec_env(exec_env) {
-    static_cast<void>(ThreadPoolBuilder("InsertIntoGroupCommitThreadPool")
-                              
.set_min_threads(config::group_commit_insert_threads)
-                              
.set_max_threads(config::group_commit_insert_threads)
-                              .build(&_insert_into_thread_pool));
     static_cast<void>(ThreadPoolBuilder("GroupCommitThreadPool")
                               .set_min_threads(1)
                               
.set_max_threads(config::group_commit_insert_threads)
@@ -424,129 +403,10 @@ GroupCommitMgr::~GroupCommitMgr() {
 }
 
 void GroupCommitMgr::stop() {
-    _insert_into_thread_pool->shutdown();
     _thread_pool->shutdown();
     LOG(INFO) << "GroupCommitMgr is stopped";
 }
 
-Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan,
-                                           const TDescriptorTable& tdesc_tbl,
-                                           const TScanRangeParams& 
scan_range_params,
-                                           const PGroupCommitInsertRequest* 
request,
-                                           PGroupCommitInsertResponse* 
response) {
-    auto& nodes = plan.nodes;
-    DCHECK(nodes.size() > 0);
-    auto& plan_node = nodes.at(0);
-
-    TUniqueId load_id;
-    load_id.__set_hi(request->load_id().hi());
-    load_id.__set_lo(request->load_id().lo());
-
-    std::vector<std::shared_ptr<doris::vectorized::FutureBlock>> future_blocks;
-    {
-        // 1. Prepare a pipe, then append rows to pipe,
-        // then scan node scans from the pipe, like stream load.
-        std::shared_ptr<LoadBlockQueue> load_block_queue;
-        auto pipe = std::make_shared<io::StreamLoadPipe>(
-                io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 
/* min_chunk_size */,
-                -1 /* total_length */, true /* use_proto */);
-        std::shared_ptr<StreamLoadContext> ctx = 
std::make_shared<StreamLoadContext>(_exec_env);
-        ctx->pipe = pipe;
-        RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(load_id, ctx));
-        std::unique_ptr<int, std::function<void(int*)>> 
remove_pipe_func((int*)0x01, [&](int*) {
-            if (load_block_queue != nullptr) {
-                load_block_queue->remove_load_id(load_id);
-            }
-            _exec_env->new_load_stream_mgr()->remove(load_id);
-        });
-        static_cast<void>(_insert_into_thread_pool->submit_func(
-                std::bind<void>(&GroupCommitMgr::_append_row, this, pipe, 
request)));
-
-        // 2. FileScanNode consumes data from the pipe.
-        std::unique_ptr<RuntimeState> runtime_state = 
RuntimeState::create_unique();
-        TQueryOptions query_options;
-        query_options.query_type = TQueryType::LOAD;
-        TQueryGlobals query_globals;
-        static_cast<void>(runtime_state->init(load_id, query_options, 
query_globals, _exec_env));
-        
runtime_state->set_query_mem_tracker(std::make_shared<MemTrackerLimiter>(
-                MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", 
print_id(load_id)), -1));
-        DescriptorTbl* desc_tbl = nullptr;
-        RETURN_IF_ERROR(DescriptorTbl::create(runtime_state->obj_pool(), 
tdesc_tbl, &desc_tbl));
-        runtime_state->set_desc_tbl(desc_tbl);
-        auto file_scan_node =
-                vectorized::NewFileScanNode(runtime_state->obj_pool(), 
plan_node, *desc_tbl);
-        std::unique_ptr<int, std::function<void(int*)>> 
close_scan_node_func((int*)0x01, [&](int*) {
-            static_cast<void>(file_scan_node.close(runtime_state.get()));
-        });
-        // TFileFormatType::FORMAT_PROTO, TFileType::FILE_STREAM, set 
_range.load_id
-        RETURN_IF_ERROR(file_scan_node.init(plan_node, runtime_state.get()));
-        RETURN_IF_ERROR(file_scan_node.prepare(runtime_state.get()));
-        std::vector<TScanRangeParams> params_vector;
-        params_vector.emplace_back(scan_range_params);
-        file_scan_node.set_scan_ranges(runtime_state.get(), params_vector);
-        RETURN_IF_ERROR(file_scan_node.open(runtime_state.get()));
-
-        // 3. Put the block into block queue.
-        std::unique_ptr<doris::vectorized::Block> _block =
-                doris::vectorized::Block::create_unique();
-        bool eof = false;
-        while (!eof) {
-            // TODO what to do if read one block error
-            RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(), 
_block.get(), &eof));
-            std::shared_ptr<doris::vectorized::FutureBlock> future_block =
-                    std::make_shared<doris::vectorized::FutureBlock>();
-            future_block->swap(*(_block.get()));
-            future_block->set_info(request->base_schema_version(), load_id);
-            if (load_block_queue == nullptr) {
-                RETURN_IF_ERROR(get_first_block_load_queue(request->db_id(), 
table_id, future_block,
-                                                           load_block_queue));
-                response->set_label(load_block_queue->label);
-                response->set_txn_id(load_block_queue->txn_id);
-            }
-            // TODO what to do if add one block error
-            if (future_block->rows() > 0) {
-                future_blocks.emplace_back(future_block);
-            }
-            RETURN_IF_ERROR(load_block_queue->add_block(future_block));
-        }
-        if (!runtime_state->get_error_log_file_path().empty()) {
-            LOG(INFO) << "id=" << print_id(load_id)
-                      << ", url=" << runtime_state->get_error_log_file_path()
-                      << ", load rows=" << runtime_state->num_rows_load_total()
-                      << ", filter rows=" << 
runtime_state->num_rows_load_filtered()
-                      << ", unselect rows=" << 
runtime_state->num_rows_load_unselected()
-                      << ", success rows=" << 
runtime_state->num_rows_load_success();
-        }
-    }
-    int64_t total_rows = 0;
-    int64_t loaded_rows = 0;
-    // 4. wait to wal
-    for (const auto& future_block : future_blocks) {
-        std::unique_lock<doris::Mutex> l(*(future_block->lock));
-        if (!future_block->is_handled()) {
-            future_block->cv->wait(l);
-        }
-        // future_block->get_status()
-        total_rows += future_block->get_total_rows();
-        loaded_rows += future_block->get_loaded_rows();
-    }
-    response->set_loaded_rows(loaded_rows);
-    response->set_filtered_rows(total_rows - loaded_rows);
-    return Status::OK();
-}
-
-Status GroupCommitMgr::_append_row(std::shared_ptr<io::StreamLoadPipe> pipe,
-                                   const PGroupCommitInsertRequest* request) {
-    for (int i = 0; i < request->data().size(); ++i) {
-        std::unique_ptr<PDataRow> row(new PDataRow());
-        row->CopyFrom(request->data(i));
-        // TODO append may error when pipe is cancelled
-        RETURN_IF_ERROR(pipe->append(std::move(row)));
-    }
-    static_cast<void>(pipe->finish());
-    return Status::OK();
-}
-
 Status GroupCommitMgr::get_first_block_load_queue(
         int64_t db_id, int64_t table_id, 
std::shared_ptr<vectorized::FutureBlock> block,
         std::shared_ptr<LoadBlockQueue>& load_block_queue) {
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 9bbbc5da619..4983811233a 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -23,23 +23,15 @@
 #include <memory>
 
 #include "common/status.h"
-#include "io/fs/stream_load_pipe.h"
 #include "util/lock.h"
 #include "util/threadpool.h"
-#include "util/thrift_util.h"
 #include "vec/core/block.h"
 #include "vec/core/future_block.h"
 
 namespace doris {
 class ExecEnv;
-class TPlan;
-class TDescriptorTable;
 class TUniqueId;
-class TExecPlanFragmentParams;
-class ObjectPool;
 class RuntimeState;
-class StreamLoadContext;
-class StreamLoadPipe;
 
 class LoadBlockQueue {
 public:
@@ -131,13 +123,6 @@ public:
 
     void stop();
 
-    // insert into
-    Status group_commit_insert(int64_t table_id, const TPlan& plan,
-                               const TDescriptorTable& desc_tbl,
-                               const TScanRangeParams& scan_range_params,
-                               const PGroupCommitInsertRequest* request,
-                               PGroupCommitInsertResponse* response);
-
     // used when init group_commit_scan_node
     Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
                                 std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
@@ -146,18 +131,11 @@ public:
                                       std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
 
 private:
-    // used by insert into
-    Status _append_row(std::shared_ptr<io::StreamLoadPipe> pipe,
-                       const PGroupCommitInsertRequest* request);
-
     ExecEnv* _exec_env;
 
     doris::Mutex _lock;
     // TODO remove table when unused
     std::unordered_map<int64_t, std::shared_ptr<GroupCommitTable>> _table_map;
-
-    // thread pool to handle insert into: append data to pipe
-    std::unique_ptr<doris::ThreadPool> _insert_into_thread_pool;
     std::unique_ptr<doris::ThreadPool> _thread_pool;
     // memory consumption of all tables' load block queues, used for back 
pressure.
     std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index a58744a5692..9344378e91e 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -332,9 +332,25 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
                               : doris::vectorized::Block::create_unique();
         bool eos = false;
 
+        auto st = Status::OK();
+        auto handle_group_commit = [&]() {
+            if (UNLIKELY(_group_commit && !st.ok() && block != nullptr)) {
+                auto* future_block = 
dynamic_cast<vectorized::FutureBlock*>(block.get());
+                std::unique_lock<doris::Mutex> l(*(future_block->lock));
+                if (!future_block->is_handled()) {
+                    future_block->set_result(st, 0, 0);
+                    future_block->cv->notify_all();
+                }
+            }
+        };
+
         while (!eos) {
             RETURN_IF_CANCELLED(_runtime_state);
-            RETURN_IF_ERROR(get_vectorized_internal(block.get(), &eos));
+            st = get_vectorized_internal(block.get(), &eos);
+            if (UNLIKELY(!st.ok())) {
+                handle_group_commit();
+                return st;
+            }
 
             // Collect this plan and sub plan statistics, and send to parent 
plan.
             if (_collect_query_statistics_with_every_batch) {
@@ -342,23 +358,13 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
             }
 
             if (!eos || block->rows() > 0) {
-                auto st = _sink->send(runtime_state(), block.get());
+                st = _sink->send(runtime_state(), block.get());
                 //TODO: Asynchronisation need refactor this
                 if (st.is<NEED_SEND_AGAIN>()) { // created partition, do it 
again.
                     st = _sink->send(runtime_state(), block.get());
                     DCHECK(!st.is<NEED_SEND_AGAIN>());
                 }
-                if (UNLIKELY(!st.ok() || block->rows() == 0)) {
-                    // Used for group commit insert
-                    if (_group_commit) {
-                        auto* future_block = 
dynamic_cast<vectorized::FutureBlock*>(block.get());
-                        std::unique_lock<doris::Mutex> 
l(*(future_block->lock));
-                        if (!future_block->is_handled()) {
-                            future_block->set_result(st, 0, 0);
-                            future_block->cv->notify_all();
-                        }
-                    }
-                }
+                handle_group_commit();
                 if (st.is<END_OF_FILE>()) {
                     break;
                 }
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index f80f4ddb5e0..cfaef4895b3 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -86,7 +86,6 @@
 #include "runtime/exec_env.h"
 #include "runtime/fold_constant_executor.h"
 #include "runtime/fragment_mgr.h"
-#include "runtime/group_commit_mgr.h"
 #include "runtime/load_channel_mgr.h"
 #include "runtime/load_stream_mgr.h"
 #include "runtime/result_buffer_mgr.h"
@@ -498,9 +497,9 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
     }
 }
 
-Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& 
ser_request,
-                                                      PFragmentRequestVersion 
version,
-                                                      bool compact) {
+Status PInternalServiceImpl::_exec_plan_fragment_impl(
+        const std::string& ser_request, PFragmentRequestVersion version, bool 
compact,
+        const std::function<void(RuntimeState*, Status*)>& cb) {
     // Sometimes the BE do not receive the first heartbeat message and it 
receives request from FE
     // If BE execute this fragment, it will core when it wants to get some 
property from master info.
     if (ExecEnv::GetInstance()->master_info() == nullptr) {
@@ -516,7 +515,11 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_req
             uint32_t len = ser_request.size();
             RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, 
&t_request));
         }
-        return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
+        if (cb) {
+            return _exec_env->fragment_mgr()->exec_plan_fragment(t_request, 
cb);
+        } else {
+            return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
+        }
     } else if (version == PFragmentRequestVersion::VERSION_2) {
         TExecPlanFragmentParamsList t_request;
         {
@@ -526,7 +529,11 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_req
         }
 
         for (const TExecPlanFragmentParams& params : t_request.paramsList) {
-            
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
+            if (cb) {
+                
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params, cb));
+            } else {
+                
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
+            }
         }
         return Status::OK();
     } else if (version == PFragmentRequestVersion::VERSION_3) {
@@ -538,7 +545,11 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_req
         }
 
         for (const TPipelineFragmentParams& params : t_request.params_list) {
-            
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
+            if (cb) {
+                
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params, cb));
+            } else {
+                
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
+            }
         }
         return Status::OK();
     } else {
@@ -1785,53 +1796,65 @@ void PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
                                                const 
PGroupCommitInsertRequest* request,
                                                PGroupCommitInsertResponse* 
response,
                                                google::protobuf::Closure* 
done) {
-    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+    TUniqueId load_id;
+    load_id.__set_hi(request->load_id().hi());
+    load_id.__set_lo(request->load_id().lo());
+    bool ret = _light_work_pool.try_offer([this, request, response, done, 
load_id]() {
         brpc::ClosureGuard closure_guard(done);
-        auto table_id = request->table_id();
-        Status st = Status::OK();
-        TPlan plan;
-        {
-            auto& plan_node = request->plan_node();
-            const uint8_t* buf = (const uint8_t*)plan_node.data();
-            uint32_t len = plan_node.size();
-            st = deserialize_thrift_msg(buf, &len, false, &plan);
-            if (UNLIKELY(!st.ok())) {
-                LOG(WARNING) << "deserialize plan failed, msg=" << st;
-                response->mutable_status()->set_status_code(st.code());
-                response->mutable_status()->set_error_msgs(0, st.to_string());
-                return;
-            }
-        }
-        TDescriptorTable tdesc_tbl;
-        {
-            auto& desc_tbl = request->desc_tbl();
-            const uint8_t* buf = (const uint8_t*)desc_tbl.data();
-            uint32_t len = desc_tbl.size();
-            st = deserialize_thrift_msg(buf, &len, false, &tdesc_tbl);
-            if (UNLIKELY(!st.ok())) {
-                LOG(WARNING) << "deserialize desc tbl failed, msg=" << st;
-                response->mutable_status()->set_status_code(st.code());
-                response->mutable_status()->set_error_msgs(0, st.to_string());
-                return;
+        std::shared_ptr<StreamLoadContext> ctx = 
std::make_shared<StreamLoadContext>(_exec_env);
+        auto pipe = std::make_shared<io::StreamLoadPipe>(
+                io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 
/* min_chunk_size */,
+                -1 /* total_length */, true /* use_proto */);
+        ctx->pipe = pipe;
+        Status st = _exec_env->new_load_stream_mgr()->put(load_id, ctx);
+        if (st.ok()) {
+            doris::Mutex mutex;
+            doris::ConditionVariable cv;
+            bool handled = false;
+            try {
+                st = _exec_plan_fragment_impl(
+                        request->exec_plan_fragment_request().request(),
+                        request->exec_plan_fragment_request().version(),
+                        request->exec_plan_fragment_request().compact(),
+                        [&](RuntimeState* state, Status* status) {
+                            response->set_label(state->import_label());
+                            response->set_txn_id(state->wal_id());
+                            
response->set_loaded_rows(state->num_rows_load_success());
+                            
response->set_filtered_rows(state->num_rows_load_filtered());
+                            st = *status;
+                            std::unique_lock l(mutex);
+                            handled = true;
+                            cv.notify_one();
+                        });
+            } catch (const Exception& e) {
+                st = e.to_status();
+            } catch (...) {
+                st = Status::Error(ErrorCode::INTERNAL_ERROR,
+                                   "_exec_plan_fragment_impl meet unknown 
error");
             }
-        }
-        TScanRangeParams tscan_range_params;
-        {
-            auto& bytes = request->scan_range_params();
-            const uint8_t* buf = (const uint8_t*)bytes.data();
-            uint32_t len = bytes.size();
-            st = deserialize_thrift_msg(buf, &len, false, &tscan_range_params);
-            if (UNLIKELY(!st.ok())) {
-                LOG(WARNING) << "deserialize scan range failed, msg=" << st;
-                response->mutable_status()->set_status_code(st.code());
-                response->mutable_status()->set_error_msgs(0, st.to_string());
-                return;
+            if (!st.ok()) {
+                LOG(WARNING) << "exec plan fragment failed, errmsg=" << st;
+            } else {
+                for (int i = 0; i < request->data().size(); ++i) {
+                    std::unique_ptr<PDataRow> row(new PDataRow());
+                    row->CopyFrom(request->data(i));
+                    st = pipe->append(std::move(row));
+                    if (!st.ok()) {
+                        break;
+                    }
+                }
+                if (st.ok()) {
+                    static_cast<void>(pipe->finish());
+                    std::unique_lock l(mutex);
+                    if (!handled) {
+                        cv.wait(l);
+                    }
+                }
             }
         }
-        st = _exec_env->group_commit_mgr()->group_commit_insert(
-                table_id, plan, tdesc_tbl, tscan_range_params, request, 
response);
         st.to_protobuf(response->mutable_status());
     });
+    _exec_env->new_load_stream_mgr()->remove(load_id);
     if (!ret) {
         offer_failed(response, done, _light_work_pool);
         return;
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index d5712e654cf..3ef14b6c265 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -38,6 +38,7 @@ class ExecEnv;
 class PHandShakeRequest;
 class PHandShakeResponse;
 class LoadStreamMgr;
+class RuntimeState;
 
 class PInternalServiceImpl : public PBackendService {
 public:
@@ -211,7 +212,9 @@ private:
                                         google::protobuf::Closure* done);
 
     Status _exec_plan_fragment_impl(const std::string& s_request, 
PFragmentRequestVersion version,
-                                    bool compact);
+                                    bool compact,
+                                    const std::function<void(RuntimeState*, 
Status*)>& cb =
+                                            std::function<void(RuntimeState*, 
Status*)>());
 
     Status _fold_constant_expr(const std::string& ser_request, 
PConstantExprResult* response);
 
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
index d7df3a2e698..dcef9dccec6 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -94,7 +94,7 @@ Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {
         if (!future_block->is_handled()) {
             future_block->cv->wait(l);
         }
-        // future_block->get_status()
+        RETURN_IF_ERROR(future_block->get_status());
         loaded_rows += future_block->get_loaded_rows();
         total_rows += future_block->get_total_rows();
     }
@@ -159,8 +159,9 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
         state->set_import_label(_load_block_queue->label);
         state->set_wal_id(_load_block_queue->txn_id);
     }
+    RETURN_IF_ERROR(_load_block_queue->add_block(future_block));
     _future_blocks.emplace_back(future_block);
-    return _load_block_queue->add_block(future_block);
+    return Status::OK();
 }
 
 } // namespace vectorized
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 9f371fbd031..68bec33a5e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -53,18 +53,18 @@ import org.apache.doris.planner.GroupCommitOlapTableSink;
 import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.planner.StreamLoadPlanner;
 import org.apache.doris.planner.external.jdbc.JdbcTableSink;
+import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.tablefunction.GroupCommitTableValuedFunction;
 import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TMergeType;
-import org.apache.doris.thrift.TPlan;
-import org.apache.doris.thrift.TPlanFragment;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TScanRangeParams;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
@@ -162,9 +162,7 @@ public class NativeInsertStmt extends InsertStmt {
     private boolean isGroupCommit = false;
     private int baseSchemaVersion = -1;
     private TUniqueId loadId = null;
-    private ByteString planBytes = null;
-    private ByteString tableBytes = null;
-    private ByteString rangeBytes = null;
+    private ByteString execPlanFragmentParamsBytes = null;
     private long tableId = -1;
     // true if be generates an insert from group commit tvf stmt and executes 
to load data
     public boolean isGroupCommitTvf = false;
@@ -1074,10 +1072,35 @@ public class NativeInsertStmt extends InsertStmt {
     }
 
     private void analyzeGroupCommit() {
-        if (ConnectContext.get().getSessionVariable().enableInsertGroupCommit 
&& targetTable instanceof OlapTable
+        if (ConnectContext.get().getSessionVariable().enableInsertGroupCommit
+                && targetTable instanceof OlapTable
                 && !ConnectContext.get().isTxnModel()
                 && getQueryStmt() instanceof SelectStmt
-                && ((SelectStmt) getQueryStmt()).getTableRefs().isEmpty() && 
targetPartitionNames == null) {
+                && ((SelectStmt) getQueryStmt()).getTableRefs().isEmpty() && 
targetPartitionNames == null
+                && (label == null || 
Strings.isNullOrEmpty(label.getLabelName()))
+                && (analyzer == null || analyzer != null && 
!analyzer.isReAnalyze())) {
+            SelectStmt selectStmt = (SelectStmt) queryStmt;
+            if (selectStmt.getValueList() != null) {
+                for (List<Expr> row : selectStmt.getValueList().getRows()) {
+                    for (Expr expr : row) {
+                        if (!expr.isLiteralOrCastExpr()) {
+                            return;
+                        }
+                    }
+                }
+            } else {
+                SelectList selectList = selectStmt.getSelectList();
+                if (selectList != null) {
+                    List<SelectListItem> items = selectList.getItems();
+                    if (items != null) {
+                        for (SelectListItem item : items) {
+                            if (item.getExpr() != null && 
!item.getExpr().isLiteralOrCastExpr()) {
+                                return;
+                            }
+                        }
+                    }
+                }
+            }
             isGroupCommit = true;
         }
     }
@@ -1088,7 +1111,7 @@ public class NativeInsertStmt extends InsertStmt {
 
     public void planForGroupCommit(TUniqueId queryId) throws UserException, TException {
         OlapTable olapTable = (OlapTable) getTargetTable();
-        if (planBytes != null && olapTable.getBaseSchemaVersion() == baseSchemaVersion) {
+        if (execPlanFragmentParamsBytes != null && olapTable.getBaseSchemaVersion() == baseSchemaVersion) {
             return;
         }
         if (!targetColumns.isEmpty()) {
@@ -1110,9 +1133,6 @@ public class NativeInsertStmt extends InsertStmt {
         StreamLoadPlanner planner = new StreamLoadPlanner((Database) getDbObj(), olapTable, streamLoadTask);
         // Will using load id as query id in fragment
         TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
-        DescriptorTable descTable = planner.getDescTable();
-        TPlanFragment fragment = tRequest.getFragment();
-        TPlan plan = fragment.getPlan();
         for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) {
             for (TScanRangeParams scanRangeParams : entry.getValue()) {
                 scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
@@ -1126,32 +1146,26 @@ public class NativeInsertStmt extends InsertStmt {
         Preconditions.checkState(scanRangeParams.size() == 1);
         // save plan message to be reused for prepare stmt
         loadId = queryId;
-        planBytes = ByteString.copyFrom(new TSerializer().serialize(plan));
-        tableBytes = ByteString.copyFrom(new TSerializer().serialize(descTable.toThrift()));
-        rangeBytes = ByteString.copyFrom(new TSerializer().serialize(scanRangeParams.get(0)));
         baseSchemaVersion = olapTable.getBaseSchemaVersion();
+        // see BackendServiceProxy#execPlanFragmentsAsync
+        TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
+        paramsList.addToParamsList(tRequest);
+        execPlanFragmentParamsBytes = ByteString.copyFrom(new TSerializer().serialize(paramsList));
     }
 
-    public TUniqueId getLoadId() {
-        return loadId;
-    }
-
-    public ByteString getPlanBytes() {
-        return planBytes;
+    public InternalService.PExecPlanFragmentRequest getExecPlanFragmentRequest() {
+        return 
InternalService.PExecPlanFragmentRequest.newBuilder().setRequest(execPlanFragmentParamsBytes)
+                
.setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build();
     }
 
-    public ByteString getTableBytes() {
-        return tableBytes;
+    public TUniqueId getLoadId() {
+        return loadId;
     }
 
     public int getBaseSchemaVersion() {
         return baseSchemaVersion;
     }
 
-    public ByteString getRangeBytes() {
-        return rangeBytes;
-    }
-
     public void setIsFromDeleteOrUpdateStmt(boolean isFromDeleteOrUpdateStmt) {
         this.isFromDeleteOrUpdateStmt = isFromDeleteOrUpdateStmt;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 09afe499435..82d6188bfce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1845,10 +1845,8 @@ public class StmtExecutor {
                 PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder()
                         .setDbId(insertStmt.getTargetTable().getDatabase().getId())
                         .setTableId(insertStmt.getTargetTable().getId())
-                        .setDescTbl(nativeInsertStmt.getTableBytes())
                         .setBaseSchemaVersion(nativeInsertStmt.getBaseSchemaVersion())
-                        .setPlanNode(nativeInsertStmt.getPlanBytes())
-                        .setScanRangeParams(nativeInsertStmt.getRangeBytes())
+                        .setExecPlanFragmentRequest(nativeInsertStmt.getExecPlanFragmentRequest())
                         .setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo)
                                 .build()).addAllData(rows)
                         .build();
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 5881e5bf2a4..99f4464fd59 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -720,16 +720,18 @@ message PGroupCommitInsertRequest {
     optional int64 db_id = 1;
     optional int64 table_id = 2;
     //  Descriptors.TDescriptorTable
-    optional bytes desc_tbl = 3;
+    // optional bytes desc_tbl = 3;
     optional int64 base_schema_version = 4;
 
     // TExecPlanFragmentParams -> TPlanFragment -> PlanNodes.TPlan
-    optional bytes plan_node = 5;
+    // optional bytes plan_node = 5;
     // TScanRangeParams
-    optional bytes scan_range_params = 6;
+    // optional bytes scan_range_params = 6;
 
     optional PUniqueId load_id = 7;
     repeated PDataRow data = 8;
+    // TExecPlanFragmentParams
+    optional PExecPlanFragmentRequest exec_plan_fragment_request = 9;
 }
 
 message PGroupCommitInsertResponse {
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy
index c5ee05ac1e4..9fbc6aaf365 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy
@@ -67,6 +67,17 @@ suite("insert_group_commit_into_duplicate") {
         assertTrue(serverInfo.contains("'label':'group_commit_"))
     }
 
+    def none_group_commit_insert = { sql, expected_row_count ->
+        def stmt = prepareStatement """ ${sql}  """
+        def result = stmt.executeUpdate()
+        logger.info("insert result: " + result)
+        def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
+        logger.info("result server info: " + serverInfo)
+        assertEquals(result, expected_row_count)
+        assertTrue(serverInfo.contains("'status':'VISIBLE'"))
+        assertTrue(!serverInfo.contains("'label':'group_commit_"))
+    }
+
     try {
         // create table
         sql """ drop table if exists ${table}; """
@@ -174,6 +185,13 @@ suite("insert_group_commit_into_duplicate") {
 
             getRowCount(20)
             qt_sql """ select name, score from ${table} order by name asc; """
+
+            none_group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100);  """, 1
+            none_group_commit_insert """ insert into ${table}(id, name, score) select 10 + 2, 'h', 100;  """, 1
+            none_group_commit_insert """ insert into ${table} with label test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13, 'h', 100);  """, 1
+            def rowCount = sql "select count(*) from ${table}"
+            logger.info("row count: " + rowCount)
+            assertEquals(rowCount[0][0], 23)
         }
     } finally {
         // try_sql("DROP TABLE ${table}")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to