This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 8ad2c857a41 [fix](cloud) avoid NPE and clear stale cache on warmup job cancel/expire (#62805) (#63019)
8ad2c857a41 is described below
commit 8ad2c857a41d2520d3623f71d804314e150eb89a
Author: Xin Liao <[email protected]>
AuthorDate: Thu May 7 11:31:41 2026 +0800
[fix](cloud) avoid NPE and clear stale cache on warmup job cancel/expire (#62805) (#63019)
Pick apache/doris#62805
---
be/src/cloud/cloud_warm_up_manager.cpp | 2 +-
.../apache/doris/service/FrontendServiceImpl.java | 2 +-
.../doris/service/FrontendServiceImplTest.java | 41 +++++
...est_warm_up_cluster_event_cancel_expired.groovy | 203 +++++++++++++++++++++
4 files changed, 246 insertions(+), 2 deletions(-)
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp
index e609257f74b..ef7a9d5fa94 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -540,7 +540,7 @@ std::vector<TReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t tablet_id
auto st = Status::create(result.status);
if (!st.ok()) {
- if (st.is<CANCELED>()) {
+ if (st.is<ErrorCode::CANCELLED>()) {
LOG(INFO) << "get_replica_info: warm up job cancelled, tablet_id=" << tablet_id
<< ", job_id=" << job_id;
cancelled_jobs.push_back(job_id);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index f8815f6b750..9a4527827cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2800,7 +2800,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
.getCloudWarmUpJob(request.getWarmUpJobId());
if (job == null || job.isDone()) {
LOG.info("warmup job {} is not running, notify caller BE {} to cancel job",
- job.getJobId(), clientAddr);
+ request.getWarmUpJobId(), clientAddr);
// notify client to cancel this job
result.setStatus(new TStatus(TStatusCode.CANCELLED));
return result;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java b/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java
index 21d6dd9b1b7..aa551584b76 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java
@@ -21,6 +21,8 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
+import org.apache.doris.cloud.CacheHotspotManager;
+import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.PrintableMap;
@@ -40,6 +42,8 @@ import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
import org.apache.doris.thrift.TFetchSchemaTableDataResult;
import org.apache.doris.thrift.TGetDbsParams;
import org.apache.doris.thrift.TGetDbsResult;
+import org.apache.doris.thrift.TGetTabletReplicaInfosRequest;
+import org.apache.doris.thrift.TGetTabletReplicaInfosResult;
import org.apache.doris.thrift.TMaxComputeBlockIdRequest;
import org.apache.doris.thrift.TMaxComputeBlockIdResult;
import org.apache.doris.thrift.TMetadataTableRequestParams;
@@ -59,11 +63,13 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -343,4 +349,39 @@ public class FrontendServiceImplTest {
Env.getCurrentEnv().getAuthenticationIntegrationMgr().dropAuthenticationIntegration(integrationName,
true);
}
}
+
+ @Test
+ public void testGetTabletReplicaInfosNullJobReturnsCancelledWithoutNpe() {
+ String originalCloudUniqueId = Config.cloud_unique_id;
+ Config.cloud_unique_id = "gettabletreplicainfostest";
+
+ CloudEnv cloudEnv = Mockito.mock(CloudEnv.class);
+ CacheHotspotManager cacheHotspotManager = Mockito.mock(CacheHotspotManager.class);
+ Mockito.when(cloudEnv.getCacheHotspotMgr()).thenReturn(cacheHotspotManager);
+ Mockito.when(cacheHotspotManager.getCloudWarmUpJob(123456L)).thenReturn(null);
+
+ MockedStatic<Env> envMock = Mockito.mockStatic(Env.class);
+ try {
+ envMock.when(Env::getCurrentEnv).thenReturn(cloudEnv);
+
+ FrontendServiceImpl frontendService = new FrontendServiceImpl(exeEnv);
+ TGetTabletReplicaInfosRequest request = new TGetTabletReplicaInfosRequest();
+ request.setTabletIds(Collections.singletonList(789L));
+ request.setWarmUpJobId(123456L);
+
+ TGetTabletReplicaInfosResult result;
+ try {
+ result = frontendService.getTabletReplicaInfos(request);
+ } catch (NullPointerException e) {
+ throw new AssertionError("getTabletReplicaInfos must not NPE when the "
+ + "warm-up job has been removed from CacheHotspotManager", e);
+ }
+
+ Assert.assertNotNull(result.getStatus());
+ Assert.assertEquals(TStatusCode.CANCELLED, result.getStatus().getStatusCode());
+ } finally {
+ envMock.close();
+ Config.cloud_unique_id = originalCloudUniqueId;
+ }
+ }
}
diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_cancel_expired.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_cancel_expired.groovy
new file mode 100644
index 00000000000..31c28cbf2b7
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_cancel_expired.groovy
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+
+// Covers a two-part bug on the event-driven warm-up path:
+// 1. BE side (cloud_warm_up_manager.cpp): `st.is<CANCELED>()` used the
+// proto enum value PCacheStatus::CANCELED=9 instead of
+// ErrorCode::CANCELLED=1, so BE never cleared a cancelled job from
+// `_tablet_replica_cache`.
+// 2. FE side (FrontendServiceImpl.getTabletReplicaInfos): when the job
+// had been removed from `cloudWarmUpJobs` (after
+// history_cloud_warm_up_job_keep_max_second), the branch still
+// called `job.getJobId()` on a null reference, throwing NPE to BE.
+//
+// After the fix, once a warm-up job is cancelled BE must drop it from
+// its cache on the next RPC (via the CANCELLED TStatus), so that later
+// expiry removal on FE never receives follow-up requests with the dead
+// job_id and no NPE path is exercised.
+suite('test_warm_up_cluster_event_cancel_expired', 'docker') {
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ // Keep expiry small so FE removes the cancelled job quickly.
+ 'history_cloud_warm_up_job_keep_max_second=30',
+ ]
+ options.beConfigs += [
+ 'file_cache_enter_disk_resource_limit_mode_percent=99',
+ 'enable_evict_file_cache_in_advance=false',
+ 'file_cache_background_monitor_interval_ms=1000',
+ ]
+ options.cloudMode = true
+
+ def clearFileCache = { ip, port ->
+ def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true"
+ def response = new URL(url).text
+ def json = new JsonSlurper().parseText(response)
+ if (json.status != "OK") {
+ throw new RuntimeException("Clear cache on ${ip}:${port} failed: ${json.status}")
+ }
+ }
+
+ def clearFileCacheOnAllBackends = {
+ def backends = sql """SHOW BACKENDS"""
+ for (be in backends) {
+ clearFileCache(be[1], be[4])
+ }
+ sleep(10000)
+ }
+
+ def getBrpcMetrics = { ip, port, name ->
+ def url = "http://${ip}:${port}/brpc_metrics"
+ if ((context.config.otherConfigs.get("enableTLS")?.toString()?.equalsIgnoreCase("true")) ?: false) {
+ url = url.replace("http://", "https://") + " --cert " + context.config.otherConfigs.get("trustCert") + " --cacert " + context.config.otherConfigs.get("trustCACert") + " --key " + context.config.otherConfigs.get("trustCAKey")
+ }
+ def metrics = new URL(url).text
+ def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+ if (matcher.find()) {
+ return matcher[0][1] as long
+ } else {
+ throw new RuntimeException("${name} not found for ${ip}:${port}")
+ }
+ }
+
+ def getSkippedRowsetSum = { cluster ->
+ def backends = sql """SHOW BACKENDS"""
+ def cluster_bes = backends.findAll {
+ it[19].contains("""\"compute_group_name\" : \"${cluster}\"""")
+ }
+ long sum = 0
+ for (be in cluster_bes) {
+ sum += getBrpcMetrics(be[1], be[5], "file_cache_event_driven_warm_up_skipped_rowset_num")
+ }
+ return sum
+ }
+
+ docker(options) {
+ def clusterSrc = "warmup_source"
+ def clusterDst = "warmup_target"
+
+ cluster.addBackend(1, clusterSrc)
+ cluster.addBackend(1, clusterDst)
+
+ sql """use @${clusterSrc}"""
+ sql """CREATE TABLE IF NOT EXISTS t_exp (
+ id INT, v STRING
+ ) DUPLICATE KEY(id)
+ DISTRIBUTED BY HASH(id) BUCKETS 3
+ PROPERTIES ("file_cache_ttl_seconds" = "3600")"""
+
+ // 1. Start event-driven LOAD warm-up job; this is the only mode
+ // that populates `_tablet_replica_cache[jobId]` on BE.
+ def jobRows = sql """
+ WARM UP CLUSTER ${clusterDst} WITH CLUSTER ${clusterSrc}
+ PROPERTIES (
+ "sync_mode" = "event_driven",
+ "sync_event" = "load"
+ )
+ """
+ def jobId = jobRows[0][0]
+ logger.info("event-driven warm-up jobId=${jobId}")
+ clearFileCacheOnAllBackends()
+ sleep(15000)
+
+ // 2. Drive some loads so BE caches the job_id in _tablet_replica_cache
+ // (we don't care about the warmed data itself for this test).
+ for (int i = 0; i < 20; i++) {
+ sql """INSERT INTO t_exp VALUES (${i}, 'x')"""
+ }
+ sleep(5000)
+
+ // 3. Cancel the job. After the fix, BE sees TStatus.CANCELLED on
+ // the next get_replica_info and drops the job from
+ // _tablet_replica_cache; before the fix (CANCELED typo) it
+ // would keep the entry.
+ sql """CANCEL WARM UP JOB WHERE ID = ${jobId}"""
+ def st = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+ assertEquals("CANCELLED", st[0][3])
+
+ // 4. One more batch so BE actually sees the CANCELLED status
+ // and (with the fix) purges its cache entry.
+ for (int i = 0; i < 20; i++) {
+ sql """INSERT INTO t_exp VALUES (${100 + i}, 'y')"""
+ }
+ sleep(5000)
+
+ // 5. Baseline for the skipped-rowset counter. After BE has
+ // cleaned its cache, subsequent warm_up_rowset calls return
+ // early (empty replicas -> "skipping rowset") and bump this
+ // counter on every commit. If the typo is unfixed the counter
+ // stays flat because BE keeps calling FE.
+ def skippedBaseline = getSkippedRowsetSum(clusterDst)
+ logger.info("skipped_rowset baseline=${skippedBaseline}")
+
+ // 6. Wait past history_cloud_warm_up_job_keep_max_second plus one
+ // JobDaemon cycle (~20s, CYCLE_COUNT_TO_CHECK_EXPIRE=20 ticks
+ // at 1s each) so FE removes the cancelled job from its map.
+ logger.info("waiting for FE to expire+remove cancelled job")
+ def removed = false
+ for (int i = 0; i < 60; i++) {
+ def rows = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+ if (rows.isEmpty()) {
+ removed = true
+ logger.info("job ${jobId} removed from FE after ${i}s")
+ break
+ }
+ sleep(1000)
+ }
+ assertTrue(removed, "FE should have removed expired warm-up job")
+
+ // 7. After FE removal, run more loads. Buggy code path:
+ // BE still has job_id in _tablet_replica_cache -> RPC to FE ->
+ // FE's LOG.info("...", job.getJobId(), ...) NPE's on
+ // `job == null` -> BE falls into the 2s thrift exception
+ // sleep *inside* CloudWarmUpManager::_mtx, serialising every
+ // commit_rowset and blowing up heavy_work_pool.
+ //
+ // Fixed code path: BE cache already cleaned in step 4, so
+ // warm_up_rowset takes the empty-replicas fast path, loads
+ // stay fast, and FE never gets called with the dead job_id.
+ def t0 = System.currentTimeMillis()
+ for (int i = 0; i < 20; i++) {
+ sql """INSERT INTO t_exp VALUES (${200 + i}, 'z')"""
+ }
+ def elapsedMs = System.currentTimeMillis() - t0
+ logger.info("20 INSERTs after FE removal took ${elapsedMs}ms")
+
+ // 20 INSERTs, each commit would cost ~2s of serialised sleep in
+ // the buggy case (>= 40s). Threshold of 30s is a generous
+ // upper bound that the fixed path comfortably meets but the
+ // buggy path cannot.
+ assertTrue(elapsedMs < 30_000,
+ "post-removal inserts should not be blocked by NPE sleeps, " +
+ "took ${elapsedMs}ms")
+
+ // 8. On the fixed path every commit short-circuits through
+ // g_file_cache_event_driven_warm_up_skipped_rowset_num.
+ // We expect it to grow; on the buggy path it would be flat
+ // since BE never stopped pursuing FE replicas.
+ def skippedAfter = getSkippedRowsetSum(clusterDst)
+ logger.info("skipped_rowset after=${skippedAfter}")
+ assertTrue(skippedAfter > skippedBaseline,
+ "BE should skip warm_up_rowset for tablets after cancel+expire, " +
+ "baseline=${skippedBaseline} after=${skippedAfter}")
+
+ sql """DROP TABLE IF EXISTS t_exp"""
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]