This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 6fa11115049 branch-4.1: [fix](cloud) Deduplicate pending one-shot warm
up jobs(pick#62384) (#64236)
6fa11115049 is described below
commit 6fa11115049dc9054096246f0c6daaa1d21cb452
Author: zhengyu <[email protected]>
AuthorDate: Tue Jun 9 10:22:26 2026 +0800
branch-4.1: [fix](cloud) Deduplicate pending one-shot warm up
jobs(pick#62384) (#64236)
Original PR: https://github.com/apache/doris/pull/62384
Picked to: branch-4.1
Pick branch: freemandealer:pick-branch-4.1-pr-62384
Validation:
- git diff --check
- ./build.sh --fe
Notes:
Resolved the CacheHotspotManagerTest.java conflict by adopting the
Mockito-based test structure from the original PR.
---
.../apache/doris/cloud/CacheHotspotManager.java | 210 +++++++++-
.../doris/cloud/cache/CacheHotspotManagerTest.java | 435 ++++++++++++++++++---
2 files changed, 580 insertions(+), 65 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
index cac253243ff..42826af77ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
@@ -52,6 +52,7 @@ import org.apache.doris.thrift.THotTableMessage;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
@@ -67,6 +68,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -82,6 +84,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
public class CacheHotspotManager extends MasterDaemon {
public static final int MAX_SHOW_ENTRIES = 2000;
@@ -109,6 +112,9 @@ public class CacheHotspotManager extends MasterDaemon {
private ConcurrentMap<Long, CloudWarmUpJob> runnableCloudWarmUpJobs =
Maps.newConcurrentMap();
+ // Per-key locks that serialize createJob() for identical one-shot warm-up requests.
+ // Each entry is ref-counted (lock holder plus waiters) and is removed from the map
+ // by releaseOncePendingCreateLock() once its count drops to zero.
+ private final ConcurrentMap<OncePendingJobKey,
RefCountedPendingCreateLock> oncePendingCreateLocks
+ = Maps.newConcurrentMap();
+
private final ThreadPoolExecutor cloudWarmUpThreadPool =
ThreadPoolManager.newDaemonCacheThreadPool(
Config.max_active_cloud_warm_up_job, "cloud-warm-up-pool", true);
@@ -148,10 +154,185 @@ public class CacheHotspotManager extends MasterDaemon {
}
}
+    /**
+     * Identity of a one-shot warm-up request, used to detect duplicate PENDING jobs.
+     *
+     * <p>Two keys are equal when job type, source cluster, destination cluster, normalized
+     * table list and force flag all match. Null cluster names are stored as the empty string.
+     */
+    private static class OncePendingJobKey {
+        private final JobType jobType;
+        private final String srcName;
+        private final String dstName;
+        private final List<String> normalizedTables;
+        private final boolean force;
+
+        OncePendingJobKey(JobType jobType, String srcName, String dstName,
+                List<String> normalizedTables, boolean force) {
+            this.jobType = jobType;
+            this.srcName = normalizeNullableName(srcName);
+            this.dstName = normalizeNullableName(dstName);
+            // Keep a private read-only copy so the caller's list cannot change this key later.
+            List<String> tablesView = Collections.emptyList();
+            if (!normalizedTables.isEmpty()) {
+                tablesView = Collections.unmodifiableList(new ArrayList<>(normalizedTables));
+            }
+            this.normalizedTables = tablesView;
+            this.force = force;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (o instanceof OncePendingJobKey) {
+                OncePendingJobKey other = (OncePendingJobKey) o;
+                return force == other.force
+                        && jobType == other.jobType
+                        && Objects.equals(srcName, other.srcName)
+                        && Objects.equals(dstName, other.dstName)
+                        && Objects.equals(normalizedTables, other.normalizedTables);
+            }
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(jobType, srcName, dstName, normalizedTables, force);
+        }
+
+        @Override
+        public String toString() {
+            return new StringBuilder("OncePendingWarmUpJob{")
+                    .append("jobType=").append(jobType)
+                    .append(", src='").append(srcName).append('\'')
+                    .append(", dst='").append(dstName).append('\'')
+                    .append(", tables=").append(normalizedTables)
+                    .append(", force=").append(force)
+                    .append('}')
+                    .toString();
+        }
+    }
+
+ private static class RefCountedPendingCreateLock {
+ private final ReentrantLock lock = new ReentrantLock();
+
+ // Tracks holders and waiters that retained the entry before locking.
+ private volatile int refCount = 1;
+
+ void retain() {
+ ++refCount;
+ }
+
+ int release() {
+ Preconditions.checkState(refCount > 0, "once pending create lock
ref count underflow");
+ return --refCount;
+ }
+
+ void lock() {
+ lock.lock();
+ }
+
+ void unlock() {
+ lock.unlock();
+ }
+ }
+
// Tracks long-running jobs (event-driven and periodic).
// Ensures only one active job exists per <source, destination, sync_mode>
tuple.
private Set<JobKey> repeatJobDetectionSet = ConcurrentHashMap.newKeySet();
+    /** Returns {@code value} unchanged, or the empty string when it is {@code null}. */
+    private static String normalizeNullableName(String value) {
+        if (value == null) {
+            return "";
+        }
+        return value;
+    }
+
+    /**
+     * Renders a (db, table, partition) triple as {@code db.table}, or {@code db.table.partition}
+     * when a partition is given. Null components are treated as empty strings.
+     * NOTE(review): the '.'-joined form assumes identifiers never contain '.'; otherwise two
+     * distinct triples could render to the same key.
+     */
+    private static String normalizeTableKey(Triple<String, String, String> tableTriple) {
+        StringBuilder key = new StringBuilder()
+                .append(normalizeNullableName(tableTriple.getLeft()))
+                .append('.')
+                .append(normalizeNullableName(tableTriple.getMiddle()));
+        String partitionName = normalizeNullableName(tableTriple.getRight());
+        if (!partitionName.isEmpty()) {
+            key.append('.').append(partitionName);
+        }
+        return key.toString();
+    }
+
+    /**
+     * Converts table triples into a canonical key list: each triple is rendered by
+     * normalizeTableKey, duplicates are removed and the result is sorted. The same set of
+     * tables therefore yields the same list regardless of order or repetition.
+     *
+     * @return an empty list for null or empty input, otherwise a new sorted list
+     */
+    private static List<String> normalizeTables(List<Triple<String, String, String>> tables) {
+        if (tables == null || tables.isEmpty()) {
+            return Collections.emptyList();
+        }
+        Set<String> uniqueKeys = new HashSet<>();
+        tables.forEach(table -> uniqueKeys.add(normalizeTableKey(table)));
+        List<String> orderedKeys = new ArrayList<>(uniqueKeys);
+        orderedKeys.sort(Comparator.naturalOrder());
+        return orderedKeys;
+    }
+
+    /**
+     * Returns whether a cluster warm-up command is one-shot, i.e. its {@code sync_mode}
+     * property is neither {@code periodic} nor {@code event_driven}. A command without
+     * properties counts as one-shot.
+     */
+    private boolean isClusterOnceCommand(WarmUpClusterCommand command) {
+        Map<String, String> props = command.getProperties();
+        String mode = props == null ? null : props.get("sync_mode");
+        boolean continuous = "periodic".equals(mode) || "event_driven".equals(mode);
+        return !continuous;
+    }
+
+    /**
+     * Builds the dedup key for a warm-up command.
+     *
+     * @return a TABLE key (destination, normalized tables, force) for table warm-up, a CLUSTER
+     *     key (source, destination) for one-shot cluster warm-up, or {@code null} for periodic
+     *     and event-driven cluster commands, which are not deduplicated by this mechanism
+     */
+    private OncePendingJobKey buildOncePendingJobKey(WarmUpClusterCommand command) {
+        OncePendingJobKey key = null;
+        if (command.isWarmUpWithTable()) {
+            key = new OncePendingJobKey(JobType.TABLE, "", command.getDstCluster(),
+                    normalizeTables(command.getTables()), command.isForce());
+        } else if (isClusterOnceCommand(command)) {
+            // The force flag is not part of a cluster key.
+            key = new OncePendingJobKey(JobType.CLUSTER, command.getSrcCluster(),
+                    command.getDstCluster(), Collections.emptyList(), false);
+        }
+        return key;
+    }
+
+    /**
+     * Builds the dedup key for an existing job, mirroring the command overload so that a
+     * pending job and a new request for the same work produce equal keys.
+     *
+     * @return the key, or {@code null} for non-ONCE jobs and for job types other than
+     *     TABLE and CLUSTER
+     */
+    private OncePendingJobKey buildOncePendingJobKey(CloudWarmUpJob job) {
+        if (job.isOnce()) {
+            JobType type = job.getJobType();
+            if (type == JobType.TABLE) {
+                return new OncePendingJobKey(JobType.TABLE, "", job.getDstClusterName(),
+                        normalizeTables(job.tables), job.force);
+            } else if (type == JobType.CLUSTER) {
+                return new OncePendingJobKey(JobType.CLUSTER, job.getSrcClusterName(),
+                        job.getDstClusterName(), Collections.emptyList(), false);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Finds a PENDING one-shot job whose key equals {@code key}. When several jobs match,
+     * the one with the earliest create time is returned; ties go to the smallest job id.
+     *
+     * @return the matching job, or {@code null} if none is pending
+     */
+    private CloudWarmUpJob findExistingPendingOnceJob(OncePendingJobKey key) {
+        CloudWarmUpJob oldest = null;
+        for (CloudWarmUpJob candidate : cloudWarmUpJobs.values()) {
+            boolean pendingOnce = candidate.getJobState() == JobState.PENDING && candidate.isOnce();
+            if (pendingOnce && key.equals(buildOncePendingJobKey(candidate))
+                    && (oldest == null || isOlderPendingCandidate(candidate, oldest))) {
+                oldest = candidate;
+            }
+        }
+        return oldest;
+    }
+
+    // Orders pending candidates by create time, then by job id.
+    private static boolean isOlderPendingCandidate(CloudWarmUpJob a, CloudWarmUpJob b) {
+        if (a.getCreateTimeMs() != b.getCreateTimeMs()) {
+            return a.getCreateTimeMs() < b.getCreateTimeMs();
+        }
+        return a.getJobId() < b.getJobId();
+    }
+
+    /**
+     * Returns the create lock for {@code key}. A new entry starts with a count of 1; an
+     * existing entry's count is incremented. The update runs inside the map's
+     * {@code compute}, the same place releaseOncePendingCreateLock decrements it.
+     */
+    private RefCountedPendingCreateLock retainOncePendingCreateLock(OncePendingJobKey key) {
+        return oncePendingCreateLocks.compute(key, (k, current) -> {
+            if (current != null) {
+                current.retain();
+                return current;
+            }
+            return new RefCountedPendingCreateLock();
+        });
+    }
+
+    /**
+     * Drops one reference to {@code lock} and removes the map entry when the count reaches
+     * zero. Fails with IllegalStateException if the entry for {@code key} is not {@code lock}.
+     */
+    private void releaseOncePendingCreateLock(OncePendingJobKey key, RefCountedPendingCreateLock lock) {
+        oncePendingCreateLocks.compute(key, (k, current) -> {
+            Preconditions.checkState(current == lock, "unexpected once pending create lock entry");
+            if (current.release() == 0) {
+                // Returning null from compute removes the mapping.
+                return null;
+            }
+            return current;
+        });
+    }
+
private void registerJobForRepeatDetection(CloudWarmUpJob job, boolean
replay) throws AnalysisException {
if (job.isDone()) {
return;
@@ -781,6 +962,31 @@ public class CacheHotspotManager extends MasterDaemon {
}
public long createJob(WarmUpClusterCommand stmt) throws AnalysisException {
+ OncePendingJobKey oncePendingJobKey = buildOncePendingJobKey(stmt);
+ if (oncePendingJobKey != null) {
+ RefCountedPendingCreateLock createLock =
retainOncePendingCreateLock(oncePendingJobKey);
+ createLock.lock();
+ try {
+ CloudWarmUpJob existingPendingJob =
findExistingPendingOnceJob(oncePendingJobKey);
+ if (existingPendingJob != null) {
+ long existingJobId = existingPendingJob.getJobId();
+ if (stmt.isWarmUpWithTable()) {
+ throw new AnalysisException("Table warm up job already
has a pending job, job id: "
+ + existingJobId + ". Please retry later.");
+ }
+ LOG.info("reuse existing pending warm up job {} for key
{}", existingJobId, oncePendingJobKey);
+ return existingJobId;
+ }
+ return createJobInternal(stmt);
+ } finally {
+ createLock.unlock();
+ releaseOncePendingCreateLock(oncePendingJobKey, createLock);
+ }
+ }
+ return createJobInternal(stmt);
+ }
+
+ private long createJobInternal(WarmUpClusterCommand stmt) throws
AnalysisException {
long jobId = Env.getCurrentEnv().getNextId();
CloudWarmUpJob warmUpJob;
if (stmt.isWarmUpWithTable()) {
@@ -800,6 +1006,9 @@ public class CacheHotspotManager extends MasterDaemon {
.setJobType(JobType.CLUSTER);
Map<String, String> properties = stmt.getProperties();
+ if (properties == null) {
+ properties = Collections.emptyMap();
+ }
if ("periodic".equals(properties.get("sync_mode"))) {
String syncIntervalSecStr =
properties.get("sync_interval_sec");
if (syncIntervalSecStr == null) {
@@ -831,7 +1040,6 @@ public class CacheHotspotManager extends MasterDaemon {
}
warmUpJob = builder.build();
}
-
addCloudWarmUpJob(warmUpJob);
Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(warmUpJob);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java
index 3bb5a93c864..e25eb98d140 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java
@@ -17,109 +17,416 @@
package org.apache.doris.cloud.cache;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.cloud.CacheHotspotManager;
+import org.apache.doris.cloud.CloudWarmUpJob;
+import org.apache.doris.cloud.CloudWarmUpJob.JobState;
+import org.apache.doris.cloud.CloudWarmUpJob.JobType;
+import org.apache.doris.cloud.CloudWarmUpJob.SyncEvent;
+import org.apache.doris.cloud.CloudWarmUpJob.SyncMode;
import org.apache.doris.cloud.catalog.CloudTablet;
import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Triple;
+import org.apache.doris.nereids.trees.plans.commands.WarmUpClusterCommand;
+import org.apache.doris.persist.EditLog;
import org.apache.doris.system.Backend;
-import mockit.Mock;
-import mockit.MockUp;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class CacheHotspotManagerTest {
private CacheHotspotManager cacheHotspotManager;
private CloudSystemInfoService cloudSystemInfoService;
- private Partition partition;
+ // FeConstants.runningUnitTest as it was before setUp(); restored in tearDown().
+ private boolean originalRunningUnitTest;
+ // Source of ids handed out by the mocked Env#getNextId(); starts at 1000 in setUp().
+ private AtomicLong nextJobId;
+ // Mocked Env returned by Env.getCurrentEnv(), and the mocked edit log it returns.
+ private Env env;
+ private EditLog editLog;
+ // Static mock of Env.getCurrentEnv(), opened in setUp() and closed in tearDown().
+ private MockedStatic<Env> envMockedStatic;
+
+    @Before
+    public void setUp() {
+        // Remember the global flag so tearDown() can restore it.
+        originalRunningUnitTest = FeConstants.runningUnitTest;
+        FeConstants.runningUnitTest = true;
+
+        // Mocked Env: sequential job ids starting at 1000 and a mocked edit log.
+        nextJobId = new AtomicLong(1000L);
+        editLog = Mockito.mock(EditLog.class);
+        env = Mockito.mock(Env.class);
+        Mockito.when(env.getEditLog()).thenReturn(editLog);
+        Mockito.when(env.getNextId()).thenAnswer(invocation -> nextJobId.getAndIncrement());
+
+        // Route Env.getCurrentEnv() on this thread to the mock.
+        envMockedStatic = Mockito.mockStatic(Env.class);
+        envMockedStatic.when(Env::getCurrentEnv).thenReturn(env);
+
+        cloudSystemInfoService = new CloudSystemInfoService();
+        cacheHotspotManager = new CacheHotspotManager(cloudSystemInfoService);
+    }
+
+    @After
+    public void tearDown() {
+        try {
+            // JUnit runs @After even when @Before fails, so the static mock may never have been opened.
+            if (envMockedStatic != null) {
+                envMockedStatic.close();
+            }
+        } finally {
+            // Always restore the global flag so later tests are not affected.
+            FeConstants.runningUnitTest = originalRunningUnitTest;
+        }
+    }
@Test
public void testWarmUpNewClusterByTable() {
- partition = new Partition(0, null, null, null);
- new MockUp<Partition>() {
-
- @Mock
- public long getDataSize(boolean singleReplica) {
- return 10000000L;
+ cloudSystemInfoService = new CloudSystemInfoService();
+ // Mock with a custom default Answer (not CALLS_REAL_METHODS): the helpers below are
+ // package-private in org.apache.doris.cloud, so an anonymous subclass in this package
+ // cannot override them. Calls are dispatched by method name; every other method
+ // (e.g. warmUpNewClusterByTable) runs the real implementation.
+ // Note: Mockito.mock() does not run the CacheHotspotManager constructor.
+ cacheHotspotManager = Mockito.mock(CacheHotspotManager.class,
invocation -> {
+ String methodName = invocation.getMethod().getName();
+ switch (methodName) {
+ case "getFileCacheCapacity":
+ return 100L;
+ case "getPartitionsFromTriple": {
+ List<Partition> partitions = new ArrayList<>();
+ Partition spyPartition = Mockito.spy(new Partition(1,
"p1", null, null));
+
Mockito.doReturn(10000000L).when(spyPartition).getDataSize(Mockito.anyBoolean());
+ List<MaterializedIndex> list = new ArrayList<>();
+ list.add(new MaterializedIndex());
+ Mockito.doReturn(list).when(spyPartition)
+
.getMaterializedIndices(Mockito.any(IndexExtState.class));
+ partitions.add(spyPartition);
+ return partitions;
+ }
+ case "getBackendsFromCluster": {
+ String dstClusterName = (String) invocation.getArgument(0);
+ List<Backend> backends = new ArrayList<>();
+ backends.add(new Backend(11, dstClusterName, 0));
+ return backends;
+ }
+ case "getTabletsFromIndexs": {
+ List<Tablet> list = new ArrayList<>();
+ list.add(new CloudTablet(1001L));
+ return list;
+ }
+ case "getTabletIdsFromBe": {
+ Set<Long> tabletIds = new HashSet<>();
+ tabletIds.add(1001L);
+ return tabletIds;
+ }
+ default:
+ return invocation.callRealMethod();
}
+ });
+
+ long jobId = 1L;
+ String dstClusterName = "test_cluster";
+ List<Triple<String, String, String>> tables = new ArrayList<>();
+ tables.add(Triple.of("test_db", "test_table", ""));
+
+ // The stubbed partition size (10000000) exceeds the stubbed cache capacity (100):
+ // the call succeeds with force=true and is rejected with force=false.
+ Map<Long, List<Tablet>> result =
cacheHotspotManager.warmUpNewClusterByTable(
+ jobId, dstClusterName, tables, true);
+ Assert.assertEquals(1, result.size());
+ Assert.assertEquals(1001L, result.get(11L).get(0).getId());
+
+ RuntimeException exception =
Assert.assertThrows(RuntimeException.class, () ->
+ cacheHotspotManager.warmUpNewClusterByTable(jobId,
dstClusterName, tables, false));
+ Assert.assertEquals("The cluster " + dstClusterName + " cache size is
not enough", exception.getMessage());
+ }
+
+    @Test
+    public void testCreateTableOnceJobRejectsPendingDuplicateOrderDifference() throws AnalysisException {
+        // The same table set listed in a different order is a duplicate of the pending job.
+        long firstJobId = cacheHotspotManager.createJob(newTableStmt("dst", false,
+                Triple.of("db1", "tbl1", ""), Triple.of("db2", "tbl2", "p1")));
+
+        AnalysisException rejected = Assert.assertThrows(AnalysisException.class,
+                () -> cacheHotspotManager.createJob(newTableStmt("dst", false,
+                        Triple.of("db2", "tbl2", "p1"), Triple.of("db1", "tbl1", ""))));
+        String message = rejected.getMessage();
+        Assert.assertTrue(message.contains("already has a pending job"));
+        Assert.assertTrue(message.contains("job id: " + firstJobId));
+
+        // Only the first request allocated an id and wrote an edit log entry.
+        Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size());
+        Mockito.verify(env, Mockito.times(1)).getNextId();
+        Mockito.verify(editLog, Mockito.times(1)).logModifyCloudWarmUpJob(Mockito.any(CloudWarmUpJob.class));
+    }
+
+    @Test
+    public void testCreateTableOnceJobRejectsPendingDuplicateTableEntries() throws AnalysisException {
+        // Listing a table twice normalizes to the same key as listing it once.
+        long firstJobId = cacheHotspotManager.createJob(newTableStmt("dst", false,
+                Triple.of("db1", "tbl1", ""), Triple.of("db1", "tbl1", "")));
+
+        AnalysisException rejected = Assert.assertThrows(AnalysisException.class,
+                () -> cacheHotspotManager.createJob(newTableStmt("dst", false, Triple.of("db1", "tbl1", ""))));
+        String message = rejected.getMessage();
+        Assert.assertTrue(message.contains("already has a pending job"));
+        Assert.assertTrue(message.contains("job id: " + firstJobId));
+        Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size());
+    }
+
+    @Test
+    public void testCreateTableOnceJobDoesNotDedupDifferentForce() throws AnalysisException {
+        // For table jobs the force flag is part of the dedup key.
+        long withoutForce = cacheHotspotManager.createJob(newTableStmt("dst", false, Triple.of("db1", "tbl1", "")));
+        long withForce = cacheHotspotManager.createJob(newTableStmt("dst", true, Triple.of("db1", "tbl1", "")));
+
+        Assert.assertNotEquals(withoutForce, withForce);
+        Assert.assertEquals(2, cacheHotspotManager.getCloudWarmUpJobs().size());
+    }
+
+ @Test
+ public void testCreateClusterOnceJobDedupesPendingJob() throws
AnalysisException {
+ // A second identical one-shot cluster request returns the pending job's id
+ // instead of creating another job.
+ long firstJobId = cacheHotspotManager.createJob(newClusterStmt("dst",
"src", false));
+ long reusedJobId = cacheHotspotManager.createJob(newClusterStmt("dst",
"src", false));
- @Mock
- public List<MaterializedIndex>
getMaterializedIndices(IndexExtState extState) {
- List<MaterializedIndex> list = new ArrayList<>();
- MaterializedIndex ind = new MaterializedIndex();
- list.add(ind);
- return list;
+ Assert.assertEquals(firstJobId, reusedJobId);
+ Assert.assertEquals(1,
cacheHotspotManager.getCloudWarmUpJobs().size());
+ }
+
+    @Test
+    public void testCreateClusterOnceJobDedupesRegardlessOfForceFlag() throws AnalysisException {
+        // For cluster jobs the force flag is not part of the dedup key.
+        long pendingJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false));
+        long forcedRequestJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", true));
+
+        Assert.assertEquals(pendingJobId, forcedRequestJobId);
+        Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size());
+    }
+
+    @Test
+    public void testCreateClusterOnceJobAllowsNewPendingWhenOnlyRunningExists() throws AnalysisException {
+        // A RUNNING job with the same key does not block a new PENDING one.
+        CloudWarmUpJob running = newClusterJob(10L, "src", "dst", SyncMode.ONCE, JobState.RUNNING, 100L);
+        cacheHotspotManager.addCloudWarmUpJob(running);
+
+        long createdJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false));
+
+        Assert.assertNotEquals(running.getJobId(), createdJobId);
+        Assert.assertEquals(2, cacheHotspotManager.getCloudWarmUpJobs().size());
+        CloudWarmUpJob created = cacheHotspotManager.getCloudWarmUpJob(createdJobId);
+        Assert.assertEquals(JobState.PENDING, created.getJobState());
+    }
+
+    @Test
+    public void testCreateClusterOnceJobReusesPendingWhenRunningAndPendingExist() throws AnalysisException {
+        CloudWarmUpJob running = newClusterJob(10L, "src", "dst", SyncMode.ONCE, JobState.RUNNING, 100L);
+        CloudWarmUpJob pending = newClusterJob(11L, "src", "dst", SyncMode.ONCE, JobState.PENDING, 200L);
+        cacheHotspotManager.addCloudWarmUpJob(running);
+        cacheHotspotManager.addCloudWarmUpJob(pending);
+
+        // The PENDING job is reused even though an older RUNNING job has the same key.
+        long returnedJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false));
+
+        Assert.assertEquals(pending.getJobId(), returnedJobId);
+        Assert.assertEquals(2, cacheHotspotManager.getCloudWarmUpJobs().size());
+    }
+
+    @Test
+    public void testCreateOnceJobIgnoresFinishedHistory() throws AnalysisException {
+        // FINISHED jobs never count as duplicates.
+        CloudWarmUpJob finished = newClusterJob(10L, "src", "dst", SyncMode.ONCE, JobState.FINISHED, 100L);
+        cacheHotspotManager.addCloudWarmUpJob(finished);
+
+        long createdJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false));
+
+        Assert.assertNotEquals(finished.getJobId(), createdJobId);
+        Assert.assertEquals(2, cacheHotspotManager.getCloudWarmUpJobs().size());
+        Assert.assertEquals(JobState.PENDING, cacheHotspotManager.getCloudWarmUpJob(createdJobId).getJobState());
+    }
+
+    @Test
+    public void testCreateClusterOnceJobReusesOldestHistoricalPendingDuplicateAfterReplay() throws Exception {
+        // With several replayed PENDING duplicates, the earliest create time wins even though
+        // that job has the larger id.
+        CloudWarmUpJob newer = newClusterJob(20L, "src", "dst", SyncMode.ONCE, JobState.PENDING, 200L);
+        CloudWarmUpJob older = newClusterJob(30L, "src", "dst", SyncMode.ONCE, JobState.PENDING, 100L);
+        cacheHotspotManager.replayCloudWarmUpJob(newer);
+        cacheHotspotManager.replayCloudWarmUpJob(older);
+
+        long returnedJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false));
+
+        Assert.assertEquals(older.getJobId(), returnedJobId);
+        Assert.assertEquals(2, cacheHotspotManager.getCloudWarmUpJobs().size());
+    }
+
+ @Test
+ public void testCreateTableOnceJobRemovesLockEntryWhenCreateFails() throws
Exception {
+ // Presumably warmUpNewClusterByTable is only reached when runningUnitTest is false
+ // (confirm in createJobInternal); the override below then makes job creation fail.
+ boolean previousRunningUnitTest = FeConstants.runningUnitTest;
+ FeConstants.runningUnitTest = false;
+ cacheHotspotManager = new CacheHotspotManager(cloudSystemInfoService) {
+ @Override
+ public Map<Long, List<Tablet>> warmUpNewClusterByTable(long jobId,
String dstClusterName,
+ List<Triple<String, String, String>> tables, boolean
isForce) {
+ throw new RuntimeException("mock create failure");
}
};
- cloudSystemInfoService = new CloudSystemInfoService();
- cacheHotspotManager = new CacheHotspotManager(cloudSystemInfoService);
- new MockUp<CacheHotspotManager>() {
+ try {
+ RuntimeException exception =
Assert.assertThrows(RuntimeException.class, () ->
+ cacheHotspotManager.createJob(newTableStmt("dst", false,
Triple.of("db1", "tbl1", ""))));
- @Mock
- Long getFileCacheCapacity(String clusterName) throws
RuntimeException {
- return 100L;
- }
+ // A failed create must leave neither a job nor a lock-map entry behind.
+ Assert.assertEquals("mock create failure", exception.getMessage());
+ Assert.assertEquals(0, getOncePendingCreateLockCount());
+ Assert.assertEquals(0,
cacheHotspotManager.getCloudWarmUpJobs().size());
+ } finally {
+ FeConstants.runningUnitTest = previousRunningUnitTest;
+ }
+ }
- @Mock
- List<Partition> getPartitionsFromTriple(Triple<String, String,
String> tableTriple) {
- List<Partition> partitions = new ArrayList<>();
- partition = new Partition(1, "p1", null, null);
- partitions.add(partition);
- return partitions;
+ @Test
+ public void
testConcurrentCreateClusterOnceJobReleasesRefCountedLockAfterWaiterCompletes()
throws Exception {
+ // The first createJob blocks inside getNextId(), i.e. while holding the per-key create
+ // lock. A second identical request must then retain the same lock entry (refCount 2),
+ // reuse the first job once it is released, and the entry must be removed afterwards.
+ CountDownLatch firstCreateEntered = new CountDownLatch(1);
+ CountDownLatch allowFirstCreateToContinue = new CountDownLatch(1);
+ AtomicInteger getNextIdCalls = new AtomicInteger();
+ Mockito.when(env.getNextId()).thenAnswer(invocation -> {
+ if (getNextIdCalls.incrementAndGet() == 1) {
+ firstCreateEntered.countDown();
+ Assert.assertTrue(allowFirstCreateToContinue.await(5,
TimeUnit.SECONDS));
}
+ return nextJobId.getAndIncrement();
+ });
- @Mock
- List<Backend> getBackendsFromCluster(String dstClusterName) {
- List<Backend> backends = new ArrayList<>();
- Backend backend = new Backend(11, dstClusterName, 0);
- backends.add(backend);
- return backends;
- }
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ try {
+ // Static mocks are per-thread, so each task installs its own Env mock.
+ Future<Long> firstCreate = executor.submit(() ->
createJobWithThreadLocalEnv(
+ newClusterStmt("dst", "src", false)));
+ Assert.assertTrue(firstCreateEntered.await(5, TimeUnit.SECONDS));
- @Mock
- public List<Tablet> getTabletsFromIndexs(List<MaterializedIndex>
indexes) {
- List<Tablet> list = new ArrayList<>();
- Tablet tablet = new CloudTablet(1001L);
- list.add(tablet);
- return list;
- }
+ Future<Long> secondCreate = executor.submit(() ->
createJobWithThreadLocalEnv(
+ newClusterStmt("dst", "src", false)));
+ // refCount 2 = the blocked holder plus the waiting second request.
+ waitForOncePendingCreateLockRefCount(2, 5000L);
- @Mock
- Set<Long> getTabletIdsFromBe(long beId) {
- Set<Long> tabletIds = new HashSet<Long>();
- tabletIds.add(1001L);
- return tabletIds;
- }
- };
+ allowFirstCreateToContinue.countDown();
- // Setup mock data
- long jobId = 1L;
- String dstClusterName = "test_cluster";
- List<Triple<String, String, String>> tables = new ArrayList<>();
- tables.add(Triple.of("test_db", "test_table", ""));
+ long firstJobId = firstCreate.get(5, TimeUnit.SECONDS);
+ long secondJobId = secondCreate.get(5, TimeUnit.SECONDS);
+ Assert.assertEquals(firstJobId, secondJobId);
+ Assert.assertEquals(1, getNextIdCalls.get());
+ Assert.assertEquals(1,
cacheHotspotManager.getCloudWarmUpJobs().size());
+ Assert.assertEquals(0, getOncePendingCreateLockCount());
+ } finally {
+ allowFirstCreateToContinue.countDown();
+ executor.shutdownNow();
+ }
+ }
+
+ // Periodic cluster jobs skip one-shot dedup; the second request is rejected by the
+ // existing repeat-job detection instead.
+ @Test
+ public void testCreatePeriodicJobUnaffected() throws AnalysisException {
+ WarmUpClusterCommand periodicStmt = newClusterStmt("dst", "src",
false, periodicProperties(60));
+ long firstJobId = cacheHotspotManager.createJob(periodicStmt);
+ AnalysisException exception =
Assert.assertThrows(AnalysisException.class, () ->
+ cacheHotspotManager.createJob(newClusterStmt("dst", "src",
false, periodicProperties(60))));
- // force = true
- Map<Long, List<Tablet>> result =
cacheHotspotManager.warmUpNewClusterByTable(
- jobId, dstClusterName, tables, true);
- Assert.assertEquals(result.size(), 1);
- Assert.assertEquals(result.get(11L).get(0).getId(), 1001L);
+ Assert.assertEquals(1000L, firstJobId);
+ Assert.assertTrue(exception.getMessage().contains("already has a
runnable job"));
+ Assert.assertEquals(1,
cacheHotspotManager.getCloudWarmUpJobs().size());
+ }
- // force = false
- RuntimeException exception =
Assert.assertThrows(RuntimeException.class, () -> {
- cacheHotspotManager.warmUpNewClusterByTable(jobId, dstClusterName,
tables, false);
- });
- Assert.assertEquals("The cluster " + dstClusterName + " cache size is
not enough", exception.getMessage());
+    @Test
+    public void testCreateEventDrivenJobUnaffected() throws AnalysisException {
+        // Event-driven cluster jobs skip one-shot dedup; repeat-job detection rejects the second request.
+        long firstJobId = cacheHotspotManager.createJob(
+                newClusterStmt("dst", "src", false, eventDrivenProperties("load")));
+
+        AnalysisException rejected = Assert.assertThrows(AnalysisException.class,
+                () -> cacheHotspotManager.createJob(
+                        newClusterStmt("dst", "src", false, eventDrivenProperties("load"))));
+
+        Assert.assertEquals(1000L, firstJobId);
+        Assert.assertTrue(rejected.getMessage().contains("already has a runnable job"));
+        Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size());
+    }
+
+    /**
+     * Builds a table warm-up command targeting {@code dstClusterName} with the given
+     * (db, table, partition) entries appended to the command's table list.
+     * The varargs array is only read, never stored or exposed, so it is heap-pollution safe.
+     */
+    @SafeVarargs
+    private WarmUpClusterCommand newTableStmt(String dstClusterName, boolean force,
+            Triple<String, String, String>... tables) {
+        WarmUpClusterCommand stmt = new WarmUpClusterCommand(new ArrayList<>(),
+                null, dstClusterName, force, true);
+        for (Triple<String, String, String> table : tables) {
+            stmt.getTables().add(table);
+        }
+        return stmt;
+    }
+
+    // One-shot cluster warm-up command: an empty, mutable property map (no sync_mode).
+    private WarmUpClusterCommand newClusterStmt(String dstClusterName, String srcClusterName, boolean force) {
+        Map<String, String> noProperties = new HashMap<>();
+        return newClusterStmt(dstClusterName, srcClusterName, force, noProperties);
+    }
+
+    // Cluster warm-up command with explicit properties (e.g. sync_mode).
+    private WarmUpClusterCommand newClusterStmt(String dstClusterName, String srcClusterName,
+            boolean force, Map<String, String> properties) {
+        WarmUpClusterCommand command =
+                new WarmUpClusterCommand(null, srcClusterName, dstClusterName, force, false, properties);
+        return command;
+    }
+
+    // Properties for a periodic cluster job with the given sync interval in seconds.
+    private Map<String, String> periodicProperties(long syncIntervalSec) {
+        Map<String, String> props = new HashMap<>();
+        props.put("sync_interval_sec", Long.toString(syncIntervalSec));
+        props.put("sync_mode", "periodic");
+        return props;
+    }
+
+    // Properties for an event-driven cluster job triggered by the given sync event.
+    private Map<String, String> eventDrivenProperties(String syncEvent) {
+        Map<String, String> props = new HashMap<>();
+        props.put("sync_event", syncEvent);
+        props.put("sync_mode", "event_driven");
+        return props;
+    }
+
+    // Builds a CLUSTER warm-up job directly (bypassing createJob) with the given state and create time.
+    private CloudWarmUpJob newClusterJob(long jobId, String srcClusterName, String dstClusterName,
+            SyncMode syncMode, JobState jobState, long createTimeMs) {
+        CloudWarmUpJob.Builder jobBuilder = new CloudWarmUpJob.Builder()
+                .setJobId(jobId)
+                .setSrcClusterName(srcClusterName)
+                .setDstClusterName(dstClusterName)
+                .setJobType(JobType.CLUSTER)
+                .setSyncMode(syncMode);
+        // Continuous modes need their mode-specific setting; ONCE needs nothing extra.
+        if (syncMode == SyncMode.PERIODIC) {
+            jobBuilder.setSyncInterval(60L);
+        }
+        if (syncMode == SyncMode.EVENT_DRIVEN) {
+            jobBuilder.setSyncEvent(SyncEvent.LOAD);
+        }
+        CloudWarmUpJob built = jobBuilder.build();
+        built.setJobState(jobState);
+        built.setCreateTimeMs(createTimeMs);
+        return built;
+    }
+
+    // Number of entries currently in CacheHotspotManager's oncePendingCreateLocks map.
+    private int getOncePendingCreateLockCount() throws Exception {
+        Map<?, ?> locks = getOncePendingCreateLocks();
+        return locks.size();
+    }
+
+    // Mockito static mocks apply only to the thread that opened them, so worker threads
+    // install their own Env.getCurrentEnv() stub around createJob.
+    private long createJobWithThreadLocalEnv(WarmUpClusterCommand command) throws AnalysisException {
+        try (MockedStatic<Env> perThreadEnv = Mockito.mockStatic(Env.class)) {
+            perThreadEnv.when(Env::getCurrentEnv).thenReturn(env);
+            return cacheHotspotManager.createJob(command);
+        }
+    }
+
+    // Reads the private refCount of the only lock entry via reflection; asserts there is exactly one entry.
+    private int getOnlyOncePendingCreateLockRefCount() throws Exception {
+        Map<?, ?> locks = getOncePendingCreateLocks();
+        Assert.assertEquals(1, locks.size());
+        Object entry = locks.values().iterator().next();
+        Field refCount = entry.getClass().getDeclaredField("refCount");
+        refCount.setAccessible(true);
+        return refCount.getInt(entry);
+    }
+
+    // Reflective access to CacheHotspotManager's private oncePendingCreateLocks map.
+    private Map<?, ?> getOncePendingCreateLocks() throws Exception {
+        Field field = CacheHotspotManager.class.getDeclaredField("oncePendingCreateLocks");
+        field.setAccessible(true);
+        Object value = field.get(cacheHotspotManager);
+        return (Map<?, ?>) value;
+    }
+
+ private void waitForOncePendingCreateLockRefCount(int expectedRefCount,
long timeoutMs) throws Exception {
+ long deadlineMs = System.currentTimeMillis() + timeoutMs;
+ while (System.currentTimeMillis() < deadlineMs) {
+ if (getOncePendingCreateLockCount() == 1
+ && getOnlyOncePendingCreateLockRefCount() ==
expectedRefCount) {
+ return;
+ }
+ Thread.sleep(10L);
+ }
+ Assert.fail("Timed out waiting for once pending create lock ref count
" + expectedRefCount);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]