This is an automated email from the ASF dual-hosted git repository.
hello-stephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f9fb157ad85 [fix](publish) Avoid NPE for force-finished publish task
(#63069)
f9fb157ad85 is described below
commit f9fb157ad852ce7617ebcd28282038f9e3e0df22
Author: deardeng <[email protected]>
AuthorDate: Tue May 19 10:11:22 2026 +0800
[fix](publish) Avoid NPE for force-finished publish task (#63069)
FE publish can loop on:
java.lang.NullPointerException: Cannot invoke
"java.util.Map.containsKey(Object)" because the return value of
"PublishVersionTask.getSuccTablets()" is null
at DatabaseTransactionMgr.checkReplicaContinuousVersionSucc(...)
at DatabaseTransactionMgr.finishCheckQuorumReplicas(...)
at DatabaseTransactionMgr.finishTransaction(...)
PublishVersionTask starts with succTablets == null, but
AgentTaskCleanupDaemon.removeInactiveBeAgentTasks can force-finish an
in-flight task with setFinished(true) without ever calling
setSuccTablets.
That leaves a finished task whose getSuccTablets() still returns null.
MasterImpl.finishPublishVersion is only a theoretical source here:
BE task_worker_pool.cpp unconditionally sets succ_tablets on the finish
request, so that path currently receives at worst an empty map.
Fix this on both sides:
- default PublishVersionTask.succTablets to an empty map and coerce null
- guard the call site before containsKey(tabletId)
Add FE unit coverage and a regression suite for BE-down publish cleanup.
---
.../apache/doris/task/AgentTaskCleanupDaemon.java | 2 +-
.../org/apache/doris/task/PublishVersionTask.java | 7 +-
.../doris/transaction/DatabaseTransactionMgr.java | 6 +-
.../apache/doris/task/PublishVersionTaskTest.java | 109 +++++++++++++
.../CheckReplicaContinuousVersionSuccTest.java | 177 +++++++++++++++++++++
.../test_publish_no_npe_when_be_down.groovy | 142 +++++++++++++++++
6 files changed, 438 insertions(+), 5 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java
index bdf87911ec9..23c1d2ee7fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java
@@ -58,7 +58,7 @@ public class AgentTaskCleanupDaemon extends MasterDaemon {
}
return updated;
});
- LOG.info("Check failure on be={}, times={}",
failureTimes, failureTimes);
+ LOG.info("Check failure on be={}, times={}", id,
failureTimes);
}
});
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
index 2a369d0cf4c..63a82a9f885 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
@@ -47,7 +47,9 @@ public class PublishVersionTask extends AgentTask {
private List<Long> errorTablets;
// tabletId => version, current version = 0
- private Map<Long, Long> succTablets;
+ // Initialized to an empty map (not null) so that getSuccTablets() never
returns null
+ // even when the task is force-finished without a real BE response.
+ private Map<Long, Long> succTablets = Maps.newHashMap();
/**
* To collect loaded rows for each tablet from each BE
@@ -59,7 +61,6 @@ public class PublishVersionTask extends AgentTask {
super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L,
-1L, transactionId, createTime);
this.transactionId = transactionId;
this.partitionVersionInfos = partitionVersionInfos;
- this.succTablets = null;
this.errorTablets = new ArrayList<>();
this.isFinished = false;
}
@@ -84,7 +85,7 @@ public class PublishVersionTask extends AgentTask {
}
public void setSuccTablets(Map<Long, Long> succTablets) {
- this.succTablets = succTablets;
+ this.succTablets = (succTablets == null) ? Maps.newHashMap() :
succTablets;
}
public synchronized List<Long> getErrorTablets() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index b3a103fbfbf..27a61243d81 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -1515,7 +1515,11 @@ public class DatabaseTransactionMgr {
boolean success = true;
for (int i = 0; i < subTxnIds.size(); i++) {
PublishVersionTask task = replicaPublishTasks.get(i);
- success = (task != null && task.isFinished() &&
task.getSuccTablets().containsKey(tabletId)) || (
+ // Defensive null guard: AgentTaskCleanupDaemon may force-finish a
PublishVersionTask
+ // without populating succTablets. MasterImpl.finishPublishVersion could in theory
also call
+ // setSuccTablets(null), though current BEs always set succ_tablets on the finish request.
+ Map<Long, Long> succ = (task == null) ? null :
task.getSuccTablets();
+ success = (task != null && task.isFinished() && succ != null &&
succ.containsKey(tabletId)) || (
replica.getState() == Replica.ReplicaState.ALTER &&
(!Config.publish_version_check_alter_replica
|| subTxnIds.get(i) < alterWaterschedTxnId ||
alterWaterschedTxnId == -1));
if (!success) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/task/PublishVersionTaskTest.java
b/fe/fe-core/src/test/java/org/apache/doris/task/PublishVersionTaskTest.java
new file mode 100644
index 00000000000..df6c9e3cc57
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/PublishVersionTaskTest.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Regression tests for the invariant: PublishVersionTask.getSuccTablets()
must never return null,
+ * so that DatabaseTransactionMgr.checkReplicaContinuousVersionSucc cannot NPE
when a task is
+ * force-finished without a real BE response (AgentTaskCleanupDaemon path) or
finished from a
+ * BE callback that leaves succTablets unset (MasterImpl.finishPublishVersion path; defensive only, current BEs always set it).
+ */
+public class PublishVersionTaskTest {
+
+ private PublishVersionTask newTask() {
+ return new PublishVersionTask(
+ /* backendId */ 10001L,
+ /* transactionId*/ 99L,
+ /* dbId */ 1L,
+ /* partitionVersionInfos */ null,
+ /* createTime */ System.currentTimeMillis());
+ }
+
+ /** Default constructor must yield a non-null succTablets. */
+ @Test
+ public void testDefaultSuccTabletsIsNotNull() {
+ PublishVersionTask task = newTask();
+ Assert.assertNotNull("succTablets must be non-null right after
construction",
+ task.getSuccTablets());
+ Assert.assertTrue("succTablets must start empty",
task.getSuccTablets().isEmpty());
+ // Should not NPE.
+ Assert.assertFalse(task.getSuccTablets().containsKey(1L));
+ }
+
+ /** setSuccTablets(null) must coerce to an empty map, not store null. */
+ @Test
+ public void testSetSuccTabletsNullCoercesToEmptyMap() {
+ PublishVersionTask task = newTask();
+ task.setSuccTablets(null);
+ Assert.assertNotNull(task.getSuccTablets());
+ Assert.assertTrue(task.getSuccTablets().isEmpty());
+ Assert.assertFalse(task.getSuccTablets().containsKey(123L));
+ }
+
+ /** A populated map must be returned as-is by the getter. */
+ @Test
+ public void testSetSuccTabletsKeepsValues() {
+ PublishVersionTask task = newTask();
+ Map<Long, Long> populated = ImmutableMap.of(1L, 100L, 2L, 200L);
+ task.setSuccTablets(populated);
+ Assert.assertEquals(populated, task.getSuccTablets());
+ Assert.assertTrue(task.getSuccTablets().containsKey(1L));
+ }
+
+ /**
+ * Simulate AgentTaskCleanupDaemon.removeInactiveBeAgentTasks: the daemon
flips isFinished to
+ * true on every queued PublishVersionTask without ever calling
setSuccTablets. Pre-fix this
+ * left succTablets at the constructor's null and any caller of
getSuccTablets() NPE'd.
+ * After the fix, succTablets is a non-null empty map and downstream
checks see
+ * "no tablet succeeded" instead of crashing.
+ */
+ @Test
+ public void testForceFinishWithoutSetSuccTabletsDoesNotNpe() {
+ PublishVersionTask task = newTask();
+ task.setFinished(true);
+ // No setSuccTablets call — this is the AgentTaskCleanupDaemon code
path.
+ Map<Long, Long> succ = task.getSuccTablets();
+ Assert.assertNotNull("getSuccTablets() must not return null even when
force-finished", succ);
+ Assert.assertTrue(task.isFinished());
+ Assert.assertFalse(succ.containsKey(42L));
+ }
+
+ /**
+ * Simulate MasterImpl.finishPublishVersion on a finish request that
does not set the
+ * succTablets field (defensive: current BEs always set it). Pre-fix this stored null on
the task; after the
+ * fix it stores an empty map.
+ */
+ @Test
+ public void testFinishPublishVersionPathWithNullSuccTablets() {
+ PublishVersionTask task = newTask();
+ task.setSuccTablets(null); // emulates request.isSetSuccTablets()
== false
+ task.setFinished(true); // matches MasterImpl ordering
+ Map<Long, Long> succ = task.getSuccTablets();
+ Assert.assertNotNull(succ);
+ Assert.assertEquals(Collections.emptyMap(), succ);
+ // The exact line that crashed pre-fix at
DatabaseTransactionMgr.java:1478.
+ Assert.assertFalse(succ.containsKey(7L));
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/CheckReplicaContinuousVersionSuccTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/CheckReplicaContinuousVersionSuccTest.java
new file mode 100644
index 00000000000..6e34d3173d7
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/CheckReplicaContinuousVersionSuccTest.java
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import org.apache.doris.catalog.LocalReplica;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.task.PublishVersionTask;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Regression test: even if a PublishVersionTask somehow has a null
succTablets field (e.g. via a
+ * future regression that re-introduces the AgentTaskCleanupDaemon
force-finish path),
+ * DatabaseTransactionMgr.checkReplicaContinuousVersionSucc must not throw
NPE. It must treat the
+ * replica as "publish not yet succeeded for this tablet" and route it through
the normal
+ * error/version-failed branches.
+ */
+public class CheckReplicaContinuousVersionSuccTest {
+
+ private static final long BACKEND_ID = 10001L;
+ private static final long TXN_ID = 99L;
+ private static final long DB_ID = 1L;
+ private static final long TABLET_ID = 3001L;
+ private static final long REPLICA_ID = 2001L;
+
+ private PublishVersionTask newFinishedTaskWithNullSuccTablets() throws
Exception {
+ PublishVersionTask task = new PublishVersionTask(BACKEND_ID, TXN_ID,
DB_ID,
+ /* partitionVersionInfos */ null, System.currentTimeMillis());
+ task.setFinished(true);
+ // Force succTablets to null via reflection. After the
constructor-level fix the field
+ // is initialized to an empty map, so we simulate the pre-fix bad
state to independently
+ // exercise the call-site null guard in
checkReplicaContinuousVersionSucc.
+ Field f = PublishVersionTask.class.getDeclaredField("succTablets");
+ f.setAccessible(true);
+ f.set(task, null);
+ Assert.assertNull("precondition: succTablets must be null for this
test",
+ task.getSuccTablets());
+ return task;
+ }
+
+ private void invokeCheck(Set<Long> errorReplicaIds,
+ List<Replica> tabletSuccReplicas,
+ List<Replica> tabletWriteFailedReplicas,
+ List<Replica> tabletVersionFailedReplicas,
+ PublishVersionTask task,
+ Replica replica,
+ long minReplicaVersion, long maxReplicaVersion) throws Exception {
+ DatabaseTransactionMgr mgr =
+ Mockito.mock(DatabaseTransactionMgr.class,
Mockito.CALLS_REAL_METHODS);
+ Method m = DatabaseTransactionMgr.class.getDeclaredMethod(
+ "checkReplicaContinuousVersionSucc",
+ List.class, long.class, long.class,
+ Replica.class, long.class, long.class,
+ List.class, Set.class, List.class, List.class, List.class);
+ m.setAccessible(true);
+ try {
+ m.invoke(mgr,
+ Lists.newArrayList(TXN_ID), -1L, TABLET_ID,
+ replica, minReplicaVersion, maxReplicaVersion,
+ Lists.newArrayList(task), errorReplicaIds,
+ tabletSuccReplicas, tabletWriteFailedReplicas,
tabletVersionFailedReplicas);
+ } catch (InvocationTargetException ite) {
+ if (ite.getCause() instanceof NullPointerException) {
+ Assert.fail("checkReplicaContinuousVersionSucc threw NPE on
null succTablets: "
+ + ite.getCause());
+ }
+ throw ite;
+ }
+ }
+
+ /**
+ * Negative case: task.isFinished() is true but task.getSuccTablets() is
null. Pre-fix this
+ * NPE'd at line 1478 (per the reported stack trace). Post-fix it must not throw and, because
replica version 100 is below maxReplicaVersion 101, must classify the replica
+ * as write-failed only (not succeeded, not version-failed).
+ */
+ @Test
+ public void testNoNpeWhenSuccTabletsIsNull() throws Exception {
+ PublishVersionTask task = newFinishedTaskWithNullSuccTablets();
+ Replica replica = new LocalReplica(REPLICA_ID, BACKEND_ID,
/*version*/100L, /*schemaHash*/0,
+ /*dataSize*/0L, /*remoteDataSize*/0L, /*rowCount*/0L,
+ Replica.ReplicaState.NORMAL, /*lastFailedVersion*/-1L,
/*lastSuccessVersion*/100L);
+
+ Set<Long> errorReplicaIds = Sets.newHashSet();
+ List<Replica> tabletSuccReplicas = Lists.newArrayList();
+ List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
+ List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
+
+ // replica.version (100) < maxReplicaVersion (101) → after the failure
branch,
+ // replica should land in tabletWriteFailedReplicas.
+ invokeCheck(errorReplicaIds, tabletSuccReplicas,
tabletWriteFailedReplicas,
+ tabletVersionFailedReplicas, task, replica,
/*minReplicaVersion*/100L,
+ /*maxReplicaVersion*/101L);
+
+ Assert.assertTrue("replica should be classified as write-failed when
succTablets is null",
+ tabletWriteFailedReplicas.contains(replica));
+ Assert.assertTrue(tabletSuccReplicas.isEmpty());
+ Assert.assertTrue(tabletVersionFailedReplicas.isEmpty());
+ }
+
+ /**
+ * Positive case: task.isFinished() is true, succTablets contains the
tablet — replica must
+ * be treated as success and removed from errorReplicaIds.
+ */
+ @Test
+ public void testHappyPathWhenSuccTabletsContainsTabletId() throws
Exception {
+ PublishVersionTask task = new PublishVersionTask(BACKEND_ID, TXN_ID,
DB_ID, null,
+ System.currentTimeMillis());
+ task.setFinished(true);
+ java.util.Map<Long, Long> populated = new java.util.HashMap<>();
+ populated.put(TABLET_ID, 100L);
+ task.setSuccTablets(populated);
+
+ Replica replica = new LocalReplica(REPLICA_ID, BACKEND_ID,
/*version*/100L, /*schemaHash*/0,
+ /*dataSize*/0L, /*remoteDataSize*/0L, /*rowCount*/0L,
+ Replica.ReplicaState.NORMAL, /*lastFailedVersion*/-1L,
/*lastSuccessVersion*/100L);
+
+ Set<Long> errorReplicaIds = Sets.newHashSet(REPLICA_ID); // pretend it
was tagged earlier
+ List<Replica> tabletSuccReplicas = Lists.newArrayList();
+ List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
+ List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
+
+ invokeCheck(errorReplicaIds, tabletSuccReplicas,
tabletWriteFailedReplicas,
+ tabletVersionFailedReplicas, task, replica,
/*minReplicaVersion*/100L,
+ /*maxReplicaVersion*/100L);
+
+ Assert.assertFalse("happy path must clear the replica from
errorReplicaIds",
+ errorReplicaIds.contains(REPLICA_ID));
+ Assert.assertTrue("happy path must add the replica to
tabletSuccReplicas",
+ tabletSuccReplicas.contains(replica));
+ }
+
+ /**
+ * Task is null in the list (older code path observed in production logs).
Should be treated
+ * as a failure without NPE.
+ */
+ @Test
+ public void testNoNpeWhenTaskIsNull() throws Exception {
+ Replica replica = new LocalReplica(REPLICA_ID, BACKEND_ID,
/*version*/100L, /*schemaHash*/0,
+ /*dataSize*/0L, /*remoteDataSize*/0L, /*rowCount*/0L,
+ Replica.ReplicaState.NORMAL, /*lastFailedVersion*/-1L,
/*lastSuccessVersion*/100L);
+
+ Set<Long> errorReplicaIds = Sets.newHashSet();
+ List<Replica> tabletSuccReplicas = Lists.newArrayList();
+ List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
+ List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
+
+ invokeCheck(errorReplicaIds, tabletSuccReplicas,
tabletWriteFailedReplicas,
+ tabletVersionFailedReplicas, /*task*/null, replica, 100L,
101L);
+
+ Assert.assertTrue(tabletWriteFailedReplicas.contains(replica));
+ }
+}
diff --git
a/regression-test/suites/fault_injection_p0/test_publish_no_npe_when_be_down.groovy
b/regression-test/suites/fault_injection_p0/test_publish_no_npe_when_be_down.groovy
new file mode 100644
index 00000000000..7bf8c9649e6
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_publish_no_npe_when_be_down.groovy
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+// Regression test: when AgentTaskCleanupDaemon force-finishes in-flight
PublishVersionTasks
+// for a dead BE, the master FE must NOT enter a permanent NPE loop in
+// PublishVersionDaemon ->
DatabaseTransactionMgr.checkReplicaContinuousVersionSucc due to
+// PublishVersionTask.getSuccTablets() returning null.
+//
+// Pre-fix behaviour: master FE log fills with
+// NullPointerException: ... PublishVersionTask.getSuccTablets() is null
+// at DatabaseTransactionMgr.checkReplicaContinuousVersionSucc(...:1478)
+// for the affected transaction at ~tens of times per second, indefinitely.
+//
+// Post-fix behaviour: replicas of the dead BE are routed through the normal
error/version-
+// failed branches; the transaction either succeeds (if quorum holds on
remaining BEs) or
+// fails cleanly via the standard publish timeout. No NPE shows up in the FE
log.
+suite("test_publish_no_npe_when_be_down", "docker") {
+ def options = new ClusterOptions()
+ options.cloudMode = false
+ options.beNum = 3
+ options.feNum = 1
+ options.enableDebugPoints()
+ // Speed up AgentTaskCleanupDaemon: 5s interval * 3 failures = 15s to
force-finish.
+ options.feConfigs += [
+ "agent_task_health_check_intervals_ms=5000",
+ "publish_version_interval_ms=10",
+ "publish_version_timeout_second=60",
+ // Enable debug logs for AgentTaskCleanupDaemon so we can assert the
+ // force-finish path actually fired below.
+ "sys_log_verbose_modules=org.apache.doris.task.AgentTaskCleanupDaemon",
+ ]
+
+ docker(options) {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ def tblName = "test_publish_no_npe_when_be_down"
+ sql """ DROP TABLE IF EXISTS ${tblName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tblName} (
+ `k` int NOT NULL,
+ `v` int NOT NULL
+ )
+ DUPLICATE KEY(`k`)
+ DISTRIBUTED BY HASH(`k`) BUCKETS 8
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 3"
+ )
+ """
+
+ // Slow down BE-side publish so we can stop a BE while publish is
in-flight.
+ // The spin-wait debug point keeps the task in AgentTaskQueue until BE
down,
+ // so AgentTaskCleanupDaemon will be the actor that flips
isFinished=true without
+ // populating succTablets - exactly the regression trigger path.
+
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+
+ // Kick off a write that produces an in-flight publish.
+ def loadFuture = thread {
+ try {
+ sql """
+ INSERT INTO ${tblName}
+ SELECT number, number FROM numbers("number" = "1024")
+ """
+ } catch (Throwable t) {
+ logger.warn("expected: insert may fail when BE is killed
mid-publish: ${t.message}")
+ }
+ }
+
+ // Let the publish actually start before we kill the BE.
+ sleep(2000)
+
+ // Stop one BE — the in-flight PublishVersionTask for that BE will be
force-finished
+ // by AgentTaskCleanupDaemon after MAX_FAILURE_TIMES (3) * interval
(5s) = ~15s.
+ cluster.stopBackends(1)
+
+ // Wait for cleanup to fire (~20s) plus a few more daemon poll cycles,
so any latent
+ // NPE has time to surface on the master FE.
+ sleep(45000)
+
+ // The smoking gun (note: the second grep below fails on ANY fe.log line naming checkReplicaContinuousVersionSucc, not only NPE frames): master FE log must not contain the
getSuccTablets() NPE signature.
+ def feIndex = 1
+ def fe = cluster.getFeByIndex(feIndex)
+ assertNotNull(fe, "master FE handle missing")
+ def feLogPath = "${fe.path}/log/fe.log"
+
+ def npeCmdResult = ["bash", "-c",
+ "grep -c 'PublishVersionTask.getSuccTablets()\\\" is null'
${feLogPath} || true"
+ ].execute().text.trim()
+ logger.info("getSuccTablets NPE occurrences in master FE log:
${npeCmdResult}")
+ assertEquals("0", npeCmdResult,
+ "master FE log must not contain the getSuccTablets() NPE after
the fix")
+
+ def stackCmdResult = ["bash", "-c",
+ "grep -c 'checkReplicaContinuousVersionSucc' ${feLogPath} ||
true"
+ ].execute().text.trim()
+ logger.info("checkReplicaContinuousVersionSucc stack frames in master
FE log: ${stackCmdResult}")
+ assertEquals("0", stackCmdResult,
+ "no NPE stack should mention
checkReplicaContinuousVersionSucc")
+
+ // Confirm AgentTaskCleanupDaemon actually fired for the stopped BE —
proves we
+ // exercised the exact regression path, not a no-op.
+ def cleanupCmdResult = ["bash", "-c",
+ "grep -c 'BE down, remove agent task' ${feLogPath} || true"
+ ].execute().text.trim()
+ logger.info("AgentTaskCleanupDaemon force-finish count:
${cleanupCmdResult}")
+ assertTrue(Integer.parseInt(cleanupCmdResult) > 0,
+ "AgentTaskCleanupDaemon should have force-finished tasks for
the stopped BE")
+
+ cluster.getAllBackends(true).each { be ->
+ DebugPoint.disableDebugPoint(be.host, be.httpPort, NodeType.BE,
"EnginePublishVersionTask::execute.block")
+ DebugPoint.disableDebugPoint(be.host, be.httpPort, NodeType.BE,
+ "EnginePublishVersionTask::execute.enable_spin_wait")
+ }
+ loadFuture.get()
+
+ // Cluster should still be healthy (checked only by the SELECT below not throwing; rowCount is logged, not asserted): the table is queryable,
transactions are not stuck.
+ cluster.startBackends(1)
+ sleep(5000)
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ def rowCount = sql """ SELECT COUNT(*) FROM ${tblName} """
+ logger.info("rowCount after recovery: ${rowCount}")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]