This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b19f275714f [improvement](insert) refactor group commit insert into (#25795)
b19f275714f is described below
commit b19f275714f270c598589fbb039e0f8a682b21f0
Author: meiyi <[email protected]>
AuthorDate: Fri Nov 3 12:02:40 2023 +0800
[improvement](insert) refactor group commit insert into (#25795)
---
be/src/pipeline/pipeline_task.cpp | 31 +++--
be/src/runtime/group_commit_mgr.cpp | 142 +--------------------
be/src/runtime/group_commit_mgr.h | 22 ----
be/src/runtime/plan_fragment_executor.cpp | 32 +++--
be/src/service/internal_service.cpp | 119 ++++++++++-------
be/src/service/internal_service.h | 5 +-
be/src/vec/sink/group_commit_block_sink.cpp | 5 +-
.../apache/doris/analysis/NativeInsertStmt.java | 66 ++++++----
.../java/org/apache/doris/qe/StmtExecutor.java | 4 +-
gensrc/proto/internal_service.proto | 8 +-
.../insert_group_commit_into_duplicate.groovy | 18 +++
11 files changed, 181 insertions(+), 271 deletions(-)
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 111259062cf..72552b48468 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -256,6 +256,18 @@ Status PipelineTask::execute(bool* eos) {
}
}
+ auto status = Status::OK();
+ auto handle_group_commit = [&]() {
+ if (UNLIKELY(_fragment_context->is_group_commit() && !status.ok() &&
_block != nullptr)) {
+ auto* future_block =
dynamic_cast<vectorized::FutureBlock*>(_block.get());
+ std::unique_lock<doris::Mutex> l(*(future_block->lock));
+ if (!future_block->is_handled()) {
+ future_block->set_result(status, 0, 0);
+ future_block->cv->notify_all();
+ }
+ }
+ };
+
this->set_begin_execute_time();
while (!_fragment_context->is_canceled()) {
if (_data_state != SourceState::MORE_DATA && !source_can_read()) {
@@ -279,7 +291,11 @@ Status PipelineTask::execute(bool* eos) {
{
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
- RETURN_IF_ERROR(_root->get_block(_state, block, _data_state));
+ status = _root->get_block(_state, block, _data_state);
+ if (UNLIKELY(!status.ok())) {
+ handle_group_commit();
+ return status;
+ }
}
*eos = _data_state == SourceState::FINISHED;
@@ -289,17 +305,8 @@ Status PipelineTask::execute(bool* eos) {
_collect_query_statistics_with_every_batch) {
RETURN_IF_ERROR(_collect_query_statistics());
}
- auto status = _sink->sink(_state, block, _data_state);
- if (UNLIKELY(!status.ok() || block->rows() == 0)) {
- if (_fragment_context->is_group_commit()) {
- auto* future_block =
dynamic_cast<vectorized::FutureBlock*>(block);
- std::unique_lock<doris::Mutex> l(*(future_block->lock));
- if (!future_block->is_handled()) {
- future_block->set_result(status, 0, 0);
- future_block->cv->notify_all();
- }
- }
- }
+ status = _sink->sink(_state, block, _data_state);
+ handle_group_commit();
if (!status.is<ErrorCode::END_OF_FILE>()) {
RETURN_IF_ERROR(status);
}
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 718d7f19aa4..02e484fd778 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -17,35 +17,18 @@
#include "runtime/group_commit_mgr.h"
-#include <gen_cpp/Descriptors_types.h>
-#include <gen_cpp/FrontendService.h>
-#include <gen_cpp/HeartbeatService.h>
-#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
-#include <memory>
-#include <numeric>
-
#include "client_cache.h"
#include "common/config.h"
-#include "common/object_pool.h"
-#include "exec/data_sink.h"
-#include "io/fs/stream_load_pipe.h"
#include "olap/wal_manager.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/runtime_state.h"
-#include "runtime/stream_load/new_load_stream_mgr.h"
-#include "runtime/stream_load/stream_load_context.h"
#include "util/thrift_rpc_helper.h"
-#include "vec/core/future_block.h"
-#include "vec/exec/scan/new_file_scan_node.h"
-#include "vec/sink/group_commit_block_sink.h"
namespace doris {
-class TPlan;
-
Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock>
block) {
DCHECK(block->get_schema_version() == schema_version);
std::unique_lock l(*_mutex);
@@ -203,7 +186,7 @@ Status GroupCommitTable::get_first_block_load_queue(
Status GroupCommitTable::_create_group_commit_load(
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
Status st = Status::OK();
- std::unique_ptr<int, std::function<void(int*)>>
remove_pipe_func((int*)0x01, [&](int*) {
+ std::unique_ptr<int, std::function<void(int*)>>
finish_plan_func((int*)0x01, [&](int*) {
if (!st.ok()) {
std::unique_lock l(_lock);
_need_plan_fragment = false;
@@ -408,10 +391,6 @@ Status GroupCommitTable::get_load_block_queue(const
TUniqueId& instance_id,
}
GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : _exec_env(exec_env) {
- static_cast<void>(ThreadPoolBuilder("InsertIntoGroupCommitThreadPool")
-
.set_min_threads(config::group_commit_insert_threads)
-
.set_max_threads(config::group_commit_insert_threads)
- .build(&_insert_into_thread_pool));
static_cast<void>(ThreadPoolBuilder("GroupCommitThreadPool")
.set_min_threads(1)
.set_max_threads(config::group_commit_insert_threads)
@@ -424,129 +403,10 @@ GroupCommitMgr::~GroupCommitMgr() {
}
void GroupCommitMgr::stop() {
- _insert_into_thread_pool->shutdown();
_thread_pool->shutdown();
LOG(INFO) << "GroupCommitMgr is stopped";
}
-Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan,
- const TDescriptorTable& tdesc_tbl,
- const TScanRangeParams&
scan_range_params,
- const PGroupCommitInsertRequest*
request,
- PGroupCommitInsertResponse*
response) {
- auto& nodes = plan.nodes;
- DCHECK(nodes.size() > 0);
- auto& plan_node = nodes.at(0);
-
- TUniqueId load_id;
- load_id.__set_hi(request->load_id().hi());
- load_id.__set_lo(request->load_id().lo());
-
- std::vector<std::shared_ptr<doris::vectorized::FutureBlock>> future_blocks;
- {
- // 1. Prepare a pipe, then append rows to pipe,
- // then scan node scans from the pipe, like stream load.
- std::shared_ptr<LoadBlockQueue> load_block_queue;
- auto pipe = std::make_shared<io::StreamLoadPipe>(
- io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024
/* min_chunk_size */,
- -1 /* total_length */, true /* use_proto */);
- std::shared_ptr<StreamLoadContext> ctx =
std::make_shared<StreamLoadContext>(_exec_env);
- ctx->pipe = pipe;
- RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(load_id, ctx));
- std::unique_ptr<int, std::function<void(int*)>>
remove_pipe_func((int*)0x01, [&](int*) {
- if (load_block_queue != nullptr) {
- load_block_queue->remove_load_id(load_id);
- }
- _exec_env->new_load_stream_mgr()->remove(load_id);
- });
- static_cast<void>(_insert_into_thread_pool->submit_func(
- std::bind<void>(&GroupCommitMgr::_append_row, this, pipe,
request)));
-
- // 2. FileScanNode consumes data from the pipe.
- std::unique_ptr<RuntimeState> runtime_state =
RuntimeState::create_unique();
- TQueryOptions query_options;
- query_options.query_type = TQueryType::LOAD;
- TQueryGlobals query_globals;
- static_cast<void>(runtime_state->init(load_id, query_options,
query_globals, _exec_env));
-
runtime_state->set_query_mem_tracker(std::make_shared<MemTrackerLimiter>(
- MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}",
print_id(load_id)), -1));
- DescriptorTbl* desc_tbl = nullptr;
- RETURN_IF_ERROR(DescriptorTbl::create(runtime_state->obj_pool(),
tdesc_tbl, &desc_tbl));
- runtime_state->set_desc_tbl(desc_tbl);
- auto file_scan_node =
- vectorized::NewFileScanNode(runtime_state->obj_pool(),
plan_node, *desc_tbl);
- std::unique_ptr<int, std::function<void(int*)>>
close_scan_node_func((int*)0x01, [&](int*) {
- static_cast<void>(file_scan_node.close(runtime_state.get()));
- });
- // TFileFormatType::FORMAT_PROTO, TFileType::FILE_STREAM, set
_range.load_id
- RETURN_IF_ERROR(file_scan_node.init(plan_node, runtime_state.get()));
- RETURN_IF_ERROR(file_scan_node.prepare(runtime_state.get()));
- std::vector<TScanRangeParams> params_vector;
- params_vector.emplace_back(scan_range_params);
- file_scan_node.set_scan_ranges(runtime_state.get(), params_vector);
- RETURN_IF_ERROR(file_scan_node.open(runtime_state.get()));
-
- // 3. Put the block into block queue.
- std::unique_ptr<doris::vectorized::Block> _block =
- doris::vectorized::Block::create_unique();
- bool eof = false;
- while (!eof) {
- // TODO what to do if read one block error
- RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(),
_block.get(), &eof));
- std::shared_ptr<doris::vectorized::FutureBlock> future_block =
- std::make_shared<doris::vectorized::FutureBlock>();
- future_block->swap(*(_block.get()));
- future_block->set_info(request->base_schema_version(), load_id);
- if (load_block_queue == nullptr) {
- RETURN_IF_ERROR(get_first_block_load_queue(request->db_id(),
table_id, future_block,
- load_block_queue));
- response->set_label(load_block_queue->label);
- response->set_txn_id(load_block_queue->txn_id);
- }
- // TODO what to do if add one block error
- if (future_block->rows() > 0) {
- future_blocks.emplace_back(future_block);
- }
- RETURN_IF_ERROR(load_block_queue->add_block(future_block));
- }
- if (!runtime_state->get_error_log_file_path().empty()) {
- LOG(INFO) << "id=" << print_id(load_id)
- << ", url=" << runtime_state->get_error_log_file_path()
- << ", load rows=" << runtime_state->num_rows_load_total()
- << ", filter rows=" <<
runtime_state->num_rows_load_filtered()
- << ", unselect rows=" <<
runtime_state->num_rows_load_unselected()
- << ", success rows=" <<
runtime_state->num_rows_load_success();
- }
- }
- int64_t total_rows = 0;
- int64_t loaded_rows = 0;
- // 4. wait to wal
- for (const auto& future_block : future_blocks) {
- std::unique_lock<doris::Mutex> l(*(future_block->lock));
- if (!future_block->is_handled()) {
- future_block->cv->wait(l);
- }
- // future_block->get_status()
- total_rows += future_block->get_total_rows();
- loaded_rows += future_block->get_loaded_rows();
- }
- response->set_loaded_rows(loaded_rows);
- response->set_filtered_rows(total_rows - loaded_rows);
- return Status::OK();
-}
-
-Status GroupCommitMgr::_append_row(std::shared_ptr<io::StreamLoadPipe> pipe,
- const PGroupCommitInsertRequest* request) {
- for (int i = 0; i < request->data().size(); ++i) {
- std::unique_ptr<PDataRow> row(new PDataRow());
- row->CopyFrom(request->data(i));
- // TODO append may error when pipe is cancelled
- RETURN_IF_ERROR(pipe->append(std::move(row)));
- }
- static_cast<void>(pipe->finish());
- return Status::OK();
-}
-
Status GroupCommitMgr::get_first_block_load_queue(
int64_t db_id, int64_t table_id,
std::shared_ptr<vectorized::FutureBlock> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
diff --git a/be/src/runtime/group_commit_mgr.h
b/be/src/runtime/group_commit_mgr.h
index 9bbbc5da619..4983811233a 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -23,23 +23,15 @@
#include <memory>
#include "common/status.h"
-#include "io/fs/stream_load_pipe.h"
#include "util/lock.h"
#include "util/threadpool.h"
-#include "util/thrift_util.h"
#include "vec/core/block.h"
#include "vec/core/future_block.h"
namespace doris {
class ExecEnv;
-class TPlan;
-class TDescriptorTable;
class TUniqueId;
-class TExecPlanFragmentParams;
-class ObjectPool;
class RuntimeState;
-class StreamLoadContext;
-class StreamLoadPipe;
class LoadBlockQueue {
public:
@@ -131,13 +123,6 @@ public:
void stop();
- // insert into
- Status group_commit_insert(int64_t table_id, const TPlan& plan,
- const TDescriptorTable& desc_tbl,
- const TScanRangeParams& scan_range_params,
- const PGroupCommitInsertRequest* request,
- PGroupCommitInsertResponse* response);
-
// used when init group_commit_scan_node
Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>&
load_block_queue);
@@ -146,18 +131,11 @@ public:
std::shared_ptr<LoadBlockQueue>&
load_block_queue);
private:
- // used by insert into
- Status _append_row(std::shared_ptr<io::StreamLoadPipe> pipe,
- const PGroupCommitInsertRequest* request);
-
ExecEnv* _exec_env;
doris::Mutex _lock;
// TODO remove table when unused
std::unordered_map<int64_t, std::shared_ptr<GroupCommitTable>> _table_map;
-
- // thread pool to handle insert into: append data to pipe
- std::unique_ptr<doris::ThreadPool> _insert_into_thread_pool;
std::unique_ptr<doris::ThreadPool> _thread_pool;
// memory consumption of all tables' load block queues, used for back
pressure.
std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index a58744a5692..9344378e91e 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -332,9 +332,25 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
: doris::vectorized::Block::create_unique();
bool eos = false;
+ auto st = Status::OK();
+ auto handle_group_commit = [&]() {
+ if (UNLIKELY(_group_commit && !st.ok() && block != nullptr)) {
+ auto* future_block =
dynamic_cast<vectorized::FutureBlock*>(block.get());
+ std::unique_lock<doris::Mutex> l(*(future_block->lock));
+ if (!future_block->is_handled()) {
+ future_block->set_result(st, 0, 0);
+ future_block->cv->notify_all();
+ }
+ }
+ };
+
while (!eos) {
RETURN_IF_CANCELLED(_runtime_state);
- RETURN_IF_ERROR(get_vectorized_internal(block.get(), &eos));
+ st = get_vectorized_internal(block.get(), &eos);
+ if (UNLIKELY(!st.ok())) {
+ handle_group_commit();
+ return st;
+ }
// Collect this plan and sub plan statistics, and send to parent
plan.
if (_collect_query_statistics_with_every_batch) {
@@ -342,23 +358,13 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
}
if (!eos || block->rows() > 0) {
- auto st = _sink->send(runtime_state(), block.get());
+ st = _sink->send(runtime_state(), block.get());
//TODO: Asynchronisation need refactor this
if (st.is<NEED_SEND_AGAIN>()) { // created partition, do it
again.
st = _sink->send(runtime_state(), block.get());
DCHECK(!st.is<NEED_SEND_AGAIN>());
}
- if (UNLIKELY(!st.ok() || block->rows() == 0)) {
- // Used for group commit insert
- if (_group_commit) {
- auto* future_block =
dynamic_cast<vectorized::FutureBlock*>(block.get());
- std::unique_lock<doris::Mutex>
l(*(future_block->lock));
- if (!future_block->is_handled()) {
- future_block->set_result(st, 0, 0);
- future_block->cv->notify_all();
- }
- }
- }
+ handle_group_commit();
if (st.is<END_OF_FILE>()) {
break;
}
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index f80f4ddb5e0..cfaef4895b3 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -86,7 +86,6 @@
#include "runtime/exec_env.h"
#include "runtime/fold_constant_executor.h"
#include "runtime/fragment_mgr.h"
-#include "runtime/group_commit_mgr.h"
#include "runtime/load_channel_mgr.h"
#include "runtime/load_stream_mgr.h"
#include "runtime/result_buffer_mgr.h"
@@ -498,9 +497,9 @@ void
PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
}
}
-Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string&
ser_request,
- PFragmentRequestVersion
version,
- bool compact) {
+Status PInternalServiceImpl::_exec_plan_fragment_impl(
+ const std::string& ser_request, PFragmentRequestVersion version, bool
compact,
+ const std::function<void(RuntimeState*, Status*)>& cb) {
// Sometimes the BE do not receive the first heartbeat message and it
receives request from FE
// If BE execute this fragment, it will core when it wants to get some
property from master info.
if (ExecEnv::GetInstance()->master_info() == nullptr) {
@@ -516,7 +515,11 @@ Status
PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_req
uint32_t len = ser_request.size();
RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact,
&t_request));
}
- return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
+ if (cb) {
+ return _exec_env->fragment_mgr()->exec_plan_fragment(t_request,
cb);
+ } else {
+ return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
+ }
} else if (version == PFragmentRequestVersion::VERSION_2) {
TExecPlanFragmentParamsList t_request;
{
@@ -526,7 +529,11 @@ Status
PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_req
}
for (const TExecPlanFragmentParams& params : t_request.paramsList) {
-
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
+ if (cb) {
+
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params, cb));
+ } else {
+
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
+ }
}
return Status::OK();
} else if (version == PFragmentRequestVersion::VERSION_3) {
@@ -538,7 +545,11 @@ Status
PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_req
}
for (const TPipelineFragmentParams& params : t_request.params_list) {
-
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
+ if (cb) {
+
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params, cb));
+ } else {
+
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
+ }
}
return Status::OK();
} else {
@@ -1785,53 +1796,65 @@ void
PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
const
PGroupCommitInsertRequest* request,
PGroupCommitInsertResponse*
response,
google::protobuf::Closure*
done) {
- bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+ TUniqueId load_id;
+ load_id.__set_hi(request->load_id().hi());
+ load_id.__set_lo(request->load_id().lo());
+ bool ret = _light_work_pool.try_offer([this, request, response, done,
load_id]() {
brpc::ClosureGuard closure_guard(done);
- auto table_id = request->table_id();
- Status st = Status::OK();
- TPlan plan;
- {
- auto& plan_node = request->plan_node();
- const uint8_t* buf = (const uint8_t*)plan_node.data();
- uint32_t len = plan_node.size();
- st = deserialize_thrift_msg(buf, &len, false, &plan);
- if (UNLIKELY(!st.ok())) {
- LOG(WARNING) << "deserialize plan failed, msg=" << st;
- response->mutable_status()->set_status_code(st.code());
- response->mutable_status()->set_error_msgs(0, st.to_string());
- return;
- }
- }
- TDescriptorTable tdesc_tbl;
- {
- auto& desc_tbl = request->desc_tbl();
- const uint8_t* buf = (const uint8_t*)desc_tbl.data();
- uint32_t len = desc_tbl.size();
- st = deserialize_thrift_msg(buf, &len, false, &tdesc_tbl);
- if (UNLIKELY(!st.ok())) {
- LOG(WARNING) << "deserialize desc tbl failed, msg=" << st;
- response->mutable_status()->set_status_code(st.code());
- response->mutable_status()->set_error_msgs(0, st.to_string());
- return;
+ std::shared_ptr<StreamLoadContext> ctx =
std::make_shared<StreamLoadContext>(_exec_env);
+ auto pipe = std::make_shared<io::StreamLoadPipe>(
+ io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024
/* min_chunk_size */,
+ -1 /* total_length */, true /* use_proto */);
+ ctx->pipe = pipe;
+ Status st = _exec_env->new_load_stream_mgr()->put(load_id, ctx);
+ if (st.ok()) {
+ doris::Mutex mutex;
+ doris::ConditionVariable cv;
+ bool handled = false;
+ try {
+ st = _exec_plan_fragment_impl(
+ request->exec_plan_fragment_request().request(),
+ request->exec_plan_fragment_request().version(),
+ request->exec_plan_fragment_request().compact(),
+ [&](RuntimeState* state, Status* status) {
+ response->set_label(state->import_label());
+ response->set_txn_id(state->wal_id());
+
response->set_loaded_rows(state->num_rows_load_success());
+
response->set_filtered_rows(state->num_rows_load_filtered());
+ st = *status;
+ std::unique_lock l(mutex);
+ handled = true;
+ cv.notify_one();
+ });
+ } catch (const Exception& e) {
+ st = e.to_status();
+ } catch (...) {
+ st = Status::Error(ErrorCode::INTERNAL_ERROR,
+ "_exec_plan_fragment_impl meet unknown
error");
}
- }
- TScanRangeParams tscan_range_params;
- {
- auto& bytes = request->scan_range_params();
- const uint8_t* buf = (const uint8_t*)bytes.data();
- uint32_t len = bytes.size();
- st = deserialize_thrift_msg(buf, &len, false, &tscan_range_params);
- if (UNLIKELY(!st.ok())) {
- LOG(WARNING) << "deserialize scan range failed, msg=" << st;
- response->mutable_status()->set_status_code(st.code());
- response->mutable_status()->set_error_msgs(0, st.to_string());
- return;
+ if (!st.ok()) {
+ LOG(WARNING) << "exec plan fragment failed, errmsg=" << st;
+ } else {
+ for (int i = 0; i < request->data().size(); ++i) {
+ std::unique_ptr<PDataRow> row(new PDataRow());
+ row->CopyFrom(request->data(i));
+ st = pipe->append(std::move(row));
+ if (!st.ok()) {
+ break;
+ }
+ }
+ if (st.ok()) {
+ static_cast<void>(pipe->finish());
+ std::unique_lock l(mutex);
+ if (!handled) {
+ cv.wait(l);
+ }
+ }
}
}
- st = _exec_env->group_commit_mgr()->group_commit_insert(
- table_id, plan, tdesc_tbl, tscan_range_params, request,
response);
st.to_protobuf(response->mutable_status());
});
+ _exec_env->new_load_stream_mgr()->remove(load_id);
if (!ret) {
offer_failed(response, done, _light_work_pool);
return;
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index d5712e654cf..3ef14b6c265 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -38,6 +38,7 @@ class ExecEnv;
class PHandShakeRequest;
class PHandShakeResponse;
class LoadStreamMgr;
+class RuntimeState;
class PInternalServiceImpl : public PBackendService {
public:
@@ -211,7 +212,9 @@ private:
google::protobuf::Closure* done);
Status _exec_plan_fragment_impl(const std::string& s_request,
PFragmentRequestVersion version,
- bool compact);
+ bool compact,
+ const std::function<void(RuntimeState*,
Status*)>& cb =
+ std::function<void(RuntimeState*,
Status*)>());
Status _fold_constant_expr(const std::string& ser_request,
PConstantExprResult* response);
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp
b/be/src/vec/sink/group_commit_block_sink.cpp
index d7df3a2e698..dcef9dccec6 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -94,7 +94,7 @@ Status GroupCommitBlockSink::close(RuntimeState* state,
Status close_status) {
if (!future_block->is_handled()) {
future_block->cv->wait(l);
}
- // future_block->get_status()
+ RETURN_IF_ERROR(future_block->get_status());
loaded_rows += future_block->get_loaded_rows();
total_rows += future_block->get_total_rows();
}
@@ -159,8 +159,9 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
state->set_import_label(_load_block_queue->label);
state->set_wal_id(_load_block_queue->txn_id);
}
+ RETURN_IF_ERROR(_load_block_queue->add_block(future_block));
_future_blocks.emplace_back(future_block);
- return _load_block_queue->add_block(future_block);
+ return Status::OK();
}
} // namespace vectorized
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 9f371fbd031..68bec33a5e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -53,18 +53,18 @@ import org.apache.doris.planner.GroupCommitOlapTableSink;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.planner.external.jdbc.JdbcTableSink;
+import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.tablefunction.GroupCommitTableValuedFunction;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TExecPlanFragmentParamsList;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TMergeType;
-import org.apache.doris.thrift.TPlan;
-import org.apache.doris.thrift.TPlanFragment;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TStreamLoadPutRequest;
@@ -162,9 +162,7 @@ public class NativeInsertStmt extends InsertStmt {
private boolean isGroupCommit = false;
private int baseSchemaVersion = -1;
private TUniqueId loadId = null;
- private ByteString planBytes = null;
- private ByteString tableBytes = null;
- private ByteString rangeBytes = null;
+ private ByteString execPlanFragmentParamsBytes = null;
private long tableId = -1;
// true if be generates an insert from group commit tvf stmt and executes
to load data
public boolean isGroupCommitTvf = false;
@@ -1074,10 +1072,35 @@ public class NativeInsertStmt extends InsertStmt {
}
private void analyzeGroupCommit() {
- if (ConnectContext.get().getSessionVariable().enableInsertGroupCommit
&& targetTable instanceof OlapTable
+ if (ConnectContext.get().getSessionVariable().enableInsertGroupCommit
+ && targetTable instanceof OlapTable
&& !ConnectContext.get().isTxnModel()
&& getQueryStmt() instanceof SelectStmt
- && ((SelectStmt) getQueryStmt()).getTableRefs().isEmpty() &&
targetPartitionNames == null) {
+ && ((SelectStmt) getQueryStmt()).getTableRefs().isEmpty() &&
targetPartitionNames == null
+ && (label == null ||
Strings.isNullOrEmpty(label.getLabelName()))
+ && (analyzer == null || analyzer != null &&
!analyzer.isReAnalyze())) {
+ SelectStmt selectStmt = (SelectStmt) queryStmt;
+ if (selectStmt.getValueList() != null) {
+ for (List<Expr> row : selectStmt.getValueList().getRows()) {
+ for (Expr expr : row) {
+ if (!expr.isLiteralOrCastExpr()) {
+ return;
+ }
+ }
+ }
+ } else {
+ SelectList selectList = selectStmt.getSelectList();
+ if (selectList != null) {
+ List<SelectListItem> items = selectList.getItems();
+ if (items != null) {
+ for (SelectListItem item : items) {
+ if (item.getExpr() != null &&
!item.getExpr().isLiteralOrCastExpr()) {
+ return;
+ }
+ }
+ }
+ }
+ }
isGroupCommit = true;
}
}
@@ -1088,7 +1111,7 @@ public class NativeInsertStmt extends InsertStmt {
public void planForGroupCommit(TUniqueId queryId) throws UserException,
TException {
OlapTable olapTable = (OlapTable) getTargetTable();
- if (planBytes != null && olapTable.getBaseSchemaVersion() ==
baseSchemaVersion) {
+ if (execPlanFragmentParamsBytes != null &&
olapTable.getBaseSchemaVersion() == baseSchemaVersion) {
return;
}
if (!targetColumns.isEmpty()) {
@@ -1110,9 +1133,6 @@ public class NativeInsertStmt extends InsertStmt {
StreamLoadPlanner planner = new StreamLoadPlanner((Database)
getDbObj(), olapTable, streamLoadTask);
// Will using load id as query id in fragment
TExecPlanFragmentParams tRequest =
planner.plan(streamLoadTask.getId());
- DescriptorTable descTable = planner.getDescTable();
- TPlanFragment fragment = tRequest.getFragment();
- TPlan plan = fragment.getPlan();
for (Map.Entry<Integer, List<TScanRangeParams>> entry :
tRequest.params.per_node_scan_ranges.entrySet()) {
for (TScanRangeParams scanRangeParams : entry.getValue()) {
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
@@ -1126,32 +1146,26 @@ public class NativeInsertStmt extends InsertStmt {
Preconditions.checkState(scanRangeParams.size() == 1);
// save plan message to be reused for prepare stmt
loadId = queryId;
- planBytes = ByteString.copyFrom(new TSerializer().serialize(plan));
- tableBytes = ByteString.copyFrom(new
TSerializer().serialize(descTable.toThrift()));
- rangeBytes = ByteString.copyFrom(new
TSerializer().serialize(scanRangeParams.get(0)));
baseSchemaVersion = olapTable.getBaseSchemaVersion();
+ // see BackendServiceProxy#execPlanFragmentsAsync
+ TExecPlanFragmentParamsList paramsList = new
TExecPlanFragmentParamsList();
+ paramsList.addToParamsList(tRequest);
+ execPlanFragmentParamsBytes = ByteString.copyFrom(new
TSerializer().serialize(paramsList));
}
- public TUniqueId getLoadId() {
- return loadId;
- }
-
- public ByteString getPlanBytes() {
- return planBytes;
+ public InternalService.PExecPlanFragmentRequest
getExecPlanFragmentRequest() {
+ return
InternalService.PExecPlanFragmentRequest.newBuilder().setRequest(execPlanFragmentParamsBytes)
+
.setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build();
}
- public ByteString getTableBytes() {
- return tableBytes;
+ public TUniqueId getLoadId() {
+ return loadId;
}
public int getBaseSchemaVersion() {
return baseSchemaVersion;
}
- public ByteString getRangeBytes() {
- return rangeBytes;
- }
-
public void setIsFromDeleteOrUpdateStmt(boolean isFromDeleteOrUpdateStmt) {
this.isFromDeleteOrUpdateStmt = isFromDeleteOrUpdateStmt;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 09afe499435..82d6188bfce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1845,10 +1845,8 @@ public class StmtExecutor {
PGroupCommitInsertRequest request =
PGroupCommitInsertRequest.newBuilder()
.setDbId(insertStmt.getTargetTable().getDatabase().getId())
.setTableId(insertStmt.getTargetTable().getId())
- .setDescTbl(nativeInsertStmt.getTableBytes())
.setBaseSchemaVersion(nativeInsertStmt.getBaseSchemaVersion())
- .setPlanNode(nativeInsertStmt.getPlanBytes())
- .setScanRangeParams(nativeInsertStmt.getRangeBytes())
+
.setExecPlanFragmentRequest(nativeInsertStmt.getExecPlanFragmentRequest())
.setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo)
.build()).addAllData(rows)
.build();
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 5881e5bf2a4..99f4464fd59 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -720,16 +720,18 @@ message PGroupCommitInsertRequest {
optional int64 db_id = 1;
optional int64 table_id = 2;
// Descriptors.TDescriptorTable
- optional bytes desc_tbl = 3;
+ // optional bytes desc_tbl = 3;
optional int64 base_schema_version = 4;
// TExecPlanFragmentParams -> TPlanFragment -> PlanNodes.TPlan
- optional bytes plan_node = 5;
+ // optional bytes plan_node = 5;
// TScanRangeParams
- optional bytes scan_range_params = 6;
+ // optional bytes scan_range_params = 6;
optional PUniqueId load_id = 7;
repeated PDataRow data = 8;
+ // TExecPlanFragmentParams
+ optional PExecPlanFragmentRequest exec_plan_fragment_request = 9;
}
message PGroupCommitInsertResponse {
diff --git
a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy
index c5ee05ac1e4..9fbc6aaf365 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy
@@ -67,6 +67,17 @@ suite("insert_group_commit_into_duplicate") {
assertTrue(serverInfo.contains("'label':'group_commit_"))
}
+ def none_group_commit_insert = { sql, expected_row_count ->
+ def stmt = prepareStatement """ ${sql} """
+ def result = stmt.executeUpdate()
+ logger.info("insert result: " + result)
+ def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
+ logger.info("result server info: " + serverInfo)
+ assertEquals(result, expected_row_count)
+ assertTrue(serverInfo.contains("'status':'VISIBLE'"))
+ assertTrue(!serverInfo.contains("'label':'group_commit_"))
+ }
+
try {
// create table
sql """ drop table if exists ${table}; """
@@ -174,6 +185,13 @@ suite("insert_group_commit_into_duplicate") {
getRowCount(20)
qt_sql """ select name, score from ${table} order by name asc; """
+
+ none_group_commit_insert """ insert into ${table}(id, name, score)
values(10 + 1, 'h', 100); """, 1
+ none_group_commit_insert """ insert into ${table}(id, name, score)
select 10 + 2, 'h', 100; """, 1
+ none_group_commit_insert """ insert into ${table} with label
test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13,
'h', 100); """, 1
+ def rowCount = sql "select count(*) from ${table}"
+ logger.info("row count: " + rowCount)
+ assertEquals(rowCount[0][0], 23)
}
} finally {
// try_sql("DROP TABLE ${table}")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]