This is an automated email from the ASF dual-hosted git repository.

liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cbe23a31d8d [fix](job) fix NPE in routine load Kafka meta request 
(#63180)
cbe23a31d8d is described below

commit cbe23a31d8d37f4e25bcc40e945d6559299903b7
Author: hui lai <[email protected]>
AuthorDate: Fri May 22 11:16:12 2026 +0800

    [fix](job) fix NPE in routine load Kafka meta request (#63180)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    In a single-BE deployment, Kafka routine load fetches topic metadata
    through the only BE. If that BE cannot connect to Kafka, the metadata
    request fails and the BE is skipped for the rest of the current retry
    loop. FE may then have no usable candidate backend left, in which case
    it falls back to the backend ids in the routine load blacklist.
    
    The blacklist can contain stale backend ids that no longer exist in
    `SystemInfoService`. In that case, `KafkaUtil` may get a null `Backend`
    and throw a NullPointerException when calling `be.getHost()`. This NPE
    masks the real Kafka metadata error, such as a broker connection failure.
    
    This PR filters out stale backend ids when reading the routine load
    blacklist (removing them from the blacklist as it goes) and adds a final
    null check before building the BE address. As a result, the original
    Kafka metadata error is preserved instead of being replaced by a
    secondary NPE.
    
    A regression case is added that uses an invalid `kafka_broker_list` to
    verify that routine load reports the original Kafka metadata error
    (e.g. "Local: Broker transport failure") rather than an NPE.
---
 .../apache/doris/datasource/kafka/KafkaUtil.java   | 23 +++++++--
 .../load_p0/routine_load/test_black_list.groovy    | 56 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 3 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index a097f052aa9..00fd28c88da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -252,8 +252,16 @@ public class KafkaUtil {
                 // 2. If that sole backend is decommissioned, the 
aliveBackends list becomes empty.
                 // Hence, in such cases, it's essential to rely on the 
blacklist to obtain meta information.
                 if (backendIds.isEmpty()) {
-                    for (Long beId : 
Env.getCurrentEnv().getRoutineLoadManager().getBlacklist().keySet()) {
-                        backendIds.add(beId);
+                    Map<Long, Long> blacklist = 
Env.getCurrentEnv().getRoutineLoadManager().getBlacklist();
+                    for (Long beId : blacklist.keySet()) {
+                        Backend backend = 
Env.getCurrentSystemInfo().getBackend(beId);
+                        if (backend != null) {
+                            backendIds.add(beId);
+                        } else {
+                            blacklist.remove(beId);
+                            LOG.warn("remove stale backend {} from routine 
load blacklist when getting kafka meta",
+                                    beId);
+                        }
                     }
                 }
                 if (backendIds.isEmpty()) {
@@ -264,7 +272,16 @@ public class KafkaUtil {
                     throw new LoadException("failed to get info: " + errorMsg 
+ ",");
                 }
                 Collections.shuffle(backendIds);
-                Backend be = 
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+                long selectedBeId = backendIds.get(0);
+                Backend be = 
Env.getCurrentSystemInfo().getBackend(selectedBeId);
+                if (be == null) {
+                    if (errorMsg == null) {
+                        errorMsg = "backend " + selectedBeId + " does not 
exist";
+                    }
+                    LOG.warn("skip stale backend {} when getting kafka meta", 
selectedBeId);
+                    retryTimes++;
+                    continue;
+                }
                 address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
                 long beId = be.getId();
 
diff --git a/regression-test/suites/load_p0/routine_load/test_black_list.groovy 
b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
index a68ced241d6..84c1a925caf 100644
--- a/regression-test/suites/load_p0/routine_load/test_black_list.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
@@ -150,5 +150,61 @@ suite("test_black_list","nonConcurrent,p0") {
             GetDebugPoint().disableDebugPointForAllBEs(inject)
             sql "stop routine load for ${job}"
         }
+
+        def invalidBrokerTableName = "test_black_list_invalid_broker"
+        def invalidBrokerJob = "test_black_list_invalid_broker_job"
+        sql """ DROP TABLE IF EXISTS ${invalidBrokerTableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${invalidBrokerTableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date  NULL,
+                `v2` string  NULL,
+                `v3` datetime  NULL,
+                `v4` string  NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${invalidBrokerJob} ON 
${invalidBrokerTableName}
+                COLUMNS TERMINATED BY ","
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "127.0.0.1:1",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            def count = 0
+            while (true) {
+                sleep(1000)
+                def state = sql "show routine load for ${invalidBrokerJob}"
+                def stateChangedReason = state[0][17].toString()
+                def otherMsg = state[0][19].toString()
+                def errorMsg = "${stateChangedReason} ${otherMsg}"
+                log.info("invalid broker routine load state: 
${state[0][8].toString()}".toString())
+                log.info("invalid broker reason of state changed: 
${stateChangedReason}".toString())
+                log.info("invalid broker other msg: ${otherMsg}".toString())
+                if (errorMsg.contains("Failed to get all partitions of kafka 
topic")) {
+                    assertTrue(errorMsg.contains("failed to get info"))
+                    assertTrue(errorMsg.contains("failed to get partition 
meta: Local: Broker transport failure"))
+                    break
+                }
+                if (count >= 90) {
+                    log.error("routine load invalid broker test fail")
+                    assertEquals(1, 2)
+                    break
+                }
+                count++
+            }
+        } finally {
+            try_sql "stop routine load for ${invalidBrokerJob}"
+        }
     }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to