This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f70a37c6d5bac03e37b1aab1eab27ecf87e87d1c
Author: Yongqiang YANG <[email protected]>
AuthorDate: Sat Aug 26 10:56:35 2023 +0800

    [enhancement](routineload) add debug conf and set broker.name.ttl = 0 
(#23302)
    
    * set broker.name.ttl = 0
    
    * add debug config for librdkafka
---
 be/src/common/config.cpp                      | 1 +
 be/src/common/config.h                        | 1 +
 be/src/runtime/routine_load/data_consumer.cpp | 4 ++++
 3 files changed, 6 insertions(+)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index aaa85d959a..c865e53813 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -766,6 +766,7 @@ DEFINE_mInt32(max_remote_storage_count, "10");
 // and the valid values are: 0.9.0.x, 0.8.x.y.
 DEFINE_String(kafka_api_version_request, "true");
 DEFINE_String(kafka_broker_version_fallback, "0.10.0");
+DEFINE_String(kafka_debug, "disable");
 
 // The number of pool siz of routine load consumer.
 // If you meet the error describe in 
https://github.com/edenhill/librdkafka/issues/3608
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0f41837396..a6905c3f3f 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -821,6 +821,7 @@ DECLARE_mInt32(max_remote_storage_count);
 // and the valid values are: 0.9.0.x, 0.8.x.y.
 DECLARE_String(kafka_api_version_request);
 DECLARE_String(kafka_broker_version_fallback);
+DECLARE_mString(kafka_debug);
 
 // The number of pool siz of routine load consumer.
 // If you meet the error describe in 
https://github.com/edenhill/librdkafka/issues/3608
diff --git a/be/src/runtime/routine_load/data_consumer.cpp 
b/be/src/runtime/routine_load/data_consumer.cpp
index cdbff1c1d1..3706e31fb4 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -91,6 +91,10 @@ Status 
KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
     RETURN_IF_ERROR(set_conf("api.version.request", 
config::kafka_api_version_request));
     RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0"));
     RETURN_IF_ERROR(set_conf("broker.version.fallback", 
config::kafka_broker_version_fallback));
+    RETURN_IF_ERROR(set_conf("broker.address.ttl", "0"));
+    if (config::kafka_debug != "disable") {
+        RETURN_IF_ERROR(set_conf("debug", config::kafka_debug));
+    }
 
     for (auto& item : ctx->kafka_info->properties) {
         if (starts_with(item.second, "FILE:")) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to