This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new a837282ca72 [fix](routineload) fix consume data too slow in partial partitions (#32126) (#32303)
a837282ca72 is described below
commit a837282ca729bc928fb94d8f300acf1c54e467b8
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Fri Mar 15 22:32:10 2024 +0800
[fix](routineload) fix consume data too slow in partial partitions (#32126) (#32303)
---
be/src/runtime/routine_load/data_consumer_group.h | 8 +++++---
be/src/runtime/routine_load/data_consumer_pool.cpp | 5 +++--
2 files changed, 8 insertions(+), 5 deletions(-)
diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h
index e15ad7115f6..e4f45cff15d 100644
--- a/be/src/runtime/routine_load/data_consumer_group.h
+++ b/be/src/runtime/routine_load/data_consumer_group.h
@@ -45,8 +45,10 @@ class DataConsumerGroup {
public:
typedef std::function<void(const Status&)> ConsumeFinishCallback;
- DataConsumerGroup()
- : _grp_id(UniqueId::gen_uid()), _thread_pool(3, 10, "data_consumer"), _counter(0) {}
+ DataConsumerGroup(size_t consumer_num)
+ : _grp_id(UniqueId::gen_uid()),
+ _thread_pool(consumer_num, consumer_num, "data_consumer"),
+ _counter(0) {}
virtual ~DataConsumerGroup() { _consumers.clear(); }
@@ -82,7 +84,7 @@ protected:
// for kafka
class KafkaDataConsumerGroup : public DataConsumerGroup {
public:
- KafkaDataConsumerGroup() : DataConsumerGroup(), _queue(500) {}
+ KafkaDataConsumerGroup(size_t consumer_num) : DataConsumerGroup(consumer_num), _queue(500) {}
virtual ~KafkaDataConsumerGroup();
diff --git a/be/src/runtime/routine_load/data_consumer_pool.cpp b/be/src/runtime/routine_load/data_consumer_pool.cpp
index 48d16e9c219..a361008c651 100644
--- a/be/src/runtime/routine_load/data_consumer_pool.cpp
+++ b/be/src/runtime/routine_load/data_consumer_pool.cpp
@@ -87,12 +87,13 @@ Status DataConsumerPool::get_consumer_grp(std::shared_ptr<StreamLoadContext> ctx
return Status::InternalError("PAUSE: The size of begin_offset of task should not be 0.");
}
- std::shared_ptr<KafkaDataConsumerGroup> grp = std::make_shared<KafkaDataConsumerGroup>();
-
// one data consumer group contains at least one data consumers.
int max_consumer_num = config::max_consumer_num_per_group;
size_t consumer_num = std::min((size_t)max_consumer_num,
ctx->kafka_info->begin_offset.size());
+ std::shared_ptr<KafkaDataConsumerGroup> grp =
+ std::make_shared<KafkaDataConsumerGroup>(consumer_num);
+
for (int i = 0; i < consumer_num; ++i) {
std::shared_ptr<DataConsumer> consumer;
RETURN_IF_ERROR(get_consumer(ctx, &consumer));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]