This is an automated email from the ASF dual-hosted git repository.

hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 48894b90f61 [fix](cloud) Fix NPE in group commit when backend belongs to a different cluster (#60652)
48894b90f61 is described below

commit 48894b90f618f181cd504e4d78b7f0932d1ebde4
Author: deardeng <[email protected]>
AuthorDate: Tue Feb 24 14:27:04 2026 +0800

    [fix](cloud) Fix NPE in group commit when backend belongs to a different cluster (#60652)
    
    Fix:
    - Add cluster membership check in getCachedBackend() to invalidate
    cached backends that no longer belong to the expected cluster, which can
    happen during auto-scaling.
---
 .../org/apache/doris/load/GroupCommitManager.java  | 14 ++++++++++--
 .../main/java/org/apache/doris/qe/Coordinator.java | 26 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
index 25e72199d2a..a2234c5366a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
@@ -213,7 +213,15 @@ public class GroupCommitManager {
             try {
                 long backendId = new MasterOpExecutor(context)
                         .getGroupCommitLoadBeId(tableId, clusterName);
-                return Env.getCurrentSystemInfo().getBackend(backendId);
+                Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("selectBackendForGroupCommit on non-master: 
tableId={}, clusterName={},"
+                            + " backendId={}, backend={}, backendCluster={}",
+                            tableId, clusterName, backendId,
+                            be != null ? be.getHost() + ":" + be.getBePort() : 
"null",
+                            be != null ? be.getCloudClusterName() : "null");
+                }
+                return be;
             } catch (Exception e) {
                 throw new LoadException(e.getMessage());
             }
@@ -350,7 +358,9 @@ public class GroupCommitManager {
                 }
                 Backend backend = 
Env.getCurrentSystemInfo().getBackend(backendId);
                 if (backend != null && backend.isAlive() && 
!backend.isDecommissioned()
-                        && (!Config.isCloudMode() || 
!backend.isDecommissioning())) {
+                        && (!Config.isCloudMode() || 
!backend.isDecommissioning())
+                        && (!Config.isCloudMode() || cluster == null
+                                || 
cluster.equals(backend.getCloudClusterName()))) {
                     return backend.getId();
                 } else {
                     tableToBeMap.remove(encode(cluster, tableId));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index f6c929b6526..fefd77c7df8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -565,6 +565,26 @@ public class Coordinator implements CoordInterface {
 
         this.idToBackend = 
Env.getCurrentSystemInfo().getBackendsByCurrentCluster();
 
+        // Log cluster info and groupCommitBackend for debugging NPE in 
PipelineExecContext
+        if (groupCommitBackend != null) {
+            String currentCluster = "unknown";
+            try {
+                if (ConnectContext.get() != null) {
+                    currentCluster = ConnectContext.get().getCloudCluster();
+                }
+            } catch (Exception e) {
+                LOG.debug("failed to get current cloud cluster for debug log", 
e);
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("query {} prepare: currentCluster={}, 
idToBackend.size={}, idToBackend.keys={},"
+                                + " groupCommitBackend=[id={}, host={}, 
cluster={}], groupCommitBackendInMap={}",
+                        DebugUtil.printId(queryId), currentCluster, 
idToBackend.size(), idToBackend.keySet(),
+                        groupCommitBackend.getId(), 
groupCommitBackend.getHost(),
+                        groupCommitBackend.getCloudClusterName(),
+                        idToBackend.containsKey(groupCommitBackend.getId()));
+            }
+        }
+
         if (LOG.isDebugEnabled()) {
             int backendNum = idToBackend.size();
             StringBuilder backendInfos = new StringBuilder("backends info:");
@@ -835,6 +855,12 @@ public class Coordinator implements CoordInterface {
                 // So that we can use one RPC to send all fragment instances 
of a BE.
                 for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry 
: tParams.entrySet()) {
                     Long backendId = 
this.addressToBackendID.get(entry.getKey());
+                    if (backendId == null) {
+                        LOG.warn("query {} sendPipelineCtx: addressToBackendID 
lookup returned null!"
+                                + " address={}, fragmentId={}, 
addressToBackendID={}",
+                                DebugUtil.printId(queryId), entry.getKey(),
+                                fragment.getFragmentId(), addressToBackendID);
+                    }
                     backendFragments.add(Pair.of(fragment.getFragmentId(), 
backendId));
                     PipelineExecContext pipelineExecContext = new 
PipelineExecContext(fragment.getFragmentId(),
                             entry.getValue(), idToBackend.get(backendId), 
executionProfile, jobId);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to