This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e2a678cb74a [Enhancement](recyclebin) Optimize lock granularity in
CatalogRecycleBin (#61366)
e2a678cb74a is described below
commit e2a678cb74a5ef6278a7c968901c2de9348e0626
Author: morrySnow <[email protected]>
AuthorDate: Tue Mar 17 19:53:54 2026 +0800
[Enhancement](recyclebin) Optimize lock granularity in CatalogRecycleBin
(#61366)
## Problem
All methods in `CatalogRecycleBin.java` use `synchronized` (single
monitor lock), creating extremely coarse lock granularity. When
`erasePartition()` runs slowly with many partitions, other
`synchronized` methods block waiting for the lock. Callers of methods
such as `recyclePartition()` hold the table write lock while waiting,
causing cascading blocking that can bring down the entire Doris metadata
service.
## Solution
Two complementary optimizations:
### 1. Replace `synchronized` with `ReentrantReadWriteLock`
- **Lock-free** (8 methods): Simple ConcurrentHashMap lookups
(`isRecyclePartition`, `getRecycleTimeById`, etc.)
- **Read lock** (4 methods): Read-only iterations
(`allTabletsInRecycledStatus`, `getInfo`, `write`, etc.)
- **Write lock** (11 methods): Map mutations
(`recycleDatabase/Table/Partition`, `recover*`, `clearAll`)
### 2. Microbatch Erase Pattern (Critical)
Refactored all 12 erase methods to process items **one at a time** with
lock release between items:
- **Inside write lock (per item)**: cleanup RPCs + map removal + edit
log write
- **Release lock between items**: other operations can proceed
This reduces the lock hold time per acquisition from **O(N × T)** (all
N items) to **O(T)** (a single item), where N is the number of items to
erase and T is the per-item erase cost.
## Data Structure Changes
Changed 4 internal maps from `HashMap` to `ConcurrentHashMap` to enable
lock-free reads.
## Bug Fixes (found during self-review)
1. **NPE in `getIdListToEraseByRecycleTime`**: Used `getOrDefault` to
handle stale IDs that may be removed concurrently between taking the
snapshot and processing it
2. **DdlException in cascade erase**: Added try-catch in
`eraseDatabaseInstantly`/`eraseTableInstantly` to tolerate partitions/tables
that were already erased concurrently by the recycle-bin daemon
## Testing
- All 24 existing unit tests pass
- Added 3 new concurrency tests:
- `testConcurrentReadsDoNotBlock` — 10 concurrent reader threads
- `testConcurrentRecycleAndRead` — writer + 5 readers simultaneously
- `testMicrobatchEraseReleasesLockBetweenItems` — verifies that
`recyclePartition` succeeds while an erase is in progress
---------
Co-authored-by: Copilot <[email protected]>
---
.../apache/doris/catalog/CatalogRecycleBin.java | 1700 +++++++++++---------
.../doris/catalog/CatalogRecycleBinTest.java | 185 +++
2 files changed, 1151 insertions(+), 734 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
index ee5adb809cc..e5f10c96d93 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
@@ -36,7 +36,6 @@ import org.apache.doris.thrift.TStorageMedium;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
@@ -47,6 +46,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -64,10 +65,28 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
// to avoid erase log ahead of drop log
private static final long minEraseLatency = 10 * 60 * 1000; // 10 min
- private Map<Long, RecycleDatabaseInfo> idToDatabase;
- private Map<Long, RecycleTableInfo> idToTable;
- private Map<Long, RecyclePartitionInfo> idToPartition;
- private Map<Long, Long> idToRecycleTime;
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private void readLock() {
+ lock.readLock().lock();
+ }
+
+ private void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ private void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ lock.writeLock().unlock();
+ }
+
+ private ConcurrentHashMap<Long, RecycleDatabaseInfo> idToDatabase;
+ private ConcurrentHashMap<Long, RecycleTableInfo> idToTable;
+ private ConcurrentHashMap<Long, RecyclePartitionInfo> idToPartition;
+ private ConcurrentHashMap<Long, Long> idToRecycleTime;
// Caches below to avoid calculate meta with same name every demon run
cycle.
// When the meta is updated, these caches should be updated too. No need to
@@ -85,42 +104,48 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
public CatalogRecycleBin() {
super("recycle bin", FeConstants.runningUnitTest ? 10 * 1000L :
DEFAULT_INTERVAL_SECONDS * 1000L);
- idToDatabase = Maps.newHashMap();
- idToTable = Maps.newHashMap();
- idToPartition = Maps.newHashMap();
- idToRecycleTime = Maps.newHashMap();
+ idToDatabase = new ConcurrentHashMap<>();
+ idToTable = new ConcurrentHashMap<>();
+ idToPartition = new ConcurrentHashMap<>();
+ idToRecycleTime = new ConcurrentHashMap<>();
}
- public synchronized boolean allTabletsInRecycledStatus(List<Long>
backendTabletIds) {
- Set<Long> recycledTabletSet = Sets.newHashSet();
-
- Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator =
idToPartition.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
- RecyclePartitionInfo partitionInfo = entry.getValue();
- Partition partition = partitionInfo.getPartition();
- addRecycledTabletsForPartition(recycledTabletSet, partition);
- }
+ public boolean allTabletsInRecycledStatus(List<Long> backendTabletIds) {
+ readLock();
+ try {
+ Set<Long> recycledTabletSet = Sets.newHashSet();
- Iterator<Map.Entry<Long, RecycleTableInfo>> tableIter =
idToTable.entrySet().iterator();
- while (tableIter.hasNext()) {
- Map.Entry<Long, RecycleTableInfo> entry = tableIter.next();
- RecycleTableInfo tableInfo = entry.getValue();
- Table table = tableInfo.getTable();
- addRecycledTabletsForTable(recycledTabletSet, table);
- }
+ Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator =
idToPartition.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
+ RecyclePartitionInfo partitionInfo = entry.getValue();
+ Partition partition = partitionInfo.getPartition();
+ addRecycledTabletsForPartition(recycledTabletSet, partition);
+ }
- Iterator<Map.Entry<Long, RecycleDatabaseInfo>> dbIterator =
idToDatabase.entrySet().iterator();
- while (dbIterator.hasNext()) {
- Map.Entry<Long, RecycleDatabaseInfo> entry = dbIterator.next();
- RecycleDatabaseInfo dbInfo = entry.getValue();
- Database db = dbInfo.getDb();
- for (Table table : db.getTables()) {
+ Iterator<Map.Entry<Long, RecycleTableInfo>> tableIter =
idToTable.entrySet().iterator();
+ while (tableIter.hasNext()) {
+ Map.Entry<Long, RecycleTableInfo> entry = tableIter.next();
+ RecycleTableInfo tableInfo = entry.getValue();
+ Table table = tableInfo.getTable();
addRecycledTabletsForTable(recycledTabletSet, table);
}
- }
- return recycledTabletSet.size() >= backendTabletIds.size() &&
recycledTabletSet.containsAll(backendTabletIds);
+ Iterator<Map.Entry<Long, RecycleDatabaseInfo>> dbIterator =
idToDatabase.entrySet().iterator();
+ while (dbIterator.hasNext()) {
+ Map.Entry<Long, RecycleDatabaseInfo> entry = dbIterator.next();
+ RecycleDatabaseInfo dbInfo = entry.getValue();
+ Database db = dbInfo.getDb();
+ for (Table table : db.getTables()) {
+ addRecycledTabletsForTable(recycledTabletSet, table);
+ }
+ }
+
+ return recycledTabletSet.size() >= backendTabletIds.size()
+ && recycledTabletSet.containsAll(backendTabletIds);
+ } finally {
+ readUnlock();
+ }
}
private void addRecycledTabletsForTable(Set<Long> recycledTabletSet, Table
table) {
@@ -141,130 +166,158 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
}
}
- public synchronized boolean recycleDatabase(Database db, Set<String>
tableNames, Set<Long> tableIds,
+ public boolean recycleDatabase(Database db, Set<String> tableNames,
Set<Long> tableIds,
boolean isReplay, boolean
isForceDrop, long replayRecycleTime) {
- long recycleTime = 0;
- if (idToDatabase.containsKey(db.getId())) {
- LOG.error("db[{}] already in recycle bin.", db.getId());
- return false;
- }
-
- // db should be empty. all tables are recycled before
- if (!db.getTableIds().isEmpty()) {
- throw new IllegalStateException("Database " + db.getFullName() + "
is not empty. Contains tables: "
- +
db.getTableIds().stream().collect(Collectors.toSet()));
- }
-
- // recycle db
- RecycleDatabaseInfo databaseInfo = new RecycleDatabaseInfo(db,
tableNames, tableIds);
- idToDatabase.put(db.getId(), databaseInfo);
- if (isForceDrop) {
- // The 'force drop' database should be recycle immediately.
- recycleTime = 0;
- } else if (!isReplay || replayRecycleTime == 0) {
- recycleTime = System.currentTimeMillis();
- } else {
- recycleTime = replayRecycleTime;
- }
- idToRecycleTime.put(db.getId(), recycleTime);
- dbNameToIds.computeIfAbsent(db.getFullName(), k ->
ConcurrentHashMap.newKeySet()).add(db.getId());
- LOG.info("recycle db[{}-{}], is force drop: {}", db.getId(),
db.getFullName(), isForceDrop);
- return true;
+ writeLock();
+ try {
+ long recycleTime = 0;
+ if (idToDatabase.containsKey(db.getId())) {
+ LOG.error("db[{}] already in recycle bin.", db.getId());
+ return false;
+ }
+
+ // db should be empty. all tables are recycled before
+ if (!db.getTableIds().isEmpty()) {
+ throw new IllegalStateException("Database " + db.getFullName()
+ " is not empty. Contains tables: "
+ +
db.getTableIds().stream().collect(Collectors.toSet()));
+ }
+
+ // recycle db
+ RecycleDatabaseInfo databaseInfo = new RecycleDatabaseInfo(db,
tableNames, tableIds);
+ idToDatabase.put(db.getId(), databaseInfo);
+ if (isForceDrop) {
+ // The 'force drop' database should be recycle immediately.
+ recycleTime = 0;
+ } else if (!isReplay || replayRecycleTime == 0) {
+ recycleTime = System.currentTimeMillis();
+ } else {
+ recycleTime = replayRecycleTime;
+ }
+ idToRecycleTime.put(db.getId(), recycleTime);
+ dbNameToIds.computeIfAbsent(db.getFullName(), k ->
ConcurrentHashMap.newKeySet()).add(db.getId());
+ LOG.info("recycle db[{}-{}], is force drop: {}", db.getId(),
db.getFullName(), isForceDrop);
+ return true;
+ } finally {
+ writeUnlock();
+ }
}
- public synchronized boolean recycleTable(long dbId, Table table, boolean
isReplay,
+ public boolean recycleTable(long dbId, Table table, boolean isReplay,
boolean isForceDrop, long
replayRecycleTime) {
- long recycleTime = 0;
- if (idToTable.containsKey(table.getId())) {
- LOG.error("table[{}] already in recycle bin.", table.getId());
- return false;
- }
-
- // recycle table
- RecycleTableInfo tableInfo = new RecycleTableInfo(dbId, table);
- if (isForceDrop) {
- // The 'force drop' table should be recycle immediately.
- recycleTime = 0;
- } else if (!isReplay || replayRecycleTime == 0) {
- recycleTime = System.currentTimeMillis();
- } else {
- recycleTime = replayRecycleTime;
- }
- idToRecycleTime.put(table.getId(), recycleTime);
- idToTable.put(table.getId(), tableInfo);
- dbIdTableNameToIds.computeIfAbsent(Pair.of(dbId, table.getName()),
- k -> ConcurrentHashMap.newKeySet()).add(table.getId());
- LOG.info("recycle table[{}-{}], is force drop: {}", table.getId(),
table.getName(), isForceDrop);
- return true;
+ writeLock();
+ try {
+ long recycleTime = 0;
+ if (idToTable.containsKey(table.getId())) {
+ LOG.error("table[{}] already in recycle bin.", table.getId());
+ return false;
+ }
+
+ // recycle table
+ RecycleTableInfo tableInfo = new RecycleTableInfo(dbId, table);
+ if (isForceDrop) {
+ // The 'force drop' table should be recycle immediately.
+ recycleTime = 0;
+ } else if (!isReplay || replayRecycleTime == 0) {
+ recycleTime = System.currentTimeMillis();
+ } else {
+ recycleTime = replayRecycleTime;
+ }
+ idToRecycleTime.put(table.getId(), recycleTime);
+ idToTable.put(table.getId(), tableInfo);
+ dbIdTableNameToIds.computeIfAbsent(Pair.of(dbId, table.getName()),
+ k -> ConcurrentHashMap.newKeySet()).add(table.getId());
+ LOG.info("recycle table[{}-{}], is force drop: {}", table.getId(),
table.getName(), isForceDrop);
+ return true;
+ } finally {
+ writeUnlock();
+ }
}
- public synchronized boolean recyclePartition(long dbId, long tableId,
String tableName, Partition partition,
+ public boolean recyclePartition(long dbId, long tableId, String tableName,
Partition partition,
Range<PartitionKey> range,
PartitionItem listPartitionItem,
DataProperty dataProperty,
ReplicaAllocation replicaAlloc,
boolean isInMemory, boolean
isMutable) {
- if (idToPartition.containsKey(partition.getId())) {
- LOG.error("partition[{}] already in recycle bin.",
partition.getId());
- return false;
- }
-
- // recycle partition
- RecyclePartitionInfo partitionInfo = new RecyclePartitionInfo(dbId,
tableId, partition,
- range, listPartitionItem, dataProperty, replicaAlloc,
isInMemory, isMutable);
- idToRecycleTime.put(partition.getId(), System.currentTimeMillis());
- idToPartition.put(partition.getId(), partitionInfo);
- dbTblIdPartitionNameToIds.computeIfAbsent(Pair.of(dbId, tableId), k ->
new ConcurrentHashMap<>())
- .computeIfAbsent(partition.getName(), k ->
ConcurrentHashMap.newKeySet()).add(partition.getId());
- LOG.info("recycle partition[{}-{}] of table [{}-{}]",
partition.getId(), partition.getName(),
- tableId, tableName);
- return true;
+ writeLock();
+ try {
+ if (idToPartition.containsKey(partition.getId())) {
+ LOG.error("partition[{}] already in recycle bin.",
partition.getId());
+ return false;
+ }
+
+ // recycle partition
+ RecyclePartitionInfo partitionInfo = new
RecyclePartitionInfo(dbId, tableId, partition,
+ range, listPartitionItem, dataProperty, replicaAlloc,
isInMemory, isMutable);
+ idToRecycleTime.put(partition.getId(), System.currentTimeMillis());
+ idToPartition.put(partition.getId(), partitionInfo);
+ dbTblIdPartitionNameToIds.computeIfAbsent(Pair.of(dbId, tableId),
k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(partition.getName(), k ->
ConcurrentHashMap.newKeySet()).add(partition.getId());
+ LOG.info("recycle partition[{}-{}] of table [{}-{}]",
partition.getId(), partition.getName(),
+ tableId, tableName);
+ return true;
+ } finally {
+ writeUnlock();
+ }
}
- public synchronized Long getRecycleTimeById(long id) {
+ public Long getRecycleTimeById(long id) {
return idToRecycleTime.get(id);
}
- public synchronized void setRecycleTimeByIdForReplay(long id, Long
recycleTime) {
+ public void setRecycleTimeByIdForReplay(long id, Long recycleTime) {
idToRecycleTime.put(id, recycleTime);
}
- public synchronized boolean isRecycleDatabase(long dbId) {
+ public boolean isRecycleDatabase(long dbId) {
return idToDatabase.containsKey(dbId);
}
- public synchronized boolean isRecycleTable(long dbId, long tableId) {
+ public boolean isRecycleTable(long dbId, long tableId) {
return isRecycleDatabase(dbId) || idToTable.containsKey(tableId);
}
- public synchronized boolean isRecyclePartition(long dbId, long tableId,
long partitionId) {
+ public boolean isRecyclePartition(long dbId, long tableId, long
partitionId) {
return isRecycleTable(dbId, tableId) ||
idToPartition.containsKey(partitionId);
}
- public synchronized void getRecycleIds(Set<Long> dbIds, Set<Long>
tableIds, Set<Long> partitionIds) {
+ public void getRecycleIds(Set<Long> dbIds, Set<Long> tableIds, Set<Long>
partitionIds) {
dbIds.addAll(idToDatabase.keySet());
tableIds.addAll(idToTable.keySet());
partitionIds.addAll(idToPartition.keySet());
}
- private synchronized boolean isExpire(long id, long currentTimeMs) {
+ private boolean isExpire(long id, long currentTimeMs) {
long latency = currentTimeMs - idToRecycleTime.get(id);
return (Config.catalog_trash_ignore_min_erase_latency || latency >
minEraseLatency)
&& latency > Config.catalog_trash_expire_second * 1000L;
}
- private synchronized void eraseDatabase(long currentTimeMs, int keepNum) {
+ private void eraseDatabase(long currentTimeMs, int keepNum) {
int eraseNum = 0;
StopWatch watch = StopWatch.createStarted();
try {
- // 1. erase expired database
- Iterator<Map.Entry<Long, RecycleDatabaseInfo>> dbIter =
idToDatabase.entrySet().iterator();
- while (dbIter.hasNext()) {
- Map.Entry<Long, RecycleDatabaseInfo> entry = dbIter.next();
- RecycleDatabaseInfo dbInfo = entry.getValue();
- Database db = dbInfo.getDb();
- if (isExpire(db.getId(), currentTimeMs)) {
- // erase db
- dbIter.remove();
- idToRecycleTime.remove(entry.getKey());
+ // 1. collect expired database IDs under read lock
+ List<Long> expiredIds = new ArrayList<>();
+ readLock();
+ try {
+ for (Map.Entry<Long, RecycleDatabaseInfo> entry :
idToDatabase.entrySet()) {
+ if (isExpire(entry.getKey(), currentTimeMs)) {
+ expiredIds.add(entry.getKey());
+ }
+ }
+ } finally {
+ readUnlock();
+ }
+
+ // 2. erase each expired database one at a time
+ for (Long dbId : expiredIds) {
+ writeLock();
+ try {
+ RecycleDatabaseInfo dbInfo = idToDatabase.remove(dbId);
+ if (dbInfo == null) {
+ continue;
+ }
+ Database db = dbInfo.getDb();
+ idToRecycleTime.remove(dbId);
dbNameToIds.computeIfPresent(db.getFullName(), (k, v) -> {
v.remove(db.getId());
@@ -274,13 +327,23 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
Env.getCurrentEnv().eraseDatabase(db.getId(), true);
LOG.info("erase db[{}]", db.getId());
eraseNum++;
+ } finally {
+ writeUnlock();
}
}
- // 2. erase exceed number
+
+ // 3. erase exceed number
if (keepNum < 0) {
return;
}
- for (Map.Entry<String, Set<Long>> entry : dbNameToIds.entrySet()) {
+ List<Map.Entry<String, Set<Long>>> groups;
+ readLock();
+ try {
+ groups = new ArrayList<>(dbNameToIds.entrySet());
+ } finally {
+ readUnlock();
+ }
+ for (Map.Entry<String, Set<Long>> entry : groups) {
String dbName = entry.getKey();
eraseDatabaseWithSameName(dbName, currentTimeMs, keepNum,
Lists.newArrayList(entry.getValue()));
}
@@ -290,29 +353,40 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
}
}
- private synchronized void eraseDatabaseWithSameName(String dbName, long
currentTimeMs,
+ private void eraseDatabaseWithSameName(String dbName, long currentTimeMs,
int
maxSameNameTrashNum, List<Long> sameNameDbIdList) {
- List<Long> dbIdToErase =
getIdListToEraseByRecycleTime(sameNameDbIdList, maxSameNameTrashNum);
+ List<Long> dbIdToErase;
+ readLock();
+ try {
+ dbIdToErase = getIdListToEraseByRecycleTime(sameNameDbIdList,
maxSameNameTrashNum);
+ } finally {
+ readUnlock();
+ }
for (Long dbId : dbIdToErase) {
- RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
- if (!isExpireMinLatency(dbId, currentTimeMs)) {
- continue;
- }
- eraseAllTables(dbInfo);
- idToDatabase.remove(dbId);
- idToRecycleTime.remove(dbId);
+ writeLock();
+ try {
+ RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
+ if (dbInfo == null || !isExpireMinLatency(dbId,
currentTimeMs)) {
+ continue;
+ }
+ eraseAllTables(dbInfo);
+ idToDatabase.remove(dbId);
+ idToRecycleTime.remove(dbId);
- dbNameToIds.computeIfPresent(dbName, (k, v) -> {
- v.remove(dbId);
- return v.isEmpty() ? null : v;
- });
+ dbNameToIds.computeIfPresent(dbName, (k, v) -> {
+ v.remove(dbId);
+ return v.isEmpty() ? null : v;
+ });
- Env.getCurrentEnv().eraseDatabase(dbId, true);
- LOG.info("erase database[{}] name: {}", dbId, dbName);
+ Env.getCurrentEnv().eraseDatabase(dbId, true);
+ LOG.info("erase database[{}] name: {}", dbId, dbName);
+ } finally {
+ writeUnlock();
+ }
}
}
- private synchronized boolean isExpireMinLatency(long id, long
currentTimeMs) {
+ private boolean isExpireMinLatency(long id, long currentTimeMs) {
return (currentTimeMs - idToRecycleTime.get(id)) > minEraseLatency ||
FeConstants.runningUnitTest;
}
@@ -348,40 +422,56 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
}
}
- public synchronized void replayEraseDatabase(long dbId) {
- RecycleDatabaseInfo dbInfo = idToDatabase.remove(dbId);
- idToRecycleTime.remove(dbId);
+ public void replayEraseDatabase(long dbId) {
+ writeLock();
+ try {
+ RecycleDatabaseInfo dbInfo = idToDatabase.remove(dbId);
+ idToRecycleTime.remove(dbId);
- if (dbInfo != null) {
- dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k, v)
-> {
- v.remove(dbId);
- return v.isEmpty() ? null : v;
- });
- }
+ if (dbInfo != null) {
+ dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k,
v) -> {
+ v.remove(dbId);
+ return v.isEmpty() ? null : v;
+ });
+ }
- Env.getCurrentEnv().eraseDatabase(dbId, false);
- LOG.info("replay erase db[{}]", dbId);
+ Env.getCurrentEnv().eraseDatabase(dbId, false);
+ LOG.info("replay erase db[{}]", dbId);
+ } finally {
+ writeUnlock();
+ }
}
- private synchronized void eraseTable(long currentTimeMs, int keepNum) {
+ private void eraseTable(long currentTimeMs, int keepNum) {
int eraseNum = 0;
StopWatch watch = StopWatch.createStarted();
try {
- // 1. erase expired tables
- Iterator<Map.Entry<Long, RecycleTableInfo>> tableIter =
idToTable.entrySet().iterator();
- while (tableIter.hasNext()) {
- Map.Entry<Long, RecycleTableInfo> entry = tableIter.next();
- RecycleTableInfo tableInfo = entry.getValue();
- Table table = tableInfo.getTable();
- long tableId = table.getId();
+ // 1. collect expired table IDs under read lock
+ List<Long> expiredIds = new ArrayList<>();
+ readLock();
+ try {
+ for (Map.Entry<Long, RecycleTableInfo> entry :
idToTable.entrySet()) {
+ if (isExpire(entry.getKey(), currentTimeMs)) {
+ expiredIds.add(entry.getKey());
+ }
+ }
+ } finally {
+ readUnlock();
+ }
- if (isExpire(tableId, currentTimeMs)) {
+ // 2. erase each expired table one at a time
+ for (Long tableId : expiredIds) {
+ writeLock();
+ try {
+ RecycleTableInfo tableInfo = idToTable.remove(tableId);
+ if (tableInfo == null) {
+ continue;
+ }
+ Table table = tableInfo.getTable();
if (table.isManagedTable()) {
Env.getCurrentEnv().onEraseOlapTable(tableInfo.dbId,
(OlapTable) table, false);
}
- // erase table
- tableIter.remove();
idToRecycleTime.remove(tableId);
dbIdTableNameToIds.computeIfPresent(Pair.of(tableInfo.getDbId(),
table.getName()),
@@ -390,18 +480,26 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
return v.isEmpty() ? null : v;
});
- // log
Env.getCurrentEnv().getEditLog().logEraseTable(tableId);
LOG.info("erase table[{}]", tableId);
eraseNum++;
+ } finally {
+ writeUnlock();
}
- } // end for tables
+ }
- // 2. erase exceed num
+ // 3. erase exceed num
if (keepNum < 0) {
return;
}
- for (Map.Entry<Pair<Long, String>, Set<Long>> entry :
dbIdTableNameToIds.entrySet()) {
+ List<Map.Entry<Pair<Long, String>, Set<Long>>> groups;
+ readLock();
+ try {
+ groups = new ArrayList<>(dbIdTableNameToIds.entrySet());
+ } finally {
+ readUnlock();
+ }
+ for (Map.Entry<Pair<Long, String>, Set<Long>> entry : groups) {
eraseTableWithSameName(entry.getKey().first,
entry.getKey().second, currentTimeMs, keepNum,
Lists.newArrayList(entry.getValue()));
}
@@ -411,71 +509,98 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
}
}
- private synchronized void eraseTableWithSameName(long dbId, String
tableName, long currentTimeMs,
+ private void eraseTableWithSameName(long dbId, String tableName, long
currentTimeMs,
int maxSameNameTrashNum, List<Long> sameNameTableIdList) {
- List<Long> tableIdToErase =
getIdListToEraseByRecycleTime(sameNameTableIdList, maxSameNameTrashNum);
+ List<Long> tableIdToErase;
+ readLock();
+ try {
+ tableIdToErase =
getIdListToEraseByRecycleTime(sameNameTableIdList, maxSameNameTrashNum);
+ } finally {
+ readUnlock();
+ }
for (Long tableId : tableIdToErase) {
- RecycleTableInfo tableInfo = idToTable.get(tableId);
- if (!isExpireMinLatency(tableId, currentTimeMs)) {
- continue;
- }
- Table table = tableInfo.getTable();
- if (table.isManagedTable()) {
- Env.getCurrentEnv().onEraseOlapTable(dbId, (OlapTable) table,
false);
- }
+ writeLock();
+ try {
+ RecycleTableInfo tableInfo = idToTable.get(tableId);
+ if (tableInfo == null || !isExpireMinLatency(tableId,
currentTimeMs)) {
+ continue;
+ }
+ Table table = tableInfo.getTable();
+ if (table.isManagedTable()) {
+ Env.getCurrentEnv().onEraseOlapTable(dbId, (OlapTable)
table, false);
+ }
- idToTable.remove(tableId);
- idToRecycleTime.remove(tableId);
+ idToTable.remove(tableId);
+ idToRecycleTime.remove(tableId);
- dbIdTableNameToIds.computeIfPresent(Pair.of(dbId, tableName), (k,
v) -> {
- v.remove(tableId);
- return v.isEmpty() ? null : v;
- });
+ dbIdTableNameToIds.computeIfPresent(Pair.of(dbId, tableName),
(k, v) -> {
+ v.remove(tableId);
+ return v.isEmpty() ? null : v;
+ });
- Env.getCurrentEnv().getEditLog().logEraseTable(tableId);
- LOG.info("erase table[{}] name: {} from db[{}]", tableId,
tableName, dbId);
+ Env.getCurrentEnv().getEditLog().logEraseTable(tableId);
+ LOG.info("erase table[{}] name: {} from db[{}]", tableId,
tableName, dbId);
+ } finally {
+ writeUnlock();
+ }
}
}
- public synchronized void replayEraseTable(long tableId) {
- LOG.info("before replay erase table[{}]", tableId);
- RecycleTableInfo tableInfo = idToTable.remove(tableId);
- idToRecycleTime.remove(tableId);
- if (tableInfo == null) {
- // FIXME(walter): Sometimes `eraseTable` in 'DROP DB ... FORCE'
may be executed earlier than
- // finish drop db, especially in the case of drop db with many
tables.
- return;
- }
+ public void replayEraseTable(long tableId) {
+ writeLock();
+ try {
+ LOG.info("before replay erase table[{}]", tableId);
+ RecycleTableInfo tableInfo = idToTable.remove(tableId);
+ idToRecycleTime.remove(tableId);
+ if (tableInfo == null) {
+ // FIXME(walter): Sometimes `eraseTable` in 'DROP DB ...
FORCE' may be executed earlier than
+ // finish drop db, especially in the case of drop db with many
tables.
+ return;
+ }
- dbIdTableNameToIds.computeIfPresent(Pair.of(tableInfo.getDbId(),
tableInfo.getTable().getName()),
- (k, v) -> {
- v.remove(tableId);
- return v.isEmpty() ? null : v;
- });
+ dbIdTableNameToIds.computeIfPresent(Pair.of(tableInfo.getDbId(),
tableInfo.getTable().getName()),
+ (k, v) -> {
+ v.remove(tableId);
+ return v.isEmpty() ? null : v;
+ });
- Table table = tableInfo.getTable();
- if (table.isManagedTable()) {
- Env.getCurrentEnv().onEraseOlapTable(tableInfo.dbId, (OlapTable)
table, true);
+ Table table = tableInfo.getTable();
+ if (table.isManagedTable()) {
+ Env.getCurrentEnv().onEraseOlapTable(tableInfo.dbId,
(OlapTable) table, true);
+ }
+ LOG.info("replay erase table[{}]", tableId);
+ } finally {
+ writeUnlock();
}
- LOG.info("replay erase table[{}]", tableId);
}
- private synchronized void erasePartition(long currentTimeMs, int keepNum) {
+ private void erasePartition(long currentTimeMs, int keepNum) {
int eraseNum = 0;
StopWatch watch = StopWatch.createStarted();
try {
- // 1. erase expired partitions
- Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator =
idToPartition.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
- RecyclePartitionInfo partitionInfo = entry.getValue();
- Partition partition = partitionInfo.getPartition();
+ // 1. collect expired partition IDs under read lock
+ List<Long> expiredIds = new ArrayList<>();
+ readLock();
+ try {
+ for (Map.Entry<Long, RecyclePartitionInfo> entry :
idToPartition.entrySet()) {
+ if (isExpire(entry.getKey(), currentTimeMs)) {
+ expiredIds.add(entry.getKey());
+ }
+ }
+ } finally {
+ readUnlock();
+ }
- long partitionId = entry.getKey();
- if (isExpire(partitionId, currentTimeMs)) {
+ // 2. erase each expired partition one at a time (microbatch)
+ for (Long partitionId : expiredIds) {
+ writeLock();
+ try {
+ RecyclePartitionInfo partitionInfo =
idToPartition.remove(partitionId);
+ if (partitionInfo == null) {
+ continue;
+ }
+ Partition partition = partitionInfo.getPartition();
Env.getCurrentEnv().onErasePartition(partition);
- // erase partition
- iterator.remove();
idToRecycleTime.remove(partitionId);
dbTblIdPartitionNameToIds.computeIfPresent(
@@ -486,18 +611,28 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
});
return partitionMap.isEmpty() ? null :
partitionMap;
});
- // log
+
Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
LOG.info("erase partition[{}]. reason: expired",
partitionId);
eraseNum++;
+ } finally {
+ writeUnlock();
}
- } // end for partitions
+ }
- // 2. erase exceed number
+ // 3. erase exceed number
if (keepNum < 0) {
return;
}
- for (Map.Entry<Pair<Long, Long>, Map<String, Set<Long>>> entry :
dbTblIdPartitionNameToIds.entrySet()) {
+ // Collect same-name groups under read lock
+ List<Map.Entry<Pair<Long, Long>, Map<String, Set<Long>>>> groups;
+ readLock();
+ try {
+ groups = new ArrayList<>(dbTblIdPartitionNameToIds.entrySet());
+ } finally {
+ readUnlock();
+ }
+ for (Map.Entry<Pair<Long, Long>, Map<String, Set<Long>>> entry :
groups) {
long dbId = entry.getKey().first;
long tableId = entry.getKey().second;
for (Map.Entry<String, Set<Long>> partitionEntry :
entry.getValue().entrySet()) {
@@ -511,66 +646,84 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
}
}
- private synchronized void erasePartitionWithSameName(long dbId, long
tableId, String partitionName,
+ private void erasePartitionWithSameName(long dbId, long tableId, String
partitionName,
long currentTimeMs, int maxSameNameTrashNum, List<Long>
sameNamePartitionIdList) {
- List<Long> partitionIdToErase =
getIdListToEraseByRecycleTime(sameNamePartitionIdList,
- maxSameNameTrashNum);
+ List<Long> partitionIdToErase;
+ readLock();
+ try {
+ partitionIdToErase =
getIdListToEraseByRecycleTime(sameNamePartitionIdList, maxSameNameTrashNum);
+ } finally {
+ readUnlock();
+ }
for (Long partitionId : partitionIdToErase) {
- RecyclePartitionInfo partitionInfo =
idToPartition.get(partitionId);
- if (!isExpireMinLatency(partitionId, currentTimeMs)) {
- continue;
- }
- Partition partition = partitionInfo.getPartition();
+ writeLock();
+ try {
+ RecyclePartitionInfo partitionInfo =
idToPartition.get(partitionId);
+ if (partitionInfo == null || !isExpireMinLatency(partitionId,
currentTimeMs)) {
+ continue;
+ }
+ Partition partition = partitionInfo.getPartition();
- Env.getCurrentEnv().onErasePartition(partition);
- idToPartition.remove(partitionId);
- idToRecycleTime.remove(partitionId);
+ Env.getCurrentEnv().onErasePartition(partition);
+ idToPartition.remove(partitionId);
+ idToRecycleTime.remove(partitionId);
- dbTblIdPartitionNameToIds.computeIfPresent(Pair.of(dbId, tableId),
(pair, partitionMap) -> {
- partitionMap.computeIfPresent(partitionName, (name, idSet) -> {
- idSet.remove(partitionId);
- return idSet.isEmpty() ? null : idSet;
+ dbTblIdPartitionNameToIds.computeIfPresent(Pair.of(dbId,
tableId), (pair, partitionMap) -> {
+ partitionMap.computeIfPresent(partitionName, (name, idSet)
-> {
+ idSet.remove(partitionId);
+ return idSet.isEmpty() ? null : idSet;
+ });
+ return partitionMap.isEmpty() ? null : partitionMap;
});
- return partitionMap.isEmpty() ? null : partitionMap;
- });
- Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
- LOG.info("erase partition[{}] name: {} from table[{}] from
db[{}]", partitionId,
- partitionName, tableId, dbId);
+
Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
+ LOG.info("erase partition[{}] name: {} from table[{}] from
db[{}]", partitionId,
+ partitionName, tableId, dbId);
+ } finally {
+ writeUnlock();
+ }
}
}
- public synchronized void replayErasePartition(long partitionId) {
- RecyclePartitionInfo partitionInfo = idToPartition.remove(partitionId);
- idToRecycleTime.remove(partitionId);
+ public void replayErasePartition(long partitionId) {
+ writeLock();
+ try {
+ RecyclePartitionInfo partitionInfo =
idToPartition.remove(partitionId);
+ idToRecycleTime.remove(partitionId);
- if (partitionInfo == null) {
- LOG.warn("replayErasePartition: partitionInfo is null for
partitionId[{}]", partitionId);
- return;
- }
+ if (partitionInfo == null) {
+ LOG.warn("replayErasePartition: partitionInfo is null for
partitionId[{}]", partitionId);
+ return;
+ }
- dbTblIdPartitionNameToIds.computeIfPresent(
- Pair.of(partitionInfo.getDbId(), partitionInfo.getTableId()),
(pair, partitionMap) -> {
-
partitionMap.computeIfPresent(partitionInfo.getPartition().getName(), (name,
idSet) -> {
- idSet.remove(partitionId);
- return idSet.isEmpty() ? null : idSet;
+ dbTblIdPartitionNameToIds.computeIfPresent(
+ Pair.of(partitionInfo.getDbId(),
partitionInfo.getTableId()), (pair, partitionMap) -> {
+
partitionMap.computeIfPresent(partitionInfo.getPartition().getName(), (name,
idSet) -> {
+ idSet.remove(partitionId);
+ return idSet.isEmpty() ? null : idSet;
+ });
+ return partitionMap.isEmpty() ? null : partitionMap;
});
- return partitionMap.isEmpty() ? null : partitionMap;
- });
- Partition partition = partitionInfo.getPartition();
- Env.getCurrentEnv().onErasePartition(partition);
+ Partition partition = partitionInfo.getPartition();
+ Env.getCurrentEnv().onErasePartition(partition);
- LOG.info("replay erase partition[{}]", partitionId);
+ LOG.info("replay erase partition[{}]", partitionId);
+ } finally {
+ writeUnlock();
+ }
}
- private synchronized List<Long> getIdListToEraseByRecycleTime(List<Long>
ids, int maxTrashNum) {
+ private List<Long> getIdListToEraseByRecycleTime(List<Long> ids, int
maxTrashNum) {
List<Long> idToErase = Lists.newArrayList();
if (ids.size() <= maxTrashNum) {
return idToErase;
}
- // order by recycle time desc
- ids.sort((x, y) -> Long.compare(idToRecycleTime.get(y),
idToRecycleTime.get(x)));
+ // order by recycle time desc; use getOrDefault to handle stale IDs
+ // that may have been removed between snapshot and read lock
acquisition
+ ids.sort((x, y) -> Long.compare(
+ idToRecycleTime.getOrDefault(y, 0L),
+ idToRecycleTime.getOrDefault(x, 0L)));
for (int i = maxTrashNum; i < ids.size(); i++) {
idToErase.add(ids.get(i));
@@ -578,66 +731,76 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
return idToErase;
}
- public synchronized Database recoverDatabase(String dbName, long dbId)
throws DdlException {
- RecycleDatabaseInfo dbInfo = null;
- // The recycle time of the force dropped tables and databases will be
set to zero, use 1 here to
- // skip these databases and tables.
- long recycleTime = 1;
- Iterator<Map.Entry<Long, RecycleDatabaseInfo>> iterator =
idToDatabase.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Long, RecycleDatabaseInfo> entry = iterator.next();
- if (dbName.equals(entry.getValue().getDb().getFullName())) {
- if (dbId == -1) {
- if (recycleTime <= idToRecycleTime.get(entry.getKey())) {
- recycleTime = idToRecycleTime.get(entry.getKey());
+ public Database recoverDatabase(String dbName, long dbId) throws
DdlException {
+ writeLock();
+ try {
+ RecycleDatabaseInfo dbInfo = null;
+ // The recycle time of the force dropped tables and databases will
be set to zero, use 1 here to
+ // skip these databases and tables.
+ long recycleTime = 1;
+ Iterator<Map.Entry<Long, RecycleDatabaseInfo>> iterator =
idToDatabase.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, RecycleDatabaseInfo> entry = iterator.next();
+ if (dbName.equals(entry.getValue().getDb().getFullName())) {
+ if (dbId == -1) {
+ if (recycleTime <=
idToRecycleTime.get(entry.getKey())) {
+ recycleTime = idToRecycleTime.get(entry.getKey());
+ dbInfo = entry.getValue();
+ }
+ } else if (entry.getKey() == dbId) {
dbInfo = entry.getValue();
+ break;
}
- } else if (entry.getKey() == dbId) {
- dbInfo = entry.getValue();
- break;
}
}
- }
- if (dbInfo == null) {
- throw new DdlException("Unknown database '" + dbName + "' or
database id '" + dbId + "'");
- }
+ if (dbInfo == null) {
+ throw new DdlException("Unknown database '" + dbName + "' or
database id '" + dbId + "'");
+ }
- // 1. recover all tables in this db
- recoverAllTables(dbInfo);
+ // 1. recover all tables in this db
+ recoverAllTables(dbInfo);
- Database db = dbInfo.getDb();
- // 2. remove db from idToDatabase and idToRecycleTime
- idToDatabase.remove(db.getId());
- idToRecycleTime.remove(db.getId());
+ Database db = dbInfo.getDb();
+ // 2. remove db from idToDatabase and idToRecycleTime
+ idToDatabase.remove(db.getId());
+ idToRecycleTime.remove(db.getId());
- dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k, v) -> {
- v.remove(dbId);
- return v.isEmpty() ? null : v;
- });
+ dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k, v)
-> {
+ v.remove(dbId);
+ return v.isEmpty() ? null : v;
+ });
- return db;
+ return db;
+ } finally {
+ writeUnlock();
+ }
}
- public synchronized Database replayRecoverDatabase(long dbId) {
- RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
-
+ public Database replayRecoverDatabase(long dbId) {
+ writeLock();
try {
- recoverAllTables(dbInfo);
- } catch (DdlException e) {
- // should not happened
- LOG.error("failed replay recover database: {}", dbId, e);
- }
+ RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
- idToDatabase.remove(dbId);
- idToRecycleTime.remove(dbId);
+ try {
+ recoverAllTables(dbInfo);
+ } catch (DdlException e) {
+ // should not happened
+ LOG.error("failed replay recover database: {}", dbId, e);
+ }
- dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k, v) -> {
- v.remove(dbId);
- return v.isEmpty() ? null : v;
- });
+ idToDatabase.remove(dbId);
+ idToRecycleTime.remove(dbId);
- return dbInfo.getDb();
+ dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k, v)
-> {
+ v.remove(dbId);
+ return v.isEmpty() ? null : v;
+ });
+
+ return dbInfo.getDb();
+ } finally {
+ writeUnlock();
+ }
}
private void recoverAllTables(RecycleDatabaseInfo dbInfo) throws
DdlException {
@@ -676,71 +839,81 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
}
}
- public synchronized boolean recoverTable(Database db, String tableName,
long tableId,
+ public boolean recoverTable(Database db, String tableName, long tableId,
String newTableName) throws
DdlException {
- // make sure to get db lock
- Table table = null;
- // The recycle time of the force dropped tables and databases will be
set to zero, use 1 here to
- // skip these databases and tables.
- long recycleTime = 1;
- long dbId = db.getId();
- Iterator<Map.Entry<Long, RecycleTableInfo>> iterator =
idToTable.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
- RecycleTableInfo tableInfo = entry.getValue();
- if (tableInfo.getDbId() != dbId) {
- continue;
- }
+ writeLock();
+ try {
+ // make sure to get db lock
+ Table table = null;
+ // The recycle time of the force dropped tables and databases will
be set to zero, use 1 here to
+ // skip these databases and tables.
+ long recycleTime = 1;
+ long dbId = db.getId();
+ Iterator<Map.Entry<Long, RecycleTableInfo>> iterator =
idToTable.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
+ RecycleTableInfo tableInfo = entry.getValue();
+ if (tableInfo.getDbId() != dbId) {
+ continue;
+ }
- if (!tableInfo.getTable().getName().equals(tableName)) {
- continue;
- }
+ if (!tableInfo.getTable().getName().equals(tableName)) {
+ continue;
+ }
- if (tableId == -1) {
- if (recycleTime <= idToRecycleTime.get(entry.getKey())) {
- recycleTime = idToRecycleTime.get(entry.getKey());
+ if (tableId == -1) {
+ if (recycleTime <= idToRecycleTime.get(entry.getKey())) {
+ recycleTime = idToRecycleTime.get(entry.getKey());
+ table = tableInfo.getTable();
+ }
+ } else if (entry.getKey() == tableId) {
table = tableInfo.getTable();
+ break;
}
- } else if (entry.getKey() == tableId) {
- table = tableInfo.getTable();
- break;
}
- }
- if (table == null) {
- throw new DdlException("Unknown table '" + tableName + "' or table
id '" + tableId + "' in "
- + db.getFullName());
- }
+ if (table == null) {
+ throw new DdlException("Unknown table '" + tableName + "' or
table id '" + tableId + "' in "
+ + db.getFullName());
+ }
- if (table.getType() == TableType.MATERIALIZED_VIEW) {
- throw new DdlException("Can not recover materialized view '" +
tableName + "' or table id '"
- + tableId + "' in " + db.getFullName());
- }
+ if (table.getType() == TableType.MATERIALIZED_VIEW) {
+ throw new DdlException("Can not recover materialized view '" +
tableName + "' or table id '"
+ + tableId + "' in " + db.getFullName());
+ }
- innerRecoverTable(db, table, tableName, newTableName, null, false);
- LOG.info("recover db[{}] with table[{}]: {}", dbId, table.getId(),
table.getName());
- return true;
+ innerRecoverTable(db, table, tableName, newTableName, null, false);
+ LOG.info("recover db[{}] with table[{}]: {}", dbId, table.getId(),
table.getName());
+ return true;
+ } finally {
+ writeUnlock();
+ }
}
- public synchronized void replayRecoverTable(Database db, long tableId,
String newTableName) throws DdlException {
- // make sure to get db write lock
- Iterator<Map.Entry<Long, RecycleTableInfo>> iterator =
idToTable.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
- RecycleTableInfo tableInfo = entry.getValue();
- if (tableInfo.getTable().getId() != tableId) {
- continue;
- }
- Preconditions.checkState(tableInfo.getDbId() == db.getId());
- Table table = tableInfo.getTable();
- String tableName = table.getName();
- if (innerRecoverTable(db, table, tableName, newTableName,
iterator, true)) {
- break;
+ public void replayRecoverTable(Database db, long tableId, String
newTableName) throws DdlException {
+ writeLock();
+ try {
+ // make sure to get db write lock
+ Iterator<Map.Entry<Long, RecycleTableInfo>> iterator =
idToTable.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
+ RecycleTableInfo tableInfo = entry.getValue();
+ if (tableInfo.getTable().getId() != tableId) {
+ continue;
+ }
+ Preconditions.checkState(tableInfo.getDbId() == db.getId());
+ Table table = tableInfo.getTable();
+ String tableName = table.getName();
+ if (innerRecoverTable(db, table, tableName, newTableName,
iterator, true)) {
+ break;
+ }
}
+ } finally {
+ writeUnlock();
}
}
- private synchronized boolean innerRecoverTable(Database db, Table table,
String tableName, String newTableName,
+ private boolean innerRecoverTable(Database db, Table table, String
tableName, String newTableName,
Iterator<Map.Entry<Long,
RecycleTableInfo>> iterator,
boolean isReplay) throws
DdlException {
table.writeLock();
@@ -795,217 +968,245 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
return true;
}
- public synchronized void recoverPartition(long dbId, OlapTable table,
String partitionName,
+ public void recoverPartition(long dbId, OlapTable table, String
partitionName,
long partitionIdToRecover, String newPartitionName) throws
DdlException {
- if (table.getType() == TableType.MATERIALIZED_VIEW) {
- throw new DdlException("Can not recover partition in materialized
view: " + table.getName());
- }
+ writeLock();
+ try {
+ if (table.getType() == TableType.MATERIALIZED_VIEW) {
+ throw new DdlException("Can not recover partition in
materialized view: " + table.getName());
+ }
- long recycleTime = -1;
- // make sure to get db write lock
- RecyclePartitionInfo recoverPartitionInfo = null;
+ long recycleTime = -1;
+ // make sure to get db write lock
+ RecyclePartitionInfo recoverPartitionInfo = null;
- Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator =
idToPartition.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
- RecyclePartitionInfo partitionInfo = entry.getValue();
+ Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator =
idToPartition.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
+ RecyclePartitionInfo partitionInfo = entry.getValue();
- if (partitionInfo.getTableId() != table.getId()) {
- continue;
- }
+ if (partitionInfo.getTableId() != table.getId()) {
+ continue;
+ }
- if
(!partitionInfo.getPartition().getName().equalsIgnoreCase(partitionName)) {
- continue;
- }
+ if
(!partitionInfo.getPartition().getName().equalsIgnoreCase(partitionName)) {
+ continue;
+ }
- if (partitionIdToRecover == -1) {
- if (recycleTime <= idToRecycleTime.get(entry.getKey())) {
- recycleTime = idToRecycleTime.get(entry.getKey());
+ if (partitionIdToRecover == -1) {
+ if (recycleTime <= idToRecycleTime.get(entry.getKey())) {
+ recycleTime = idToRecycleTime.get(entry.getKey());
+ recoverPartitionInfo = partitionInfo;
+ }
+ } else if (entry.getKey() == partitionIdToRecover) {
recoverPartitionInfo = partitionInfo;
+ break;
}
- } else if (entry.getKey() == partitionIdToRecover) {
- recoverPartitionInfo = partitionInfo;
- break;
}
- }
-
- if (recoverPartitionInfo == null) {
- throw new DdlException("No partition named '" + partitionName + "'
or partition id '" + partitionIdToRecover
- + "' in table " + table.getName());
- }
-
- PartitionInfo partitionInfo = table.getPartitionInfo();
- PartitionItem recoverItem = null;
- if (partitionInfo.getType() == PartitionType.RANGE) {
- recoverItem = new
RangePartitionItem(recoverPartitionInfo.getRange());
- } else if (partitionInfo.getType() == PartitionType.LIST) {
- recoverItem = recoverPartitionInfo.getListPartitionItem();
- }
- // check if partition item is invalid
- if (partitionInfo.getAnyIntersectItem(recoverItem, false) != null) {
- throw new DdlException("Can not recover partition[" +
partitionName + "]. Partition item conflict.");
- }
-
- // check if schema change
- Partition recoverPartition = recoverPartitionInfo.getPartition();
- Set<Long> tableIndex = table.getIndexIdToMeta().keySet();
- Set<Long> partitionIndex =
recoverPartition.getMaterializedIndices(IndexExtState.ALL).stream()
- .map(i -> i.getId()).collect(Collectors.toSet());
- if (!tableIndex.equals(partitionIndex)) {
- throw new DdlException("table's index not equal with partition's
index. table's index=" + tableIndex
- + ", partition's index=" + partitionIndex);
- }
- // check if partition name exists
-
Preconditions.checkState(recoverPartition.getName().equalsIgnoreCase(partitionName));
- if (!Strings.isNullOrEmpty(newPartitionName)) {
- if (table.checkPartitionNameExist(newPartitionName)) {
- throw new DdlException("Partition name[" + newPartitionName +
"] is already used");
+ if (recoverPartitionInfo == null) {
+ throw new DdlException("No partition named '" + partitionName
+ + "' or partition id '" + partitionIdToRecover
+ + "' in table " + table.getName());
}
- recoverPartition.setName(newPartitionName);
- }
-
- // recover partition
- table.addPartition(recoverPartition);
-
- // recover partition info
- long partitionId = recoverPartition.getId();
- partitionInfo.setItem(partitionId, false, recoverItem);
- partitionInfo.setDataProperty(partitionId,
recoverPartitionInfo.getDataProperty());
- partitionInfo.setReplicaAllocation(partitionId,
recoverPartitionInfo.getReplicaAlloc());
- partitionInfo.setIsInMemory(partitionId,
recoverPartitionInfo.isInMemory());
- partitionInfo.setIsMutable(partitionId,
recoverPartitionInfo.isMutable());
-
- // remove from recycle bin
- idToPartition.remove(partitionId);
- idToRecycleTime.remove(partitionId);
- if (!Env.getCurrentEnv().invalidCacheForCloud()) {
- long version = table.getNextVersion();
- table.updateVisibleVersionAndTime(version,
System.currentTimeMillis());
- }
-
- dbTblIdPartitionNameToIds.computeIfPresent(
- Pair.of(recoverPartitionInfo.getDbId(),
recoverPartitionInfo.getTableId()), (pair, partitionMap) -> {
- partitionMap.computeIfPresent(partitionName, (name, idSet)
-> {
- idSet.remove(recoverPartition.getId());
- return idSet.isEmpty() ? null : idSet;
- });
- return partitionMap.isEmpty() ? null : partitionMap;
- });
-
- // log
- RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(),
partitionId, "",
- table.getName(), "",
partitionName, newPartitionName);
- Env.getCurrentEnv().getEditLog().logRecoverPartition(recoverInfo);
- LOG.info("recover partition[{}]", partitionId);
- }
+ PartitionInfo partitionInfo = table.getPartitionInfo();
+ PartitionItem recoverItem = null;
+ if (partitionInfo.getType() == PartitionType.RANGE) {
+ recoverItem = new
RangePartitionItem(recoverPartitionInfo.getRange());
+ } else if (partitionInfo.getType() == PartitionType.LIST) {
+ recoverItem = recoverPartitionInfo.getListPartitionItem();
+ }
+ // check if partition item is invalid
+ if (partitionInfo.getAnyIntersectItem(recoverItem, false) != null)
{
+ throw new DdlException("Can not recover partition[" +
partitionName + "]. Partition item conflict.");
+ }
- // The caller should keep table write lock
- public synchronized void replayRecoverPartition(OlapTable table, long
partitionId,
- String newPartitionName)
throws DdlException {
- Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator =
idToPartition.entrySet().iterator();
- Env currentEnv = Env.getCurrentEnv();
- while (iterator.hasNext()) {
- Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
- RecyclePartitionInfo recyclePartitionInfo = entry.getValue();
- if (recyclePartitionInfo.getPartition().getId() != partitionId) {
- continue;
+ // check if schema change
+ Partition recoverPartition = recoverPartitionInfo.getPartition();
+ Set<Long> tableIndex = table.getIndexIdToMeta().keySet();
+ Set<Long> partitionIndex =
recoverPartition.getMaterializedIndices(IndexExtState.ALL).stream()
+ .map(i -> i.getId()).collect(Collectors.toSet());
+ if (!tableIndex.equals(partitionIndex)) {
+ throw new DdlException("table's index not equal with
partition's index. table's index=" + tableIndex
+ + ", partition's index=" + partitionIndex);
}
- Preconditions.checkState(recyclePartitionInfo.getTableId() ==
table.getId());
+ // check if partition name exists
+
Preconditions.checkState(recoverPartition.getName().equalsIgnoreCase(partitionName));
if (!Strings.isNullOrEmpty(newPartitionName)) {
if (table.checkPartitionNameExist(newPartitionName)) {
throw new DdlException("Partition name[" +
newPartitionName + "] is already used");
}
+ recoverPartition.setName(newPartitionName);
}
- table.addPartition(recyclePartitionInfo.getPartition());
- if (!Strings.isNullOrEmpty(newPartitionName)) {
-
table.renamePartition(recyclePartitionInfo.getPartition().getName(),
newPartitionName);
- }
- PartitionInfo partitionInfo = table.getPartitionInfo();
- PartitionItem recoverItem = null;
- if (partitionInfo.getType() == PartitionType.RANGE) {
- recoverItem = new
RangePartitionItem(recyclePartitionInfo.getRange());
- } else if (partitionInfo.getType() == PartitionType.LIST) {
- recoverItem = recyclePartitionInfo.getListPartitionItem();
- }
+
+ // recover partition
+ table.addPartition(recoverPartition);
+
+ // recover partition info
+ long partitionId = recoverPartition.getId();
partitionInfo.setItem(partitionId, false, recoverItem);
- partitionInfo.setDataProperty(partitionId,
recyclePartitionInfo.getDataProperty());
- partitionInfo.setReplicaAllocation(partitionId,
recyclePartitionInfo.getReplicaAlloc());
- partitionInfo.setIsInMemory(partitionId,
recyclePartitionInfo.isInMemory());
- partitionInfo.setIsMutable(partitionId,
recyclePartitionInfo.isMutable());
+ partitionInfo.setDataProperty(partitionId,
recoverPartitionInfo.getDataProperty());
+ partitionInfo.setReplicaAllocation(partitionId,
recoverPartitionInfo.getReplicaAlloc());
+ partitionInfo.setIsInMemory(partitionId,
recoverPartitionInfo.isInMemory());
+ partitionInfo.setIsMutable(partitionId,
recoverPartitionInfo.isMutable());
- iterator.remove();
+ // remove from recycle bin
+ idToPartition.remove(partitionId);
idToRecycleTime.remove(partitionId);
- if (!currentEnv.invalidCacheForCloud()) {
+ if (!Env.getCurrentEnv().invalidCacheForCloud()) {
long version = table.getNextVersion();
table.updateVisibleVersionAndTime(version,
System.currentTimeMillis());
}
dbTblIdPartitionNameToIds.computeIfPresent(
- Pair.of(recyclePartitionInfo.getDbId(), table.getId()),
(pair, partitionMap) -> {
-
partitionMap.computeIfPresent(recyclePartitionInfo.getPartition().getName(),
(name, idSet) -> {
- idSet.remove(partitionId);
+ Pair.of(recoverPartitionInfo.getDbId(),
+ recoverPartitionInfo.getTableId()), (pair,
partitionMap) -> {
+ partitionMap.computeIfPresent(partitionName, (name,
idSet) -> {
+ idSet.remove(recoverPartition.getId());
return idSet.isEmpty() ? null : idSet;
});
return partitionMap.isEmpty() ? null : partitionMap;
});
- LOG.info("replay recover partition[{}]", partitionId);
- break;
+ // log
+ RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(),
partitionId, "",
+ table.getName(), "",
partitionName, newPartitionName);
+ Env.getCurrentEnv().getEditLog().logRecoverPartition(recoverInfo);
+ LOG.info("recover partition[{}]", partitionId);
+ } finally {
+ writeUnlock();
}
}
- // erase database in catalog recycle bin instantly
- public synchronized void eraseDatabaseInstantly(long dbId) throws
DdlException {
- // 1. find dbInfo and erase db
- RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
- if (dbInfo != null) {
- // erase db
- Env.getCurrentEnv().eraseDatabase(dbId, true);
-
- // erase db from idToDatabase and idToRecycleTime
- idToDatabase.remove(dbId);
- idToRecycleTime.remove(dbId);
+ // The caller should keep table write lock
+ public void replayRecoverPartition(OlapTable table, long partitionId,
+ String newPartitionName)
throws DdlException {
+ writeLock();
+ try {
+ Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator =
idToPartition.entrySet().iterator();
+ Env currentEnv = Env.getCurrentEnv();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
+ RecyclePartitionInfo recyclePartitionInfo = entry.getValue();
+ if (recyclePartitionInfo.getPartition().getId() !=
partitionId) {
+ continue;
+ }
- dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k, v)
-> {
- v.remove(dbId);
- return v.isEmpty() ? null : v;
- });
+ Preconditions.checkState(recyclePartitionInfo.getTableId() ==
table.getId());
+ if (!Strings.isNullOrEmpty(newPartitionName)) {
+ if (table.checkPartitionNameExist(newPartitionName)) {
+ throw new DdlException("Partition name[" +
newPartitionName + "] is already used");
+ }
+ }
+ table.addPartition(recyclePartitionInfo.getPartition());
+ if (!Strings.isNullOrEmpty(newPartitionName)) {
+
table.renamePartition(recyclePartitionInfo.getPartition().getName(),
newPartitionName);
+ }
+ PartitionInfo partitionInfo = table.getPartitionInfo();
+ PartitionItem recoverItem = null;
+ if (partitionInfo.getType() == PartitionType.RANGE) {
+ recoverItem = new
RangePartitionItem(recyclePartitionInfo.getRange());
+ } else if (partitionInfo.getType() == PartitionType.LIST) {
+ recoverItem = recyclePartitionInfo.getListPartitionItem();
+ }
+ partitionInfo.setItem(partitionId, false, recoverItem);
+ partitionInfo.setDataProperty(partitionId,
recyclePartitionInfo.getDataProperty());
+ partitionInfo.setReplicaAllocation(partitionId,
recyclePartitionInfo.getReplicaAlloc());
+ partitionInfo.setIsInMemory(partitionId,
recyclePartitionInfo.isInMemory());
+ partitionInfo.setIsMutable(partitionId,
recyclePartitionInfo.isMutable());
+
+ iterator.remove();
+ idToRecycleTime.remove(partitionId);
+
+ if (!currentEnv.invalidCacheForCloud()) {
+ long version = table.getNextVersion();
+ table.updateVisibleVersionAndTime(version,
System.currentTimeMillis());
+ }
+
+ dbTblIdPartitionNameToIds.computeIfPresent(
+ Pair.of(recyclePartitionInfo.getDbId(),
table.getId()), (pair, partitionMap) -> {
+ partitionMap.computeIfPresent(
+
recyclePartitionInfo.getPartition().getName(),
+ (name, idSet) -> {
+ idSet.remove(partitionId);
+ return idSet.isEmpty() ? null : idSet;
+ });
+ return partitionMap.isEmpty() ? null :
partitionMap;
+ });
- // log for erase db
- String dbName = dbInfo.getDb().getName();
- LOG.info("erase db[{}]: {}", dbId, dbName);
+ LOG.info("replay recover partition[{}]", partitionId);
+ break;
+ }
+ } finally {
+ writeUnlock();
}
+ }
- // 2. remove all tables with the same dbId
- List<Long> tableIdToErase = Lists.newArrayList();
- Iterator<Map.Entry<Long, RecycleTableInfo>> tableIterator =
idToTable.entrySet().iterator();
- while (tableIterator.hasNext()) {
- Map.Entry<Long, RecycleTableInfo> entry = tableIterator.next();
- RecycleTableInfo tableInfo = entry.getValue();
- if (tableInfo.getDbId() == dbId) {
- tableIdToErase.add(entry.getKey());
+ // erase database in catalog recycle bin instantly
+ public void eraseDatabaseInstantly(long dbId) throws DdlException {
+ // 1. erase db
+ RecycleDatabaseInfo dbInfo;
+ writeLock();
+ try {
+ dbInfo = idToDatabase.get(dbId);
+ if (dbInfo != null) {
+ Env.getCurrentEnv().eraseDatabase(dbId, true);
+ idToDatabase.remove(dbId);
+ idToRecycleTime.remove(dbId);
+
+ dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k,
v) -> {
+ v.remove(dbId);
+ return v.isEmpty() ? null : v;
+ });
+
+ String dbName = dbInfo.getDb().getName();
+ LOG.info("erase db[{}]: {}", dbId, dbName);
+ }
+ } finally {
+ writeUnlock();
+ }
+
+ // 2. collect tables with same dbId
+ List<Long> tableIdToErase = new ArrayList<>();
+ readLock();
+ try {
+ for (Map.Entry<Long, RecycleTableInfo> entry :
idToTable.entrySet()) {
+ if (entry.getValue().getDbId() == dbId) {
+ tableIdToErase.add(entry.getKey());
+ }
}
+ } finally {
+ readUnlock();
}
for (Long tableId : tableIdToErase) {
- eraseTableInstantly(tableId);
+ try {
+ eraseTableInstantly(tableId);
+ } catch (DdlException e) {
+ LOG.info("table[{}] already erased by concurrent operation,
skip", tableId);
+ }
}
- // 3. remove all partitions with the same dbId
- List<Long> partitionIdToErase = Lists.newArrayList();
- Iterator<Map.Entry<Long, RecyclePartitionInfo>> partitionIterator =
idToPartition.entrySet().iterator();
- while (partitionIterator.hasNext()) {
- Map.Entry<Long, RecyclePartitionInfo> entry =
partitionIterator.next();
- RecyclePartitionInfo partitionInfo = entry.getValue();
- if (partitionInfo.getDbId() == dbId) {
- partitionIdToErase.add(entry.getKey());
+ // 3. collect partitions with same dbId
+ List<Long> partitionIdToErase = new ArrayList<>();
+ readLock();
+ try {
+ for (Map.Entry<Long, RecyclePartitionInfo> entry :
idToPartition.entrySet()) {
+ if (entry.getValue().getDbId() == dbId) {
+ partitionIdToErase.add(entry.getKey());
+ }
}
+ } finally {
+ readUnlock();
}
for (Long partitionId : partitionIdToErase) {
- erasePartitionInstantly(partitionId);
+ try {
+ erasePartitionInstantly(partitionId);
+ } catch (DdlException e) {
+ LOG.info("partition[{}] already erased by concurrent
operation, skip", partitionId);
+ }
}
// 4. determine if nothing is deleted
@@ -1015,44 +1216,53 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
}
// erase table in catalog recycle bin instantly
- public synchronized void eraseTableInstantly(long tableId) throws
DdlException {
- // 1. find tableInfo and erase table
- RecycleTableInfo tableInfo = idToTable.get(tableId);
- if (tableInfo != null) {
- // erase table
- long dbId = tableInfo.getDbId();
- Table table = tableInfo.getTable();
- if (table.getType() == TableType.OLAP || table.getType() ==
TableType.MATERIALIZED_VIEW) {
- Env.getCurrentEnv().onEraseOlapTable(dbId, (OlapTable) table,
false);
- }
+ public void eraseTableInstantly(long tableId) throws DdlException {
+ // 1. erase table
+ RecycleTableInfo tableInfo;
+ writeLock();
+ try {
+ tableInfo = idToTable.get(tableId);
+ if (tableInfo != null) {
+ long dbId = tableInfo.getDbId();
+ Table table = tableInfo.getTable();
+ if (table.getType() == TableType.OLAP || table.getType() ==
TableType.MATERIALIZED_VIEW) {
+ Env.getCurrentEnv().onEraseOlapTable(dbId, (OlapTable)
table, false);
+ }
- // erase table from idToTable and idToRecycleTime
- idToTable.remove(tableId);
- idToRecycleTime.remove(tableId);
+ idToTable.remove(tableId);
+ idToRecycleTime.remove(tableId);
- dbIdTableNameToIds.computeIfPresent(Pair.of(dbId,
table.getName()), (k, v) -> {
- v.remove(tableId);
- return v.isEmpty() ? null : v;
- });
+ dbIdTableNameToIds.computeIfPresent(Pair.of(dbId,
table.getName()), (k, v) -> {
+ v.remove(tableId);
+ return v.isEmpty() ? null : v;
+ });
- // log for erase table
- String tableName = table.getName();
- Env.getCurrentEnv().getEditLog().logEraseTable(tableId);
- LOG.info("erase db[{}]'s table[{}]: {}", dbId, tableId, tableName);
+ String tableName = table.getName();
+ Env.getCurrentEnv().getEditLog().logEraseTable(tableId);
+ LOG.info("erase db[{}]'s table[{}]: {}", dbId, tableId,
tableName);
+ }
+ } finally {
+ writeUnlock();
}
- // 2. erase all partitions with the same tableId
- List<Long> partitionIdToErase = Lists.newArrayList();
- Iterator<Map.Entry<Long, RecyclePartitionInfo>> partitionIterator =
idToPartition.entrySet().iterator();
- while (partitionIterator.hasNext()) {
- Map.Entry<Long, RecyclePartitionInfo> entry =
partitionIterator.next();
- RecyclePartitionInfo partitionInfo = entry.getValue();
- if (partitionInfo.getTableId() == tableId) {
- partitionIdToErase.add(entry.getKey());
+ // 2. collect partitions with same tableId
+ List<Long> partitionIdToErase = new ArrayList<>();
+ readLock();
+ try {
+ for (Map.Entry<Long, RecyclePartitionInfo> entry :
idToPartition.entrySet()) {
+ if (entry.getValue().getTableId() == tableId) {
+ partitionIdToErase.add(entry.getKey());
+ }
}
+ } finally {
+ readUnlock();
}
for (Long partitionId : partitionIdToErase) {
- erasePartitionInstantly(partitionId);
+ try {
+ erasePartitionInstantly(partitionId);
+ } catch (DdlException e) {
+ LOG.info("partition[{}] already erased by concurrent
operation, skip", partitionId);
+ }
}
// 3. determine if nothing is deleted
@@ -1062,35 +1272,36 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
}
// erase partition in catalog recycle bin instantly
- public synchronized void erasePartitionInstantly(long partitionId) throws
DdlException {
- // 1. find partitionInfo to erase
- RecyclePartitionInfo partitionInfo = idToPartition.get(partitionId);
- if (partitionInfo == null) {
- throw new DdlException("No partition id '" + partitionId + "'");
- }
+ public void erasePartitionInstantly(long partitionId) throws DdlException {
+ writeLock();
+ try {
+ RecyclePartitionInfo partitionInfo =
idToPartition.get(partitionId);
+ if (partitionInfo == null) {
+ throw new DdlException("No partition id '" + partitionId +
"'");
+ }
- // 2. erase partition
- Partition partition = partitionInfo.getPartition();
- Env.getCurrentEnv().onErasePartition(partition);
+ Partition partition = partitionInfo.getPartition();
+ Env.getCurrentEnv().onErasePartition(partition);
- // 3. erase partition in idToPartition and idToRecycleTime
- idToPartition.remove(partitionId);
- idToRecycleTime.remove(partitionId);
+ idToPartition.remove(partitionId);
+ idToRecycleTime.remove(partitionId);
- dbTblIdPartitionNameToIds.computeIfPresent(
- Pair.of(partitionInfo.getDbId(), partitionInfo.getTableId()),
(pair, partitionMap) -> {
- partitionMap.computeIfPresent(partition.getName(), (name,
idSet) -> {
- idSet.remove(partitionId);
- return idSet.isEmpty() ? null : idSet;
+ dbTblIdPartitionNameToIds.computeIfPresent(
+ Pair.of(partitionInfo.getDbId(),
partitionInfo.getTableId()), (pair, partitionMap) -> {
+ partitionMap.computeIfPresent(partition.getName(),
(name, idSet) -> {
+ idSet.remove(partitionId);
+ return idSet.isEmpty() ? null : idSet;
+ });
+ return partitionMap.isEmpty() ? null : partitionMap;
});
- return partitionMap.isEmpty() ? null : partitionMap;
- });
- // 4. log for erase partition
- long tableId = partitionInfo.getTableId();
- String partitionName = partition.getName();
- Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
- LOG.info("erase table[{}]'s partition[{}]: {}", tableId, partitionId,
partitionName);
+ long tableId = partitionInfo.getTableId();
+ String partitionName = partition.getName();
+ Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
+ LOG.info("erase table[{}]'s partition[{}]: {}", tableId,
partitionId, partitionName);
+ } finally {
+ writeUnlock();
+ }
}
// no need to use synchronized.
@@ -1191,187 +1402,203 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
eraseDatabase(currentTimeMs, keepNum);
}
- public synchronized List<List<String>> getInfo() {
- Map<Long, Pair<Long, Long>> dbToDataSize = new HashMap<>();
- List<List<String>> tableInfos = Lists.newArrayList();
- for (Map.Entry<Long, RecycleTableInfo> entry : idToTable.entrySet()) {
- List<String> info = Lists.newArrayList();
- info.add("Table");
- RecycleTableInfo tableInfo = entry.getValue();
- Table table = tableInfo.getTable();
- info.add(table.getName());
- info.add(String.valueOf(tableInfo.getDbId()));
- info.add(String.valueOf(entry.getKey()));
- info.add("");
- //info.add(String.valueOf(idToRecycleTime.get(entry.getKey())));
-
info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
- // data size
- long dataSize = table.getDataSize(false);
- info.add(DebugUtil.printByteWithUnit(dataSize));
- // remote data size
- long remoteDataSize = table instanceof OlapTable ? ((OlapTable)
table).getRemoteDataSize() : 0L;
- info.add(DebugUtil.printByteWithUnit(remoteDataSize));
- // calculate database data size
- dbToDataSize.compute(tableInfo.getDbId(), (k, v) -> {
- if (v == null) {
- return Pair.of(dataSize, remoteDataSize);
- } else {
- v.first += dataSize;
- v.second += remoteDataSize;
- return v;
- }
- });
+ public List<List<String>> getInfo() {
+ readLock();
+ try {
+ Map<Long, Pair<Long, Long>> dbToDataSize = new HashMap<>();
+ List<List<String>> tableInfos = Lists.newArrayList();
+ for (Map.Entry<Long, RecycleTableInfo> entry :
idToTable.entrySet()) {
+ List<String> info = Lists.newArrayList();
+ info.add("Table");
+ RecycleTableInfo tableInfo = entry.getValue();
+ Table table = tableInfo.getTable();
+ info.add(table.getName());
+ info.add(String.valueOf(tableInfo.getDbId()));
+ info.add(String.valueOf(entry.getKey()));
+ info.add("");
+
//info.add(String.valueOf(idToRecycleTime.get(entry.getKey())));
+
info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
+ // data size
+ long dataSize = table.getDataSize(false);
+ info.add(DebugUtil.printByteWithUnit(dataSize));
+ // remote data size
+ long remoteDataSize = table instanceof OlapTable ?
((OlapTable) table).getRemoteDataSize() : 0L;
+ info.add(DebugUtil.printByteWithUnit(remoteDataSize));
+ // calculate database data size
+ dbToDataSize.compute(tableInfo.getDbId(), (k, v) -> {
+ if (v == null) {
+ return Pair.of(dataSize, remoteDataSize);
+ } else {
+ v.first += dataSize;
+ v.second += remoteDataSize;
+ return v;
+ }
+ });
- tableInfos.add(info);
- }
- // sort by Name, DropTime
- tableInfos.sort((x, y) -> {
- int nameRet = x.get(1).compareTo(y.get(1));
- if (nameRet == 0) {
- return x.get(5).compareTo(y.get(5));
- } else {
- return nameRet;
+ tableInfos.add(info);
}
- });
-
- List<List<String>> partitionInfos = Lists.newArrayList();
- for (Map.Entry<Long, RecyclePartitionInfo> entry :
idToPartition.entrySet()) {
- List<String> info = Lists.newArrayList();
- info.add("Partition");
- RecyclePartitionInfo partitionInfo = entry.getValue();
- Partition partition = partitionInfo.getPartition();
- info.add(partition.getName());
- info.add(String.valueOf(partitionInfo.getDbId()));
- info.add(String.valueOf(partitionInfo.getTableId()));
- info.add(String.valueOf(entry.getKey()));
- //info.add(String.valueOf(idToRecycleTime.get(entry.getKey())));
-
info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
- // data size
- long dataSize = partition.getDataSize(false);
- info.add(DebugUtil.printByteWithUnit(dataSize));
- // remote data size
- long remoteDataSize = partition.getRemoteDataSize();
- info.add(DebugUtil.printByteWithUnit(remoteDataSize));
- // calculate database data size
- dbToDataSize.compute(partitionInfo.getDbId(), (k, v) -> {
- if (v == null) {
- return Pair.of(dataSize, remoteDataSize);
+ // sort by Name, DropTime
+ tableInfos.sort((x, y) -> {
+ int nameRet = x.get(1).compareTo(y.get(1));
+ if (nameRet == 0) {
+ return x.get(5).compareTo(y.get(5));
} else {
- v.first += dataSize;
- v.second += remoteDataSize;
- return v;
+ return nameRet;
}
});
- partitionInfos.add(info);
- }
- // sort by Name, DropTime
- partitionInfos.sort((x, y) -> {
- int nameRet = x.get(1).compareTo(y.get(1));
- if (nameRet == 0) {
- return x.get(5).compareTo(y.get(5));
- } else {
- return nameRet;
- }
- });
+ List<List<String>> partitionInfos = Lists.newArrayList();
+ for (Map.Entry<Long, RecyclePartitionInfo> entry :
idToPartition.entrySet()) {
+ List<String> info = Lists.newArrayList();
+ info.add("Partition");
+ RecyclePartitionInfo partitionInfo = entry.getValue();
+ Partition partition = partitionInfo.getPartition();
+ info.add(partition.getName());
+ info.add(String.valueOf(partitionInfo.getDbId()));
+ info.add(String.valueOf(partitionInfo.getTableId()));
+ info.add(String.valueOf(entry.getKey()));
+
//info.add(String.valueOf(idToRecycleTime.get(entry.getKey())));
+
info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
+ // data size
+ long dataSize = partition.getDataSize(false);
+ info.add(DebugUtil.printByteWithUnit(dataSize));
+ // remote data size
+ long remoteDataSize = partition.getRemoteDataSize();
+ info.add(DebugUtil.printByteWithUnit(remoteDataSize));
+ // calculate database data size
+ dbToDataSize.compute(partitionInfo.getDbId(), (k, v) -> {
+ if (v == null) {
+ return Pair.of(dataSize, remoteDataSize);
+ } else {
+ v.first += dataSize;
+ v.second += remoteDataSize;
+ return v;
+ }
+ });
- List<List<String>> dbInfos = Lists.newArrayList();
- for (Map.Entry<Long, RecycleDatabaseInfo> entry :
idToDatabase.entrySet()) {
- List<String> info = Lists.newArrayList();
- info.add("Database");
- RecycleDatabaseInfo dbInfo = entry.getValue();
- Database db = dbInfo.getDb();
- info.add(db.getFullName());
- info.add(String.valueOf(entry.getKey()));
- info.add("");
- info.add("");
- //info.add(String.valueOf(idToRecycleTime.get(entry.getKey())));
-
info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
- // data size
- Pair<Long, Long> dataSizePair =
dbToDataSize.getOrDefault(entry.getKey(), Pair.of(0L, 0L));
- info.add(DebugUtil.printByteWithUnit(dataSizePair.first));
- // remote data size
- info.add(DebugUtil.printByteWithUnit(dataSizePair.second));
-
- dbInfos.add(info);
- }
- // sort by Name, DropTime
- dbInfos.sort((x, y) -> {
- int nameRet = x.get(1).compareTo(y.get(1));
- if (nameRet == 0) {
- return x.get(5).compareTo(y.get(5));
- } else {
- return nameRet;
+ partitionInfos.add(info);
}
- });
-
- return Stream.of(dbInfos, tableInfos,
partitionInfos).flatMap(Collection::stream).collect(Collectors.toList());
- }
+ // sort by Name, DropTime
+ partitionInfos.sort((x, y) -> {
+ int nameRet = x.get(1).compareTo(y.get(1));
+ if (nameRet == 0) {
+ return x.get(5).compareTo(y.get(5));
+ } else {
+ return nameRet;
+ }
+ });
- public synchronized Map<Long, Pair<Long, Long>> getDbToRecycleSize() {
- Map<Long, Pair<Long, Long>> dbToRecycleSize = new HashMap<>();
- for (Map.Entry<Long, RecycleTableInfo> entry : idToTable.entrySet()) {
- RecycleTableInfo tableInfo = entry.getValue();
- Table table = tableInfo.getTable();
- if (!(table instanceof OlapTable)) {
- continue;
+ List<List<String>> dbInfos = Lists.newArrayList();
+ for (Map.Entry<Long, RecycleDatabaseInfo> entry :
idToDatabase.entrySet()) {
+ List<String> info = Lists.newArrayList();
+ info.add("Database");
+ RecycleDatabaseInfo dbInfo = entry.getValue();
+ Database db = dbInfo.getDb();
+ info.add(db.getFullName());
+ info.add(String.valueOf(entry.getKey()));
+ info.add("");
+ info.add("");
+
//info.add(String.valueOf(idToRecycleTime.get(entry.getKey())));
+
info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
+ // data size
+ Pair<Long, Long> dataSizePair =
dbToDataSize.getOrDefault(entry.getKey(), Pair.of(0L, 0L));
+ info.add(DebugUtil.printByteWithUnit(dataSizePair.first));
+ // remote data size
+ info.add(DebugUtil.printByteWithUnit(dataSizePair.second));
+
+ dbInfos.add(info);
}
- long dataSize = table.getDataSize(false);
- long remoteDataSize = ((OlapTable) table).getRemoteDataSize();
- dbToRecycleSize.compute(tableInfo.getDbId(), (k, v) -> {
- if (v == null) {
- return Pair.of(dataSize, remoteDataSize);
+ // sort by Name, DropTime
+ dbInfos.sort((x, y) -> {
+ int nameRet = x.get(1).compareTo(y.get(1));
+ if (nameRet == 0) {
+ return x.get(5).compareTo(y.get(5));
} else {
- v.first += dataSize;
- v.second += remoteDataSize;
- return v;
+ return nameRet;
}
});
+
+ return Stream.of(dbInfos, tableInfos, partitionInfos)
+ .flatMap(Collection::stream).collect(Collectors.toList());
+ } finally {
+ readUnlock();
}
+ }
- for (Map.Entry<Long, RecyclePartitionInfo> entry :
idToPartition.entrySet()) {
- RecyclePartitionInfo partitionInfo = entry.getValue();
- Partition partition = partitionInfo.getPartition();
- long dataSize = partition.getDataSize(false);
- long remoteDataSize = partition.getRemoteDataSize();
- dbToRecycleSize.compute(partitionInfo.getDbId(), (k, v) -> {
- if (v == null) {
- return Pair.of(dataSize, remoteDataSize);
- } else {
- v.first += dataSize;
- v.second += remoteDataSize;
- return v;
+ public Map<Long, Pair<Long, Long>> getDbToRecycleSize() {
+ readLock();
+ try {
+ Map<Long, Pair<Long, Long>> dbToRecycleSize = new HashMap<>();
+ for (Map.Entry<Long, RecycleTableInfo> entry :
idToTable.entrySet()) {
+ RecycleTableInfo tableInfo = entry.getValue();
+ Table table = tableInfo.getTable();
+ if (!(table instanceof OlapTable)) {
+ continue;
}
- });
+ long dataSize = table.getDataSize(false);
+ long remoteDataSize = ((OlapTable) table).getRemoteDataSize();
+ dbToRecycleSize.compute(tableInfo.getDbId(), (k, v) -> {
+ if (v == null) {
+ return Pair.of(dataSize, remoteDataSize);
+ } else {
+ v.first += dataSize;
+ v.second += remoteDataSize;
+ return v;
+ }
+ });
+ }
+
+ for (Map.Entry<Long, RecyclePartitionInfo> entry :
idToPartition.entrySet()) {
+ RecyclePartitionInfo partitionInfo = entry.getValue();
+ Partition partition = partitionInfo.getPartition();
+ long dataSize = partition.getDataSize(false);
+ long remoteDataSize = partition.getRemoteDataSize();
+ dbToRecycleSize.compute(partitionInfo.getDbId(), (k, v) -> {
+ if (v == null) {
+ return Pair.of(dataSize, remoteDataSize);
+ } else {
+ v.first += dataSize;
+ v.second += remoteDataSize;
+ return v;
+ }
+ });
+ }
+ return dbToRecycleSize;
+ } finally {
+ readUnlock();
}
- return dbToRecycleSize;
}
- // Need to add "synchronized", because when calling /dump api to dump
image,
+ // Need to add read lock, because when calling /dump api to dump image,
// this class is not protected by any lock, will throw
ConcurrentModificationException.
@Override
- public synchronized void write(DataOutput out) throws IOException {
- out.writeInt(idToDatabase.size());
- for (Map.Entry<Long, RecycleDatabaseInfo> entry :
idToDatabase.entrySet()) {
- out.writeLong(entry.getKey());
- entry.getValue().write(out);
- }
- out.writeInt(idToTable.size());
- for (Map.Entry<Long, RecycleTableInfo> entry : idToTable.entrySet()) {
- out.writeLong(entry.getKey());
- entry.getValue().write(out);
- }
- out.writeInt(idToPartition.size());
- for (Map.Entry<Long, RecyclePartitionInfo> entry :
idToPartition.entrySet()) {
- out.writeLong(entry.getKey());
- entry.getValue().write(out);
- }
- out.writeInt(idToRecycleTime.size());
- for (Map.Entry<Long, Long> entry : idToRecycleTime.entrySet()) {
- out.writeLong(entry.getKey());
- out.writeLong(entry.getValue());
- }
- Text.writeString(out, GsonUtils.GSON.toJson(this));
+ public void write(DataOutput out) throws IOException {
+ readLock();
+ try {
+ out.writeInt(idToDatabase.size());
+ for (Map.Entry<Long, RecycleDatabaseInfo> entry :
idToDatabase.entrySet()) {
+ out.writeLong(entry.getKey());
+ entry.getValue().write(out);
+ }
+ out.writeInt(idToTable.size());
+ for (Map.Entry<Long, RecycleTableInfo> entry :
idToTable.entrySet()) {
+ out.writeLong(entry.getKey());
+ entry.getValue().write(out);
+ }
+ out.writeInt(idToPartition.size());
+ for (Map.Entry<Long, RecyclePartitionInfo> entry :
idToPartition.entrySet()) {
+ out.writeLong(entry.getKey());
+ entry.getValue().write(out);
+ }
+ out.writeInt(idToRecycleTime.size());
+ for (Map.Entry<Long, Long> entry : idToRecycleTime.entrySet()) {
+ out.writeLong(entry.getKey());
+ out.writeLong(entry.getValue());
+ }
+ Text.writeString(out, GsonUtils.GSON.toJson(this));
+ } finally {
+ readUnlock();
+ }
}
public void readFieldsWithGson(DataInput in) throws IOException {
@@ -1608,14 +1835,19 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
}
// only for unit test
- public synchronized void clearAll() {
- idToDatabase.clear();
- idToTable.clear();
- idToPartition.clear();
- idToRecycleTime.clear();
- dbNameToIds.clear();
- dbIdTableNameToIds.clear();
- dbTblIdPartitionNameToIds.clear();
- LOG.info("Cleared all objects in recycle bin");
+ public void clearAll() {
+ writeLock();
+ try {
+ idToDatabase.clear();
+ idToTable.clear();
+ idToPartition.clear();
+ idToRecycleTime.clear();
+ dbNameToIds.clear();
+ dbIdTableNameToIds.clear();
+ dbTblIdPartitionNameToIds.clear();
+ LOG.info("Cleared all objects in recycle bin");
+ } finally {
+ writeUnlock();
+ }
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogRecycleBinTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogRecycleBinTest.java
index 20f5c01bea2..3b642b5809f 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogRecycleBinTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogRecycleBinTest.java
@@ -35,11 +35,19 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public class CatalogRecycleBinTest {
@@ -835,5 +843,182 @@ public class CatalogRecycleBinTest {
db.unregisterTable(CatalogTestUtil.testEsTableId1);
recycleBin.recycleTable(CatalogTestUtil.testDbId1, esTable, false,
false, 0);
}
+
+ @Test
+ public void testConcurrentReadsDoNotBlock() throws Exception {
+ CatalogRecycleBin recycleBin = Env.getCurrentRecycleBin();
+
+ // Recycle several partitions
+ for (int i = 1; i <= 50; i++) {
+ MaterializedIndex index = new MaterializedIndex(2000 + i,
IndexState.NORMAL);
+ RandomDistributionInfo dist = new RandomDistributionInfo(1);
+ Partition partition = new Partition(3000 + i, "part_" + i, index,
dist);
+ recycleBin.recyclePartition(
+ CatalogTestUtil.testDbId1, CatalogTestUtil.testTableId1,
+ CatalogTestUtil.testTable1, partition, null, null,
+ new DataProperty(TStorageMedium.HDD), new
ReplicaAllocation((short) 3),
+ false, false);
+ }
+
+ // Multiple reader threads should run concurrently without blocking
each other
+ int numReaders = 10;
+ CyclicBarrier barrier = new CyclicBarrier(numReaders);
+ ExecutorService executor = Executors.newFixedThreadPool(numReaders);
+ List<Future<Boolean>> futures = new ArrayList<>();
+
+ for (int i = 0; i < numReaders; i++) {
+ futures.add(executor.submit(() -> {
+ barrier.await(5, TimeUnit.SECONDS);
+ // Perform various read operations concurrently
+ for (int j = 1; j <= 50; j++) {
+ recycleBin.isRecyclePartition(CatalogTestUtil.testDbId1,
+ CatalogTestUtil.testTableId1, 3000 + j);
+ recycleBin.getRecycleTimeById(3000 + j);
+ }
+ Set<Long> dbIds = Sets.newHashSet();
+ Set<Long> tableIds = Sets.newHashSet();
+ Set<Long> partIds = Sets.newHashSet();
+ recycleBin.getRecycleIds(dbIds, tableIds, partIds);
+ return true;
+ }));
+ }
+
+ executor.shutdown();
+ Assert.assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS));
+ for (Future<Boolean> f : futures) {
+ Assert.assertTrue(f.get());
+ }
+ }
+
+ @Test
+ public void testConcurrentRecycleAndRead() throws Exception {
+ CatalogRecycleBin recycleBin = Env.getCurrentRecycleBin();
+
+ AtomicBoolean readerError = new AtomicBoolean(false);
+ AtomicBoolean writerDone = new AtomicBoolean(false);
+ CountDownLatch startLatch = new CountDownLatch(1);
+
+ // Writer thread: continuously recycles partitions
+ Thread writer = new Thread(() -> {
+ try {
+ startLatch.await();
+ for (int i = 1; i <= 100; i++) {
+ MaterializedIndex index = new MaterializedIndex(4000 + i,
IndexState.NORMAL);
+ RandomDistributionInfo dist = new
RandomDistributionInfo(1);
+ Partition partition = new Partition(5000 + i, "cpart_" +
i, index, dist);
+ recycleBin.recyclePartition(
+ CatalogTestUtil.testDbId1,
CatalogTestUtil.testTableId1,
+ CatalogTestUtil.testTable1, partition, null, null,
+ new DataProperty(TStorageMedium.HDD), new
ReplicaAllocation((short) 3),
+ false, false);
+ }
+ } catch (Exception e) {
+ readerError.set(true);
+ } finally {
+ writerDone.set(true);
+ }
+ });
+
+ // Reader threads: continuously read while writer is active
+ List<Thread> readers = new ArrayList<>();
+ for (int r = 0; r < 5; r++) {
+ Thread reader = new Thread(() -> {
+ try {
+ startLatch.await();
+ while (!writerDone.get()) {
+ // These should never throw
ConcurrentModificationException
+ Set<Long> dbIds = Sets.newHashSet();
+ Set<Long> tableIds = Sets.newHashSet();
+ Set<Long> partIds = Sets.newHashSet();
+ recycleBin.getRecycleIds(dbIds, tableIds, partIds);
+
recycleBin.isRecyclePartition(CatalogTestUtil.testDbId1,
+ CatalogTestUtil.testTableId1, 5001);
+ }
+ } catch (Exception e) {
+ readerError.set(true);
+ }
+ });
+ readers.add(reader);
+ }
+
+ writer.start();
+ readers.forEach(Thread::start);
+ startLatch.countDown();
+
+ writer.join(30_000);
+ for (Thread reader : readers) {
+ reader.join(30_000);
+ }
+
+ Assert.assertFalse("Reader or writer thread encountered an error",
readerError.get());
+ // Verify all 100 partitions were recycled
+ for (int i = 1; i <= 100; i++) {
+
Assert.assertTrue(recycleBin.isRecyclePartition(CatalogTestUtil.testDbId1,
+ CatalogTestUtil.testTableId1, 5000 + i));
+ }
+ }
+
+ @Test
+ public void testMicrobatchEraseReleasesLockBetweenItems() throws Exception
{
+ CatalogRecycleBin recycleBin = Env.getCurrentRecycleBin();
+
+ // Recycle many partitions
+ int numPartitions = 50;
+ for (int i = 1; i <= numPartitions; i++) {
+ MaterializedIndex index = new MaterializedIndex(6000 + i,
IndexState.NORMAL);
+ RandomDistributionInfo dist = new RandomDistributionInfo(1);
+ Partition partition = new Partition(7000 + i, "epart_" + i, index,
dist);
+ recycleBin.recyclePartition(
+ CatalogTestUtil.testDbId1, CatalogTestUtil.testTableId1,
+ CatalogTestUtil.testTable1, partition, null, null,
+ new DataProperty(TStorageMedium.HDD), new
ReplicaAllocation((short) 3),
+ false, false);
+ }
+
+ // Verify all were recycled
+ Set<Long> dbIds = Sets.newHashSet();
+ Set<Long> tableIds = Sets.newHashSet();
+ Set<Long> partitionIds = Sets.newHashSet();
+ recycleBin.getRecycleIds(dbIds, tableIds, partitionIds);
+ Assert.assertEquals(numPartitions, partitionIds.size());
+
+ // Now run erase daemon which should process items one at a time
+ // While erase is running, a concurrent recyclePartition should be
able to
+ // proceed between items (not blocked for the entire erase duration)
+ AtomicBoolean recycleCompleted = new AtomicBoolean(false);
+ AtomicBoolean eraseStarted = new AtomicBoolean(false);
+
+ Thread eraseThread = new Thread(() -> {
+ eraseStarted.set(true);
+ recycleBin.runAfterCatalogReady();
+ });
+
+ eraseThread.start();
+
+ // Wait briefly for erase to start, then try to recycle a new partition
+ Thread.sleep(50);
+ if (eraseStarted.get()) {
+ MaterializedIndex newIndex = new MaterializedIndex(8000,
IndexState.NORMAL);
+ RandomDistributionInfo newDist = new RandomDistributionInfo(1);
+ Partition newPartition = new Partition(9000, "new_part", newIndex,
newDist);
+ recycleBin.recyclePartition(
+ CatalogTestUtil.testDbId1, CatalogTestUtil.testTableId1,
+ CatalogTestUtil.testTable1, newPartition, null, null,
+ new DataProperty(TStorageMedium.HDD), new
ReplicaAllocation((short) 3),
+ false, false);
+ recycleCompleted.set(true);
+ }
+
+ eraseThread.join(60_000);
+ Assert.assertFalse("Erase thread should have finished",
eraseThread.isAlive());
+
+ // The new partition should have been recycled successfully
+ if (eraseStarted.get()) {
+ Assert.assertTrue("recyclePartition should succeed during erase",
+ recycleCompleted.get());
+
Assert.assertTrue(recycleBin.isRecyclePartition(CatalogTestUtil.testDbId1,
+ CatalogTestUtil.testTableId1, 9000));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]