This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d0cd535cb9a [improvement](insert) refactor group commit stream load
(#25560)
d0cd535cb9a is described below
commit d0cd535cb9a615d837d41dc6c09d02ba4db1565e
Author: meiyi <[email protected]>
AuthorDate: Fri Oct 20 13:27:30 2023 +0800
[improvement](insert) refactor group commit stream load (#25560)
---
be/src/common/config.cpp | 2 +-
be/src/common/config.h | 2 +-
be/src/exec/data_sink.cpp | 15 ++
be/src/http/action/http_stream.cpp | 6 -
be/src/http/action/stream_load.cpp | 7 -
.../exec/group_commit_block_sink_operator.h | 50 ++++
be/src/pipeline/pipeline_fragment_context.cpp | 6 +
be/src/runtime/group_commit_mgr.cpp | 297 +++++++--------------
be/src/runtime/group_commit_mgr.h | 26 +-
.../runtime/stream_load/stream_load_executor.cpp | 12 +-
be/src/vec/core/future_block.cpp | 6 +-
be/src/vec/core/future_block.h | 7 +-
be/src/vec/sink/group_commit_block_sink.cpp | 70 ++++-
be/src/vec/sink/group_commit_block_sink.h | 18 +-
.../apache/doris/analysis/NativeInsertStmt.java | 20 +-
.../apache/doris/planner/GroupCommitBlockSink.java | 34 +--
.../org/apache/doris/planner/OlapTableSink.java | 1 +
.../apache/doris/planner/StreamLoadPlanner.java | 23 +-
.../apache/doris/service/FrontendServiceImpl.java | 21 +-
.../GroupCommitTableValuedFunction.java | 4 +-
.../java/org/apache/doris/task/StreamLoadTask.java | 11 +
gensrc/thrift/DataSinks.thrift | 2 +
.../insert_group_commit_into_duplicate.out | 3 +
.../insert_group_commit_into_duplicate.groovy | 78 +++++-
.../test_group_commit_http_stream.groovy | 78 +++---
.../test_group_commit_stream_load.groovy | 75 +++---
26 files changed, 495 insertions(+), 379 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1634ea575f7..58121768723 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1099,7 +1099,7 @@ DEFINE_Int32(group_commit_sync_wal_batch, "10");
// the count of thread to group commit insert
DEFINE_Int32(group_commit_insert_threads, "10");
-DEFINE_mInt32(group_commit_interval_seconds, "10");
+DEFINE_mInt32(group_commit_interval_ms, "10000");
DEFINE_mInt32(scan_thread_nice_value, "0");
DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index e17e4b5f670..f8350d87316 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1162,7 +1162,7 @@ DECLARE_Int32(group_commit_sync_wal_batch);
// This config can be set to limit thread number in group commit insert thread
pool.
DECLARE_mInt32(group_commit_insert_threads);
-DECLARE_mInt32(group_commit_interval_seconds);
+DECLARE_mInt32(group_commit_interval_ms);
// The configuration item is used to lower the priority of the scanner thread,
// typically employed to ensure CPU scheduling for write operations.
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index e160e9ebeec..1b9653e8b8a 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -31,6 +31,7 @@
#include "common/config.h"
#include "vec/sink/async_writer_sink.h"
+#include "vec/sink/group_commit_block_sink.h"
#include "vec/sink/multi_cast_data_stream_sink.h"
#include "vec/sink/vdata_stream_sender.h"
#include "vec/sink/vmemory_scratch_sink.h"
@@ -163,6 +164,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
RETURN_IF_ERROR(status);
break;
}
+ case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
+ Status status = Status::OK();
+ DCHECK(thrift_sink.__isset.olap_table_sink);
+ sink->reset(new vectorized::GroupCommitBlockSink(pool, row_desc,
output_exprs, &status));
+ RETURN_IF_ERROR(status);
+ break;
+ }
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
return Status::NotSupported("MULTI_CAST_DATA_STREAM_SINK only support
in pipeline engine");
}
@@ -319,6 +327,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
RETURN_IF_ERROR(status);
break;
}
+ case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
+ Status status = Status::OK();
+ DCHECK(thrift_sink.__isset.olap_table_sink);
+ sink->reset(new vectorized::GroupCommitBlockSink(pool, row_desc,
output_exprs, &status));
+ RETURN_IF_ERROR(status);
+ break;
+ }
default: {
std::stringstream error_msg;
std::map<int, const char*>::const_iterator i =
diff --git a/be/src/http/action/http_stream.cpp
b/be/src/http/action/http_stream.cpp
index 2a2ddd8ad44..067f8c5d28d 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -324,12 +324,6 @@ Status HttpStreamAction::_process_put(HttpRequest*
http_req,
ctx->label = ctx->put_result.params.import_label;
ctx->put_result.params.__set_wal_id(ctx->wal_id);
- if (ctx->group_commit) {
- ctx->db_id = ctx->put_result.db_id;
- ctx->table_id = ctx->put_result.table_id;
- ctx->schema_version = ctx->put_result.base_schema_version;
- return _exec_env->group_commit_mgr()->group_commit_stream_load(ctx);
- }
return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
}
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index be4d8703236..55541843241 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -601,13 +601,6 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req,
return Status::OK();
}
- if (ctx->group_commit) {
- ctx->db_id = ctx->put_result.db_id;
- ctx->table_id = ctx->put_result.table_id;
- ctx->schema_version = ctx->put_result.base_schema_version;
- return _exec_env->group_commit_mgr()->group_commit_stream_load(ctx);
- }
-
return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
}
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h
b/be/src/pipeline/exec/group_commit_block_sink_operator.h
new file mode 100644
index 00000000000..0cf36818db1
--- /dev/null
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "operator.h"
+#include "vec/sink/group_commit_block_sink.h"
+
+namespace doris {
+
+namespace pipeline {
+
+class GroupCommitBlockSinkOperatorBuilder final
+ : public DataSinkOperatorBuilder<vectorized::GroupCommitBlockSink> {
+public:
+ GroupCommitBlockSinkOperatorBuilder(int32_t id, DataSink* sink)
+ : DataSinkOperatorBuilder(id, "GroupCommitBlockSinkOperator",
sink) {}
+
+ OperatorPtr build_operator() override;
+};
+
+class GroupCommitBlockSinkOperator final
+ : public DataSinkOperator<GroupCommitBlockSinkOperatorBuilder> {
+public:
+ GroupCommitBlockSinkOperator(OperatorBuilderBase* operator_builder,
DataSink* sink)
+ : DataSinkOperator(operator_builder, sink) {}
+
+ bool can_write() override { return true; } // TODO: need use mem_limit
+};
+
+OperatorPtr GroupCommitBlockSinkOperatorBuilder::build_operator() {
+ return std::make_shared<GroupCommitBlockSinkOperator>(this, _sink);
+}
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 22a533c233e..6ba3ba941aa 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -57,6 +57,7 @@
#include "pipeline/exec/empty_source_operator.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/exec/exchange_source_operator.h"
+#include "pipeline/exec/group_commit_block_sink_operator.h"
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/multi_cast_data_stream_sink.h"
@@ -770,6 +771,11 @@ Status PipelineFragmentContext::_create_sink(int
sender_id, const TDataSink& thr
}
break;
}
+ case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
+ sink_ =
std::make_shared<GroupCommitBlockSinkOperatorBuilder>(next_operator_builder_id(),
+
_sink.get());
+ break;
+ }
case TDataSinkType::MYSQL_TABLE_SINK:
case TDataSinkType::JDBC_TABLE_SINK:
case TDataSinkType::ODBC_TABLE_SINK: {
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 9c08ccaf8d0..3c919158874 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -48,11 +48,6 @@ Status
LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> block)
if (block->rows() > 0) {
_block_queue.push_back(block);
}
- if (block->is_eos()) {
- _load_ids.erase(block->get_load_id());
- } else if (block->is_first()) {
- _load_ids.emplace(block->get_load_id());
- }
_cv->notify_one();
return Status::OK();
}
@@ -62,31 +57,31 @@ Status LoadBlockQueue::get_block(vectorized::Block* block,
bool* find_block, boo
*eos = false;
std::unique_lock l(*_mutex);
if (!need_commit) {
- auto left_seconds = config::group_commit_interval_seconds -
- std::chrono::duration_cast<std::chrono::seconds>(
- std::chrono::steady_clock::now() -
_start_time)
- .count();
- if (left_seconds <= 0) {
+ auto left_milliseconds = config::group_commit_interval_ms -
+
std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now() -
_start_time)
+ .count();
+ if (left_milliseconds <= 0) {
need_commit = true;
}
}
while (_status.ok() && _block_queue.empty() &&
(!need_commit || (need_commit && !_load_ids.empty()))) {
- auto left_seconds = config::group_commit_interval_seconds;
+ auto left_milliseconds = config::group_commit_interval_ms;
if (!need_commit) {
- left_seconds = config::group_commit_interval_seconds -
- std::chrono::duration_cast<std::chrono::seconds>(
- std::chrono::steady_clock::now() -
_start_time)
- .count();
- if (left_seconds <= 0) {
+ left_milliseconds = config::group_commit_interval_ms -
+
std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now() -
_start_time)
+ .count();
+ if (left_milliseconds <= 0) {
need_commit = true;
break;
}
}
#if !defined(USE_BTHREAD_SCANNER)
- _cv->wait_for(l, std::chrono::seconds(left_seconds));
+ _cv->wait_for(l, std::chrono::milliseconds(left_milliseconds));
#else
- _cv->wait_for(l, left_seconds * 1000000);
+ _cv->wait_for(l, left_milliseconds * 1000);
#endif
}
if (!_block_queue.empty()) {
@@ -96,12 +91,10 @@ Status LoadBlockQueue::get_block(vectorized::Block* block,
bool* find_block, boo
*find_block = true;
_block_queue.pop_front();
}
- if (_block_queue.empty()) {
- if (need_commit && _load_ids.empty()) {
- *eos = true;
- } else {
- *eos = false;
- }
+ if (_block_queue.empty() && need_commit && _load_ids.empty()) {
+ *eos = true;
+ } else {
+ *eos = false;
}
return Status::OK();
}
@@ -114,6 +107,16 @@ void LoadBlockQueue::remove_load_id(const UniqueId&
load_id) {
}
}
+Status LoadBlockQueue::add_load_id(const UniqueId& load_id) {
+ std::unique_lock l(*_mutex);
+ if (need_commit) {
+ return Status::InternalError("block queue is set need commit, id=" +
+ load_instance_id.to_string());
+ }
+ _load_ids.emplace(load_id);
+ return Status::OK();
+}
+
void LoadBlockQueue::cancel(const Status& st) {
DCHECK(!st.ok());
std::unique_lock l(*_mutex);
@@ -133,59 +136,62 @@ Status GroupCommitTable::get_first_block_load_queue(
int64_t table_id, std::shared_ptr<vectorized::FutureBlock> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
DCHECK(table_id == _table_id);
- DCHECK(block->is_first() == true);
+ auto base_schema_version = block->get_schema_version();
{
std::unique_lock l(_lock);
- for (auto it = _load_block_queues.begin(); it !=
_load_block_queues.end(); ++it) {
- // TODO if block schema version is less than fragment schema
version, return error
- if (!it->second->need_commit &&
- it->second->schema_version == block->get_schema_version()) {
- if (block->get_schema_version() == it->second->schema_version)
{
- load_block_queue = it->second;
- break;
- } else if (block->get_schema_version() <
it->second->schema_version) {
- return Status::DataQualityError("schema version not
match");
- }
- }
- }
- }
- if (load_block_queue == nullptr) {
- Status st = Status::OK();
- for (int i = 0; i < 3; ++i) {
- std::unique_lock l(_request_fragment_mutex);
- // check if there is a re-usefully fragment
- {
- std::unique_lock l1(_lock);
- for (auto it = _load_block_queues.begin(); it !=
_load_block_queues.end(); ++it) {
- // TODO if block schema version is less than fragment
schema version, return error
- if (!it->second->need_commit) {
- if (block->get_schema_version() ==
it->second->schema_version) {
+ for (int i = 0; i < 3; i++) {
+ bool is_schema_version_match = true;
+ for (auto it = _load_block_queues.begin(); it !=
_load_block_queues.end(); ++it) {
+ if (!it->second->need_commit) {
+ if (base_schema_version == it->second->schema_version) {
+ if
(it->second->add_load_id(block->get_load_id()).ok()) {
load_block_queue = it->second;
- break;
- } else if (block->get_schema_version() <
it->second->schema_version) {
- return Status::DataQualityError("schema version
not match");
+ return Status::OK();
}
+ } else if (base_schema_version <
it->second->schema_version) {
+ is_schema_version_match = false;
}
}
}
- if (load_block_queue == nullptr) {
- st = _create_group_commit_load(table_id, load_block_queue);
- if (LIKELY(st.ok())) {
- break;
+ if (!is_schema_version_match) {
+ return Status::DataQualityError("schema version not match");
+ }
+ if (!_need_plan_fragment) {
+ _need_plan_fragment = true;
+ RETURN_IF_ERROR(_thread_pool->submit_func([&] {
+ [[maybe_unused]] auto st =
_create_group_commit_load(load_block_queue);
+ }));
+ }
+#if !defined(USE_BTHREAD_SCANNER)
+ _cv.wait_for(l, std::chrono::seconds(4));
+#else
+ _cv.wait_for(l, 4 * 1000000);
+#endif
+ if (load_block_queue != nullptr) {
+ if (load_block_queue->schema_version == base_schema_version) {
+ if
(load_block_queue->add_load_id(block->get_load_id()).ok()) {
+ return Status::OK();
+ }
+ } else if (base_schema_version <
load_block_queue->schema_version) {
+ return Status::DataQualityError("schema version not
match");
}
+ load_block_queue.reset();
}
}
- RETURN_IF_ERROR(st);
- if (load_block_queue->schema_version != block->get_schema_version()) {
- // TODO check this is the first block
- return Status::DataQualityError("schema version not match");
- }
}
- return Status::OK();
+ return Status::InternalError("can not get a block queue");
}
Status GroupCommitTable::_create_group_commit_load(
- int64_t table_id, std::shared_ptr<LoadBlockQueue>& load_block_queue) {
+ std::shared_ptr<LoadBlockQueue>& load_block_queue) {
+ Status st = Status::OK();
+ std::unique_ptr<int, std::function<void(int*)>>
remove_pipe_func((int*)0x01, [&](int*) {
+ if (!st.ok()) {
+ std::unique_lock l(_lock);
+ _need_plan_fragment = false;
+ _cv.notify_all();
+ }
+ });
TStreamLoadPutRequest request;
UniqueId load_id = UniqueId::gen_uid();
TUniqueId tload_id;
@@ -194,8 +200,8 @@ Status GroupCommitTable::_create_group_commit_load(
std::regex reg("-");
std::string label = "group_commit_" +
std::regex_replace(load_id.to_string(), reg, "_");
std::stringstream ss;
- ss << "insert into table_id(" << table_id << ") WITH LABEL " << label
- << " select * from group_commit(\"table_id\"=\"" << table_id << "\")";
+ ss << "insert into table_id(" << _table_id << ") WITH LABEL " << label
+ << " select * from group_commit(\"table_id\"=\"" << _table_id << "\")";
request.__set_load_sql(ss.str());
request.__set_loadId(tload_id);
request.__set_label(label);
@@ -209,13 +215,14 @@ Status GroupCommitTable::_create_group_commit_load(
}
TStreamLoadPutResult result;
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
- RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+ st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&result, &request](FrontendServiceConnection& client) {
client->streamLoadPut(result, request);
},
- 10000L));
- Status st = Status::create(result.status);
+ 10000L);
+ RETURN_IF_ERROR(st);
+ st = Status::create(result.status);
if (!st.ok()) {
LOG(WARNING) << "create group commit load error, st=" <<
st.to_string();
}
@@ -236,7 +243,7 @@ Status GroupCommitTable::_create_group_commit_load(
DCHECK(pipeline_params.local_params.size() == 1);
instance_id = pipeline_params.local_params[0].fragment_instance_id;
}
- VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" <<
table_id
+ VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" <<
_table_id
<< ", schema version=" << schema_version << ", label=" << label
<< ", txn_id=" << txn_id << ", instance_id=" <<
print_id(instance_id)
<< ", is_pipeline=" << is_pipeline;
@@ -245,11 +252,13 @@ Status GroupCommitTable::_create_group_commit_load(
std::make_shared<LoadBlockQueue>(instance_id, label, txn_id,
schema_version);
std::unique_lock l(_lock);
_load_block_queues.emplace(instance_id, load_block_queue);
+ _need_plan_fragment = false;
+ _cv.notify_all();
}
- params.__set_import_label(label);
- st = _exec_plan_fragment(_db_id, table_id, label, txn_id, is_pipeline,
params, pipeline_params);
+ st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline,
params,
+ pipeline_params);
if (!st.ok()) {
- static_cast<void>(_finish_group_commit_load(_db_id, table_id, label,
txn_id, instance_id,
+ static_cast<void>(_finish_group_commit_load(_db_id, _table_id, label,
txn_id, instance_id,
st, true, nullptr));
}
return st;
@@ -346,6 +355,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t
db_id, int64_t table_
if (state && !(state->get_error_log_file_path().empty())) {
ss << ", error_url=" << state->get_error_log_file_path();
}
+ ss << ", rows=" << state->num_rows_load_success();
LOG(INFO) << ss.str();
return st;
}
@@ -384,6 +394,10 @@ GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) :
_exec_env(exec_env) {
.set_min_threads(config::group_commit_insert_threads)
.set_max_threads(config::group_commit_insert_threads)
.build(&_insert_into_thread_pool));
+ static_cast<void>(ThreadPoolBuilder("GroupCommitThreadPool")
+ .set_min_threads(1)
+
.set_max_threads(config::group_commit_insert_threads)
+ .build(&_thread_pool));
}
GroupCommitMgr::~GroupCommitMgr() {
@@ -392,6 +406,7 @@ GroupCommitMgr::~GroupCommitMgr() {
void GroupCommitMgr::stop() {
_insert_into_thread_pool->shutdown();
+ _thread_pool->shutdown();
LOG(INFO) << "GroupCommitMgr is stopped";
}
@@ -456,17 +471,16 @@ Status GroupCommitMgr::group_commit_insert(int64_t
table_id, const TPlan& plan,
std::unique_ptr<doris::vectorized::Block> _block =
doris::vectorized::Block::create_unique();
bool eof = false;
- bool first = true;
while (!eof) {
// TODO what to do if read one block error
RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(),
_block.get(), &eof));
std::shared_ptr<doris::vectorized::FutureBlock> future_block =
std::make_shared<doris::vectorized::FutureBlock>();
future_block->swap(*(_block.get()));
- future_block->set_info(request->base_schema_version(), load_id,
first, eof);
+ future_block->set_info(request->base_schema_version(), load_id);
if (load_block_queue == nullptr) {
- RETURN_IF_ERROR(_get_first_block_load_queue(request->db_id(),
table_id,
- future_block,
load_block_queue));
+ RETURN_IF_ERROR(get_first_block_load_queue(request->db_id(),
table_id, future_block,
+ load_block_queue));
response->set_label(load_block_queue->label);
response->set_txn_id(load_block_queue->txn_id);
}
@@ -475,7 +489,6 @@ Status GroupCommitMgr::group_commit_insert(int64_t
table_id, const TPlan& plan,
future_blocks.emplace_back(future_block);
}
RETURN_IF_ERROR(load_block_queue->add_block(future_block));
- first = false;
}
if (!runtime_state->get_error_log_file_path().empty()) {
LOG(INFO) << "id=" << print_id(load_id)
@@ -515,139 +528,15 @@ Status
GroupCommitMgr::_append_row(std::shared_ptr<io::StreamLoadPipe> pipe,
return Status::OK();
}
-Status
GroupCommitMgr::group_commit_stream_load(std::shared_ptr<StreamLoadContext>
ctx) {
- return _insert_into_thread_pool->submit_func([ctx, this] {
- Status st = _group_commit_stream_load(ctx);
- if (!st.ok()) {
- ctx->promise.set_value(st);
- }
- });
-}
-
-Status
GroupCommitMgr::_group_commit_stream_load(std::shared_ptr<StreamLoadContext>
ctx) {
- auto& fragment_params = ctx->put_result.params;
- auto& tdesc_tbl = fragment_params.desc_tbl;
- DCHECK(fragment_params.params.per_node_scan_ranges.size() == 1);
- DCHECK(fragment_params.params.per_node_scan_ranges.begin()->second.size()
== 1);
- auto& tscan_range_params =
fragment_params.params.per_node_scan_ranges.begin()->second.at(0);
- auto& nodes = fragment_params.fragment.plan.nodes;
- DCHECK(nodes.size() > 0);
- auto& plan_node = nodes.at(0);
-
- std::vector<std::shared_ptr<doris::vectorized::FutureBlock>> future_blocks;
- {
- std::shared_ptr<LoadBlockQueue> load_block_queue;
- // 1. FileScanNode consumes data from the pipe.
- std::unique_ptr<RuntimeState> runtime_state =
RuntimeState::create_unique();
- TUniqueId load_id;
- load_id.hi = ctx->id.hi;
- load_id.lo = ctx->id.lo;
- TQueryOptions query_options;
- query_options.query_type = TQueryType::LOAD;
- TQueryGlobals query_globals;
- static_cast<void>(runtime_state->init(load_id, query_options,
query_globals, _exec_env));
-
runtime_state->set_query_mem_tracker(std::make_shared<MemTrackerLimiter>(
- MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}",
ctx->id.to_string()), -1));
- DescriptorTbl* desc_tbl = nullptr;
- RETURN_IF_ERROR(DescriptorTbl::create(runtime_state->obj_pool(),
tdesc_tbl, &desc_tbl));
- runtime_state->set_desc_tbl(desc_tbl);
- auto file_scan_node =
- vectorized::NewFileScanNode(runtime_state->obj_pool(),
plan_node, *desc_tbl);
- Status status = Status::OK();
- auto sink = stream_load::GroupCommitBlockSink(
- runtime_state->obj_pool(), file_scan_node.row_desc(),
- fragment_params.fragment.output_exprs, &status);
- std::unique_ptr<int, std::function<void(int*)>>
close_scan_node_func((int*)0x01, [&](int*) {
- if (load_block_queue != nullptr) {
- load_block_queue->remove_load_id(load_id);
- }
- static_cast<void>(file_scan_node.close(runtime_state.get()));
- static_cast<void>(sink.close(runtime_state.get(), status));
- });
- RETURN_IF_ERROR(file_scan_node.init(plan_node, runtime_state.get()));
- RETURN_IF_ERROR(file_scan_node.prepare(runtime_state.get()));
- std::vector<TScanRangeParams> params_vector;
- params_vector.emplace_back(tscan_range_params);
- file_scan_node.set_scan_ranges(params_vector);
- RETURN_IF_ERROR(file_scan_node.open(runtime_state.get()));
-
- RETURN_IF_ERROR(status);
- RETURN_IF_ERROR(sink.init(fragment_params.fragment.output_sink));
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(sink.prepare(runtime_state.get()));
- RETURN_IF_ERROR(sink.open(runtime_state.get()));
-
- // 2. Put the block into block queue.
- std::unique_ptr<doris::vectorized::Block> _block =
- doris::vectorized::Block::create_unique();
- bool first = true;
- bool eof = false;
- while (!eof) {
- // TODO what to do if scan one block error
- RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(),
_block.get(), &eof));
- RETURN_IF_ERROR(sink.send(runtime_state.get(), _block.get()));
- std::shared_ptr<doris::vectorized::FutureBlock> future_block =
- std::make_shared<doris::vectorized::FutureBlock>();
- future_block->swap(*(_block.get()));
- future_block->set_info(ctx->schema_version, load_id, first, eof);
- // TODO what to do if add one block error
- if (load_block_queue == nullptr) {
- RETURN_IF_ERROR(_get_first_block_load_queue(ctx->db_id,
ctx->table_id, future_block,
- load_block_queue));
- ctx->label = load_block_queue->label;
- ctx->txn_id = load_block_queue->txn_id;
- }
- if (future_block->rows() > 0) {
- future_blocks.emplace_back(future_block);
- }
- RETURN_IF_ERROR(load_block_queue->add_block(future_block));
- first = false;
- }
- ctx->number_unselected_rows =
runtime_state->num_rows_load_unselected();
- ctx->number_filtered_rows = runtime_state->num_rows_load_filtered();
- ctx->error_url = runtime_state->get_error_log_file_path();
- if (!runtime_state->get_error_log_file_path().empty()) {
- LOG(INFO) << "id=" << print_id(load_id)
- << ", url=" << runtime_state->get_error_log_file_path()
- << ", load rows=" << runtime_state->num_rows_load_total()
- << ", filter rows=" <<
runtime_state->num_rows_load_filtered()
- << ", unselect rows=" <<
runtime_state->num_rows_load_unselected()
- << ", success rows=" <<
runtime_state->num_rows_load_success();
- }
- }
-
- int64_t total_rows = 0;
- int64_t loaded_rows = 0;
- // 3. wait to wal
- for (const auto& future_block : future_blocks) {
- std::unique_lock<doris::Mutex> l(*(future_block->lock));
- if (!future_block->is_handled()) {
- future_block->cv->wait(l);
- }
- // future_block->get_status()
- total_rows += future_block->get_total_rows();
- loaded_rows += future_block->get_loaded_rows();
- }
- ctx->number_total_rows = total_rows + ctx->number_unselected_rows +
ctx->number_filtered_rows;
- ctx->number_loaded_rows = loaded_rows;
- ctx->number_filtered_rows += total_rows - ctx->number_loaded_rows;
- ctx->promise.set_value(Status::OK());
- VLOG_DEBUG << "finish read all block of pipe=" << ctx->id.to_string()
- << ", total rows=" << ctx->number_total_rows
- << ", loaded rows=" << ctx->number_loaded_rows
- << ", filtered rows=" << ctx->number_filtered_rows
- << ", unselected rows=" << ctx->number_unselected_rows;
- return Status::OK();
-}
-
-Status GroupCommitMgr::_get_first_block_load_queue(
+Status GroupCommitMgr::get_first_block_load_queue(
int64_t db_id, int64_t table_id,
std::shared_ptr<vectorized::FutureBlock> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard wlock(_lock);
if (_table_map.find(table_id) == _table_map.end()) {
- _table_map.emplace(table_id,
- std::make_shared<GroupCommitTable>(_exec_env,
db_id, table_id));
+ _table_map.emplace(table_id, std::make_shared<GroupCommitTable>(
+ _exec_env,
_thread_pool.get(), db_id, table_id));
}
group_commit_table = _table_map[table_id];
}
diff --git a/be/src/runtime/group_commit_mgr.h
b/be/src/runtime/group_commit_mgr.h
index 01a0905c404..aa8d05534ca 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -53,6 +53,7 @@ public:
Status add_block(std::shared_ptr<vectorized::FutureBlock> block);
Status get_block(vectorized::Block* block, bool* find_block, bool* eos);
+ Status add_load_id(const UniqueId& load_id);
void remove_load_id(const UniqueId& load_id);
void cancel(const Status& st);
@@ -76,8 +77,9 @@ private:
class GroupCommitTable {
public:
- GroupCommitTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id)
- : _exec_env(exec_env), _db_id(db_id), _table_id(table_id) {};
+ GroupCommitTable(ExecEnv* exec_env, doris::ThreadPool* thread_pool,
int64_t db_id,
+ int64_t table_id)
+ : _exec_env(exec_env), _thread_pool(thread_pool), _db_id(db_id),
_table_id(table_id) {};
Status get_first_block_load_queue(int64_t table_id,
std::shared_ptr<vectorized::FutureBlock>
block,
std::shared_ptr<LoadBlockQueue>&
load_block_queue);
@@ -85,8 +87,7 @@ public:
std::shared_ptr<LoadBlockQueue>&
load_block_queue);
private:
- Status _create_group_commit_load(int64_t table_id,
- std::shared_ptr<LoadBlockQueue>&
load_block_queue);
+ Status _create_group_commit_load(std::shared_ptr<LoadBlockQueue>&
load_block_queue);
Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const
std::string& label,
int64_t txn_id, bool is_pipeline,
const TExecPlanFragmentParams& params,
@@ -96,13 +97,14 @@ private:
bool prepare_failed, RuntimeState* state);
ExecEnv* _exec_env;
+ ThreadPool* _thread_pool;
int64_t _db_id;
int64_t _table_id;
doris::Mutex _lock;
+ doris::ConditionVariable _cv;
// fragment_instance_id to load_block_queue
std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>>
_load_block_queues;
-
- doris::Mutex _request_fragment_mutex;
+ bool _need_plan_fragment = false;
};
class GroupCommitMgr {
@@ -119,22 +121,17 @@ public:
const PGroupCommitInsertRequest* request,
PGroupCommitInsertResponse* response);
- // stream load
- Status group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx);
-
// used when init group_commit_scan_node
Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>&
load_block_queue);
+ Status get_first_block_load_queue(int64_t db_id, int64_t table_id,
+ std::shared_ptr<vectorized::FutureBlock>
block,
+ std::shared_ptr<LoadBlockQueue>&
load_block_queue);
private:
// used by insert into
Status _append_row(std::shared_ptr<io::StreamLoadPipe> pipe,
const PGroupCommitInsertRequest* request);
- // used by stream load
- Status _group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx);
- Status _get_first_block_load_queue(int64_t db_id, int64_t table_id,
-
std::shared_ptr<vectorized::FutureBlock> block,
- std::shared_ptr<LoadBlockQueue>&
load_block_queue);
ExecEnv* _exec_env;
@@ -144,6 +141,7 @@ private:
// thread pool to handle insert into: append data to pipe
std::unique_ptr<doris::ThreadPool> _insert_into_thread_pool;
+ std::unique_ptr<doris::ThreadPool> _thread_pool;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp
b/be/src/runtime/stream_load/stream_load_executor.cpp
index b85d72b3b2a..32e4d76dc7c 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -74,6 +74,10 @@ Status
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
if (ctx->put_result.__isset.params) {
st = _exec_env->fragment_mgr()->exec_plan_fragment(
ctx->put_result.params, [ctx, this](RuntimeState* state,
Status* status) {
+ if (ctx->group_commit) {
+ ctx->label = state->import_label();
+ ctx->txn_id = state->wal_id();
+ }
ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
ctx->commit_infos =
std::move(state->tablet_commit_infos());
if (status->ok()) {
@@ -84,7 +88,7 @@ Status
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
int64_t num_selected_rows =
ctx->number_total_rows -
ctx->number_unselected_rows;
- if (num_selected_rows > 0 &&
+ if (!ctx->group_commit && num_selected_rows > 0 &&
(double)ctx->number_filtered_rows /
num_selected_rows >
ctx->max_filter_ratio) {
// NOTE: Do not modify the error message here, for
historical reasons,
@@ -147,6 +151,10 @@ Status
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
} else {
st = _exec_env->fragment_mgr()->exec_plan_fragment(
ctx->put_result.pipeline_params, [ctx, this](RuntimeState*
state, Status* status) {
+ if (ctx->group_commit) {
+ ctx->label = state->import_label();
+ ctx->txn_id = state->wal_id();
+ }
ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
ctx->commit_infos =
std::move(state->tablet_commit_infos());
if (status->ok()) {
@@ -157,7 +165,7 @@ Status
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
int64_t num_selected_rows =
ctx->number_total_rows -
ctx->number_unselected_rows;
- if (num_selected_rows > 0 &&
+ if (!ctx->group_commit && num_selected_rows > 0 &&
(double)ctx->number_filtered_rows /
num_selected_rows >
ctx->max_filter_ratio) {
// NOTE: Do not modify the error message here, for
historical reasons,
diff --git a/be/src/vec/core/future_block.cpp b/be/src/vec/core/future_block.cpp
index 3f3f59c3446..19cb09163a4 100644
--- a/be/src/vec/core/future_block.cpp
+++ b/be/src/vec/core/future_block.cpp
@@ -21,11 +21,9 @@
namespace doris::vectorized {
-void FutureBlock::set_info(int64_t schema_version, const TUniqueId& load_id,
bool first, bool eos) {
+void FutureBlock::set_info(int64_t schema_version, const TUniqueId& load_id) {
this->_schema_version = schema_version;
this->_load_id = load_id;
- this->_first = first;
- this->_eos = eos;
}
void FutureBlock::set_result(Status status, int64_t total_rows, int64_t
loaded_rows) {
@@ -35,7 +33,7 @@ void FutureBlock::set_result(Status status, int64_t
total_rows, int64_t loaded_r
void FutureBlock::swap_future_block(std::shared_ptr<FutureBlock> other) {
Block::swap(*other.get());
- set_info(other->_schema_version, other->_load_id, other->_first,
other->_eos);
+ set_info(other->_schema_version, other->_load_id);
lock = other->lock;
cv = other->cv;
_result = other->_result;
diff --git a/be/src/vec/core/future_block.h b/be/src/vec/core/future_block.h
index ca63fa3799e..ee943b3f79c 100644
--- a/be/src/vec/core/future_block.h
+++ b/be/src/vec/core/future_block.h
@@ -30,12 +30,9 @@ class FutureBlock : public Block {
public:
FutureBlock() : Block() {};
void swap_future_block(std::shared_ptr<FutureBlock> other);
- void set_info(int64_t block_schema_version, const TUniqueId& load_id, bool
first,
- bool block_eos);
+ void set_info(int64_t block_schema_version, const TUniqueId& load_id);
int64_t get_schema_version() { return _schema_version; }
TUniqueId get_load_id() { return _load_id; }
- bool is_first() { return _first; }
- bool is_eos() { return _eos; }
// hold lock before call this function
void set_result(Status status, int64_t total_rows = 0, int64_t loaded_rows
= 0);
@@ -50,8 +47,6 @@ public:
private:
int64_t _schema_version;
TUniqueId _load_id;
- bool _first = false;
- bool _eos = false;
std::shared_ptr<std::tuple<bool, Status, int64_t, int64_t>> _result =
std::make_shared<std::tuple<bool, Status, int64_t,
int64_t>>(false, Status::OK(), 0, 0);
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp
b/be/src/vec/sink/group_commit_block_sink.cpp
index 08f6d87ade7..d7df3a2e698 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -26,7 +26,7 @@
namespace doris {
-namespace stream_load {
+namespace vectorized {
GroupCommitBlockSink::GroupCommitBlockSink(ObjectPool* pool, const
RowDescriptor& row_desc,
const std::vector<TExpr>& texprs,
Status* status)
@@ -44,6 +44,9 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
_tuple_desc_id = table_sink.tuple_id;
_schema.reset(new OlapTableSchemaParam());
RETURN_IF_ERROR(_schema->init(table_sink.schema));
+ _db_id = table_sink.db_id;
+ _table_id = table_sink.table_id;
+ _base_schema_version = table_sink.base_schema_version;
return Status::OK();
}
@@ -77,6 +80,31 @@ Status GroupCommitBlockSink::open(RuntimeState* state) {
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}
+Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {
+ if (_load_block_queue) {
+ _load_block_queue->remove_load_id(_load_id);
+ }
+ RETURN_IF_ERROR(DataSink::close(state, close_status));
+ RETURN_IF_ERROR(close_status);
+ // wait to wal
+ int64_t total_rows = 0;
+ int64_t loaded_rows = 0;
+ for (const auto& future_block : _future_blocks) {
+ std::unique_lock<doris::Mutex> l(*(future_block->lock));
+ if (!future_block->is_handled()) {
+ future_block->cv->wait(l);
+ }
+ // future_block->get_status()
+ loaded_rows += future_block->get_loaded_rows();
+ total_rows += future_block->get_total_rows();
+ }
+ state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows()
+ total_rows -
+ loaded_rows);
+ state->set_num_rows_load_total(loaded_rows +
state->num_rows_load_unselected() +
+ state->num_rows_load_filtered());
+ return Status::OK();
+}
+
Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block*
input_block, bool eos) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Status status = Status::OK();
@@ -97,9 +125,43 @@ Status GroupCommitBlockSink::send(RuntimeState* state,
vectorized::Block* input_
bool has_filtered_rows = false;
RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
state, input_block, block, _output_vexpr_ctxs, rows,
has_filtered_rows));
- block->swap(*input_block);
- return Status::OK();
+ // add block into block queue
+ return _add_block(state, block);
+}
+
+Status GroupCommitBlockSink::_add_block(RuntimeState* state,
+ std::shared_ptr<vectorized::Block>
block) {
+ if (block->rows() == 0) {
+ return Status::OK();
+ }
+ // add block to queue
+ auto _cur_mutable_block =
vectorized::MutableBlock::create_unique(block->clone_empty());
+ {
+ vectorized::IColumn::Selector selector;
+ for (auto i = 0; i < block->rows(); i++) {
+ selector.emplace_back(i);
+ }
+ block->append_to_block_by_selector(_cur_mutable_block.get(), selector);
+ }
+ std::shared_ptr<vectorized::Block> output_block =
+
std::make_shared<vectorized::Block>(_cur_mutable_block->to_block());
+
+ std::shared_ptr<doris::vectorized::FutureBlock> future_block =
+ std::make_shared<doris::vectorized::FutureBlock>();
+ future_block->swap(*(output_block.get()));
+ TUniqueId load_id;
+ load_id.__set_hi(load_id.hi);
+ load_id.__set_lo(load_id.lo);
+ future_block->set_info(_base_schema_version, load_id);
+ if (_load_block_queue == nullptr) {
+
RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
+ _db_id, _table_id, future_block, _load_block_queue));
+ state->set_import_label(_load_block_queue->label);
+ state->set_wal_id(_load_block_queue->txn_id);
+ }
+ _future_blocks.emplace_back(future_block);
+ return _load_block_queue->add_block(future_block);
}
-} // namespace stream_load
+} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/sink/group_commit_block_sink.h
b/be/src/vec/sink/group_commit_block_sink.h
index a309413f5ad..ff798ffb000 100644
--- a/be/src/vec/sink/group_commit_block_sink.h
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -24,8 +24,11 @@ namespace doris {
class OlapTableSchemaParam;
class MemTracker;
+class LoadBlockQueue;
-namespace stream_load {
+namespace vectorized {
+
+class FutureBlock;
class GroupCommitBlockSink : public DataSink {
public:
@@ -42,7 +45,11 @@ public:
Status send(RuntimeState* state, vectorized::Block* block, bool eos =
false) override;
+ Status close(RuntimeState* state, Status close_status) override;
+
private:
+ Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block>
block);
+
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
int _tuple_desc_id = -1;
@@ -53,7 +60,14 @@ private:
// this is tuple descriptor of destination OLAP table
TupleDescriptor* _output_tuple_desc = nullptr;
std::unique_ptr<vectorized::OlapTableBlockConvertor> _block_convertor;
+
+ int64_t _db_id;
+ int64_t _table_id;
+ int64_t _base_schema_version = 0;
+ UniqueId _load_id;
+ std::shared_ptr<LoadBlockQueue> _load_block_queue;
+ std::vector<std::shared_ptr<vectorized::FutureBlock>> _future_blocks;
};
-} // namespace stream_load
+} // namespace vectorized
} // namespace doris
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 637462931b6..dad7dc46015 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -48,6 +48,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.ExportSink;
+import org.apache.doris.planner.GroupCommitBlockSink;
import org.apache.doris.planner.GroupCommitOlapTableSink;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.StreamLoadPlanner;
@@ -167,6 +168,7 @@ public class NativeInsertStmt extends InsertStmt {
private long tableId = -1;
// true if be generates an insert from group commit tvf stmt and executes
to load data
public boolean isGroupCommitTvf = false;
+ public boolean isGroupCommitStreamLoadSql = false;
private boolean isFromDeleteOrUpdateStmt = false;
@@ -933,10 +935,17 @@ public class NativeInsertStmt extends InsertStmt {
}
if (targetTable instanceof OlapTable) {
checkInnerGroupCommit();
- OlapTableSink sink = isGroupCommitTvf ? new
GroupCommitOlapTableSink((OlapTable) targetTable, olapTuple,
- targetPartitionIds,
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert())
- : new OlapTableSink((OlapTable) targetTable, olapTuple,
targetPartitionIds,
-
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
+ OlapTableSink sink;
+ if (isGroupCommitTvf) {
+ sink = new GroupCommitOlapTableSink((OlapTable) targetTable,
olapTuple,
+ targetPartitionIds,
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
+ } else if (isGroupCommitStreamLoadSql) {
+ sink = new GroupCommitBlockSink((OlapTable) targetTable,
olapTuple,
+ targetPartitionIds,
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
+ } else {
+ sink = new OlapTableSink((OlapTable) targetTable, olapTuple,
targetPartitionIds,
+
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
+ }
dataSink = sink;
sink.setPartialUpdateInputColumns(isPartialUpdate,
partialUpdateCols);
dataPartition = dataSink.getOutputPartition();
@@ -1092,7 +1101,8 @@ public class NativeInsertStmt extends InsertStmt {
streamLoadPutRequest.setDb(db.getFullName()).setMaxFilterRatio(1)
.setTbl(getTbl())
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
-
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId);
+
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
+ .setGroupCommit(true);
StreamLoadTask streamLoadTask =
StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
StreamLoadPlanner planner = new StreamLoadPlanner((Database)
getDbObj(), olapTable, streamLoadTask);
// Will using load id as query id in fragment
diff --git a/be/src/vec/core/future_block.cpp
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
similarity index 50%
copy from be/src/vec/core/future_block.cpp
copy to
fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
index 3f3f59c3446..63ab187335c 100644
--- a/be/src/vec/core/future_block.cpp
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
@@ -15,30 +15,22 @@
// specific language governing permissions and limitations
// under the License.
-#include "vec/core/future_block.h"
+package org.apache.doris.planner;
-#include <tuple>
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.thrift.TDataSinkType;
-namespace doris::vectorized {
+import java.util.List;
-void FutureBlock::set_info(int64_t schema_version, const TUniqueId& load_id,
bool first, bool eos) {
- this->_schema_version = schema_version;
- this->_load_id = load_id;
- this->_first = first;
- this->_eos = eos;
-}
+public class GroupCommitBlockSink extends OlapTableSink {
-void FutureBlock::set_result(Status status, int64_t total_rows, int64_t
loaded_rows) {
- auto result = std::make_tuple(true, status, total_rows, loaded_rows);
- result.swap(*_result);
-}
+ public GroupCommitBlockSink(OlapTable dstTable, TupleDescriptor
tupleDescriptor, List<Long> partitionIds,
+ boolean singleReplicaLoad) {
+ super(dstTable, tupleDescriptor, partitionIds, singleReplicaLoad);
+ }
-void FutureBlock::swap_future_block(std::shared_ptr<FutureBlock> other) {
- Block::swap(*other.get());
- set_info(other->_schema_version, other->_load_id, other->_first,
other->_eos);
- lock = other->lock;
- cv = other->cv;
- _result = other->_result;
+ protected TDataSinkType getDataSinkType() {
+ return TDataSinkType.GROUP_COMMIT_BLOCK_SINK;
+ }
}
-
-} // namespace doris::vectorized
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index bd1f6c8bff2..23bec446f4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -125,6 +125,7 @@ public class OlapTableSink extends DataSink {
tSink.setLoadId(loadId);
tSink.setTxnId(txnId);
tSink.setDbId(dbId);
+ tSink.setBaseSchemaVersion(dstTable.getBaseSchemaVersion());
tSink.setLoadChannelTimeoutS(loadChannelTimeoutS);
tSink.setSendBatchParallelism(sendBatchParallelism);
this.isStrictMode = isStrictMode;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index a6e47f33f26..934bca7ac0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -48,6 +48,7 @@ import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.LoadTaskInfo;
+import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -254,10 +255,15 @@ public class StreamLoadPlanner {
// create dest sink
List<Long> partitionIds = getAllPartitionIds();
- OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc,
partitionIds,
- Config.enable_single_replica_load);
- olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
- taskInfo.getSendBatchParallelism(),
taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
+ OlapTableSink olapTableSink;
+ if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask)
taskInfo).isGroupCommit()) {
+ olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc,
partitionIds,
+ Config.enable_single_replica_load);
+ } else {
+ olapTableSink = new OlapTableSink(destTable, tupleDesc,
partitionIds, Config.enable_single_replica_load);
+ }
+ olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
taskInfo.getSendBatchParallelism(),
+ taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate,
partialUpdateInputColumns);
olapTableSink.complete(analyzer);
@@ -463,8 +469,13 @@ public class StreamLoadPlanner {
// create dest sink
List<Long> partitionIds = getAllPartitionIds();
- OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc,
partitionIds,
- Config.enable_single_replica_load);
+ OlapTableSink olapTableSink;
+ if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask)
taskInfo).isGroupCommit()) {
+ olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc,
partitionIds,
+ Config.enable_single_replica_load);
+ } else {
+ olapTableSink = new OlapTableSink(destTable, tupleDesc,
partitionIds, Config.enable_single_replica_load);
+ }
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
taskInfo.getSendBatchParallelism(),
taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate,
partialUpdateInputColumns);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 7d8626c1ea0..d9d0a03f3d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2090,8 +2090,11 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
NativeInsertStmt parsedStmt = (NativeInsertStmt)
SqlParserUtils.getFirstStmt(parser);
parsedStmt.setOrigStmt(new OriginStatement(originStmt, 0));
parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
- if (request.isGroupCommit() && parsedStmt.getLabel() != null) {
- throw new AnalysisException("label and group_commit can't be
set at the same time");
+ if (request.isGroupCommit()) {
+ if (parsedStmt.getLabel() != null) {
+ throw new AnalysisException("label and group_commit can't
be set at the same time");
+ }
+ parsedStmt.isGroupCommitStreamLoadSql = true;
}
StmtExecutor executor = new StmtExecutor(ctx, parsedStmt);
ctx.setExecutor(executor);
@@ -2235,13 +2238,15 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
StreamLoadPlanner planner = new StreamLoadPlanner(db, table,
streamLoadTask);
TPipelineFragmentParams plan =
planner.planForPipeline(streamLoadTask.getId(),
multiTableFragmentInstanceIdIndex);
- // add table indexes to transaction state
- TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
- .getTransactionState(db.getId(), request.getTxnId());
- if (txnState == null) {
- throw new UserException("txn does not exist: " +
request.getTxnId());
+ if (!request.isGroupCommit()) {
+ // add table indexes to transaction state
+ TransactionState txnState =
Env.getCurrentGlobalTransactionMgr()
+ .getTransactionState(db.getId(), request.getTxnId());
+ if (txnState == null) {
+ throw new UserException("txn does not exist: " +
request.getTxnId());
+ }
+ txnState.addTableIndexes(table);
}
- txnState.addTableIndexes(table);
return plan;
} finally {
table.readUnlock();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
index 3abcbee06ec..0b08f8c988c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
@@ -69,10 +69,10 @@ public class GroupCommitTableValuedFunction extends
ExternalFileTableValuedFunct
Column deleteSignColumn = ((OlapTable) table).getDeleteSignColumn();
List<Column> tableColumns = table.getBaseSchema(false);
for (int i = 1; i <= tableColumns.size(); i++) {
- fileColumns.add(new Column("c" + i, tableColumns.get(i -
1).getDataType(), true));
+ fileColumns.add(new Column("c" + i, tableColumns.get(i -
1).getType(), true));
}
if (deleteSignColumn != null) {
- fileColumns.add(new Column("c" + (tableColumns.size() + 1),
deleteSignColumn.getDataType(), true));
+ fileColumns.add(new Column("c" + (tableColumns.size() + 1),
deleteSignColumn.getType(), true));
}
return fileColumns;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 4bcf28da156..485a3599b38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -94,6 +94,8 @@ public class StreamLoadTask implements LoadTaskInfo {
private byte escape = 0;
+ private boolean groupCommit = false;
+
public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType,
TFileFormatType formatType,
TFileCompressType compressType) {
this.id = id;
@@ -312,6 +314,7 @@ public class StreamLoadTask implements LoadTaskInfo {
request.getFileType(), request.getFormatType(),
request.getCompressType());
streamLoadTask.setOptionalFromTSLPutRequest(request);
+ streamLoadTask.setGroupCommit(request.isGroupCommit());
if (request.isSetFileSize()) {
streamLoadTask.fileSize = request.getFileSize();
}
@@ -519,5 +522,13 @@ public class StreamLoadTask implements LoadTaskInfo {
public double getMaxFilterRatio() {
return maxFilterRatio;
}
+
+ public void setGroupCommit(boolean groupCommit) {
+ this.groupCommit = groupCommit;
+ }
+
+ public boolean isGroupCommit() {
+ return groupCommit;
+ }
}
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index f1cad4cf266..91458072d3f 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -37,6 +37,7 @@ enum TDataSinkType {
JDBC_TABLE_SINK,
MULTI_CAST_DATA_STREAM_SINK,
GROUP_COMMIT_OLAP_TABLE_SINK,
+ GROUP_COMMIT_BLOCK_SINK,
}
enum TResultSinkType {
@@ -255,6 +256,7 @@ struct TOlapTableSink {
18: optional Descriptors.TOlapTableLocationParam slave_location
19: optional i64 txn_timeout_s // timeout of load txn in second
20: optional bool write_file_cache
+ 21: optional i64 base_schema_version
}
struct TDataSink {
diff --git
a/regression-test/data/insert_p0/insert_group_commit_into_duplicate.out
b/regression-test/data/insert_p0/insert_group_commit_into_duplicate.out
index 5ee9ddd0be0..97fc1897552 100644
--- a/regression-test/data/insert_p0/insert_group_commit_into_duplicate.out
+++ b/regression-test/data/insert_p0/insert_group_commit_into_duplicate.out
@@ -91,3 +91,6 @@ q 50
q 50
q 50
+-- !sql --
+0 service_46da0dab-e27d-4820-aea2-9bfc15741615 1697032066304 0
3229b7cd-f3a2-4359-aa24-946388c9cc54 0
CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVm
[...]
+
diff --git
a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy
index eed1a26f143..c5ee05ac1e4 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy
@@ -53,6 +53,20 @@ suite("insert_group_commit_into_duplicate") {
return false
}
+ def group_commit_insert = { sql, expected_row_count ->
+ def stmt = prepareStatement """ ${sql} """
+ def result = stmt.executeUpdate()
+ logger.info("insert result: " + result)
+ def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
+ logger.info("result server info: " + serverInfo)
+ if (result != expected_row_count) {
+ logger.warn("insert result: " + result + ", expected_row_count: "
+ expected_row_count + ", sql: " + sql)
+ }
+ // assertEquals(result, expected_row_count)
+ assertTrue(serverInfo.contains("'status':'PREPARE'"))
+ assertTrue(serverInfo.contains("'label':'group_commit_"))
+ }
+
try {
// create table
sql """ drop table if exists ${table}; """
@@ -74,20 +88,6 @@ suite("insert_group_commit_into_duplicate") {
);
"""
- def group_commit_insert = { sql, expected_row_count ->
- def stmt = prepareStatement """ ${sql} """
- def result = stmt.executeUpdate()
- logger.info("insert result: " + result)
- def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
- logger.info("result server info: " + serverInfo)
- if (result != expected_row_count) {
- logger.warn("insert result: " + result + ",
expected_row_count: " + expected_row_count + ", sql: " + sql)
- }
- // assertEquals(result, expected_row_count)
- assertTrue(serverInfo.contains("'status':'PREPARE'"))
- assertTrue(serverInfo.contains("'label':'group_commit_"))
- }
-
connect(user = context.config.jdbcUser, password =
context.config.jdbcPassword, url = context.config.jdbcUrl) {
sql """ set enable_insert_group_commit = true; """
// TODO
@@ -178,4 +178,54 @@ suite("insert_group_commit_into_duplicate") {
} finally {
// try_sql("DROP TABLE ${table}")
}
+
+ // table with array type
+ tableName = "insert_group_commit_into_duplicate_array"
+ table = dbName + "." + tableName
+ try {
+ // create table
+ sql """ drop table if exists ${table}; """
+
+ sql """
+ CREATE table ${table} (
+ teamID varchar(255),
+ service_id varchar(255),
+ start_time BigInt,
+ time_bucket BigInt ,
+ segment_id String ,
+ trace_id String ,
+ data_binary String ,
+ end_time BigInt ,
+ endpoint_id String ,
+ endpoint_name String ,
+ is_error Boolean ,
+ latency Int ,
+ service_instance_id String ,
+ statement String ,
+ tags Array<String>
+ ) UNIQUE key (`teamID`,`service_id`, `start_time`)
+ DISTRIBUTED BY hash(`start_time`)
+ BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+ """
+
+ connect(user = context.config.jdbcUser, password =
context.config.jdbcPassword, url = context.config.jdbcUrl) {
+ sql """ set enable_insert_group_commit = true; """
+ // TODO
+ sql """ set enable_nereids_dml = false; """
+
+ // 1. insert into
+ group_commit_insert """
+ INSERT INTO ${table} (`data_binary`, `end_time`, `endpoint_id`,
`endpoint_name`, `is_error`, `latency`, `segment_id`, `service_id`,
`service_instance_id`, `start_time`, `statement`, `tags`, `teamID`,
`time_bucket`, `trace_id`)
+ VALUES
+
('CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2
[...]
+ 1697032066304, '36b2d9ff-4c25-49f3-a726-eea812564411',
'355f96cd-b1b1-4688-a5f6-a8e3f3a55c9a', false, 3,
'3229b7cd-f3a2-4359-aa24-946388c9cc54',
'service_46da0dab-e27d-4820-aea2-9bfc15741615',
'service_instanceac89a4b7-81f7-43e8-85ed-d2b578d98050', 1697032066304,
'statement: b9903670-3821-4f4c-a587-bbcf02c04b77', ['[tagKey_5=tagValue_5,
tagKey_3=tagValue_3, tagKey_1=tagValue_1, tagKey_16=tagValue_16,
tagKey_8=tagValue_8, tagKey_15=tagValue_15, tagKey_6=tagValue_6, tagKey_11=t
[...]
+ """, 1
+
+ getRowCount(1)
+ qt_sql """ select * from ${table}; """
+ }
+ } finally {
+ // try_sql("DROP TABLE ${table}")
+ }
}
diff --git
a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
index b1c51286e00..818749f2ffc 100644
---
a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
+++
b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
@@ -49,6 +49,26 @@ suite("test_group_commit_http_stream") {
return false
}
+ def checkStreamLoadResult = { exception, result, total_rows, loaded_rows,
filtered_rows, unselected_rows ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertTrue(json.GroupCommit)
+ assertTrue(json.Label.startsWith("group_commit_"))
+ assertEquals(total_rows, json.NumberTotalRows)
+ assertEquals(loaded_rows, json.NumberLoadedRows)
+ assertEquals(filtered_rows, json.NumberFilteredRows)
+ assertEquals(unselected_rows, json.NumberUnselectedRows)
+ if (filtered_rows > 0) {
+ assertFalse(json.ErrorURL.isEmpty())
+ } else {
+ assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+ }
+ }
+
try {
// create table
sql """ drop table if exists ${tableName}; """
@@ -85,6 +105,10 @@ suite("test_group_commit_http_stream") {
unset 'label'
time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result, 4, 4, 0, 0)
+ }
}
}
@@ -101,6 +125,10 @@ suite("test_group_commit_http_stream") {
unset 'label'
time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+ }
}
// stream load with different column order
@@ -116,6 +144,10 @@ suite("test_group_commit_http_stream") {
unset 'label'
time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+ }
}
// stream load with where condition
@@ -133,17 +165,8 @@ suite("test_group_commit_http_stream") {
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertTrue(json.GroupCommit)
- // assertEquals(2, json.NumberTotalRows)
- assertEquals(1, json.NumberLoadedRows)
- assertEquals(0, json.NumberFilteredRows)
- // assertEquals(1, json.NumberUnselectedRows)
+ // TODO different with stream load: 2, 1, 0, 1
+ checkStreamLoadResult(exception, result, 1, 1, 0, 0)
}
}
@@ -160,6 +183,10 @@ suite("test_group_commit_http_stream") {
unset 'label'
time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+ }
}
// stream load with filtered rows
@@ -179,18 +206,8 @@ suite("test_group_commit_http_stream") {
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertTrue(json.GroupCommit)
- // assertEquals(6, json.NumberTotalRows)
- // assertEquals(2, json.NumberLoadedRows)
- // assertEquals(3, json.NumberFilteredRows)
- // assertEquals(1, json.NumberUnselectedRows)
- // assertFalse(json.ErrorURL.isEmpty())
+ // TODO different with stream load: 6, 2, 3, 1
+ checkStreamLoadResult(exception, result, 6, 4, 2, 0)
}
}
@@ -217,7 +234,7 @@ suite("test_group_commit_http_stream") {
}
}
- getRowCount(7)
+ getRowCount(23)
qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
} finally {
// try_sql("DROP TABLE ${tableName}")
@@ -301,18 +318,7 @@ suite("test_group_commit_http_stream") {
// if declared a check callback, the default check condition
will ignore.
// So you must check all condition
check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load ${i}, result: ${result}")
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
- if (json.NumberLoadedRows != 600572) {
- logger.warn("Stream load ${i}, loaded rows:
${json.NumberLoadedRows}")
- }
- assertTrue(json.LoadBytes > 0)
- assertTrue(json.GroupCommit)
+ checkStreamLoadResult(exception, result, 600572, 600572,
0, 0)
}
}
}
diff --git
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index 6034e35a109..b5f46f29225 100644
---
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -48,6 +48,26 @@ suite("test_group_commit_stream_load") {
return false
}
+ def checkStreamLoadResult = { exception, result, total_rows, loaded_rows,
filtered_rows, unselected_rows ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertTrue(json.GroupCommit)
+ assertTrue(json.Label.startsWith("group_commit_"))
+ assertEquals(total_rows, json.NumberTotalRows)
+ assertEquals(loaded_rows, json.NumberLoadedRows)
+ assertEquals(filtered_rows, json.NumberFilteredRows)
+ assertEquals(unselected_rows, json.NumberUnselectedRows)
+ if (filtered_rows > 0) {
+ assertFalse(json.ErrorURL.isEmpty())
+ } else {
+ assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+ }
+ }
+
try {
// create table
sql """ drop table if exists ${tableName}; """
@@ -84,6 +104,10 @@ suite("test_group_commit_stream_load") {
unset 'label'
time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result, 4, 4, 0, 0)
+ }
}
}
@@ -98,6 +122,10 @@ suite("test_group_commit_stream_load") {
unset 'label'
time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+ }
}
// stream load with different column order
@@ -111,6 +139,10 @@ suite("test_group_commit_stream_load") {
unset 'label'
time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+ }
}
// stream load with where condition
@@ -127,17 +159,7 @@ suite("test_group_commit_stream_load") {
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertTrue(json.GroupCommit)
- assertEquals(2, json.NumberTotalRows)
- assertEquals(1, json.NumberLoadedRows)
- assertEquals(0, json.NumberFilteredRows)
- assertEquals(1, json.NumberUnselectedRows)
+ checkStreamLoadResult(exception, result, 2, 1, 0, 1)
}
}
@@ -152,6 +174,10 @@ suite("test_group_commit_stream_load") {
unset 'label'
time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+ }
}
// stream load with filtered rows
@@ -168,18 +194,7 @@ suite("test_group_commit_stream_load") {
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertTrue(json.GroupCommit)
- assertEquals(6, json.NumberTotalRows)
- assertEquals(2, json.NumberLoadedRows)
- assertEquals(3, json.NumberFilteredRows)
- assertEquals(1, json.NumberUnselectedRows)
- assertFalse(json.ErrorURL.isEmpty())
+ checkStreamLoadResult(exception, result, 6, 2, 3, 1)
}
}
@@ -286,19 +301,7 @@ suite("test_group_commit_stream_load") {
// if declared a check callback, the default check condition will ignore.
// So you must check all condition
check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load ${i}, result: ${result}")
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
- if (json.NumberLoadedRows != 600572) {
- logger.warn("Stream load ${i}, loaded rows: ${json.NumberLoadedRows}")
- }
- // assertEquals(json.NumberLoadedRows, 600572)
- assertTrue(json.LoadBytes > 0)
- assertTrue(json.GroupCommit)
+ checkStreamLoadResult(exception, result, 600572, 600572, 0, 0)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]