This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e2a678cb74a [Enhancement](recyclebin) Optimize lock granularity in 
CatalogRecycleBin (#61366)
e2a678cb74a is described below

commit e2a678cb74a5ef6278a7c968901c2de9348e0626
Author: morrySnow <[email protected]>
AuthorDate: Tue Mar 17 19:53:54 2026 +0800

    [Enhancement](recyclebin) Optimize lock granularity in CatalogRecycleBin 
(#61366)
    
    ## Problem
    
    All methods in `CatalogRecycleBin.java` use `synchronized` (single
    monitor lock), creating extremely coarse lock granularity. When
    `erasePartition()` runs slowly with many partitions, other
    `synchronized` methods block waiting for the lock. Callers like
    `recyclePartition()` hold TABLE WRITE LOCK while waiting, causing
    cascading blocking that can bring down the entire Doris metadata
    service.
    
    ## Solution
    
    Two complementary optimizations:
    
    ### 1. Replace `synchronized` with `ReentrantReadWriteLock`
    - **Lock-free** (8 methods): Simple ConcurrentHashMap lookups
    (`isRecyclePartition`, `getRecycleTimeById`, etc.)
    - **Read lock** (4 methods): Read-only iterations
    (`allTabletsInRecycledStatus`, `getInfo`, `write`, etc.)
    - **Write lock** (11 methods): Map mutations
    (`recycleDatabase/Table/Partition`, `recover*`, `clearAll`)
    
    ### 2. Microbatch Erase Pattern (Critical)
    Refactored all 12 erase methods to process items **one at a time** with
    lock release between items:
    - **Inside write lock (per item)**: cleanup RPCs + map removal + edit
    log write
    - **Release lock between items**: other operations can proceed
    
    This reduces lock hold time from **O(N × T)** (all items) to **O(T)**
    (one item) per acquisition.
    
    ## Data Structure Changes
    
    Changed 4 internal maps from `HashMap` to `ConcurrentHashMap` to enable
    lock-free reads.
    
    ## Bug Fixes (found during self-review)
    
    1. **NPE in `getIdListToEraseByRecycleTime`**: Used `getOrDefault` to
    handle stale IDs that may be concurrently removed between snapshot and
    processing
    2. **DdlException in cascade erase**: Added try-catch in
    `eraseDatabaseInstantly`/`eraseTableInstantly` for partitions/tables
    concurrently erased by daemon
    
    ## Testing
    
    - All 24 existing unit tests pass
    - Added 3 new concurrency tests:
      - `testConcurrentReadsDoNotBlock` — 10 concurrent reader threads
      - `testConcurrentRecycleAndRead` — writer + 5 readers simultaneously
      - `testMicrobatchEraseReleasesLockBetweenItems` — verifies
        recyclePartition succeeds during erase
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../apache/doris/catalog/CatalogRecycleBin.java    | 1700 +++++++++++---------
 .../doris/catalog/CatalogRecycleBinTest.java       |  185 +++
 2 files changed, 1151 insertions(+), 734 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
index ee5adb809cc..e5f10c96d93 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
@@ -36,7 +36,6 @@ import org.apache.doris.thrift.TStorageMedium;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 import com.google.gson.annotations.SerializedName;
@@ -47,6 +46,7 @@ import org.apache.logging.log4j.Logger;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -64,10 +65,28 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
     // to avoid erase log ahead of drop log
     private static final long minEraseLatency = 10 * 60 * 1000;  // 10 min
 
-    private Map<Long, RecycleDatabaseInfo> idToDatabase;
-    private Map<Long, RecycleTableInfo> idToTable;
-    private Map<Long, RecyclePartitionInfo> idToPartition;
-    private Map<Long, Long> idToRecycleTime;
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private void readLock() {
+        lock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
+
+    private ConcurrentHashMap<Long, RecycleDatabaseInfo> idToDatabase;
+    private ConcurrentHashMap<Long, RecycleTableInfo> idToTable;
+    private ConcurrentHashMap<Long, RecyclePartitionInfo> idToPartition;
+    private ConcurrentHashMap<Long, Long> idToRecycleTime;
 
     // Caches below to avoid calculate meta with same name every demon run 
cycle.
     // When the meta is updated, these caches should be updated too. No need to
@@ -85,42 +104,48 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
 
     public CatalogRecycleBin() {
         super("recycle bin", FeConstants.runningUnitTest ? 10 * 1000L : 
DEFAULT_INTERVAL_SECONDS * 1000L);
-        idToDatabase = Maps.newHashMap();
-        idToTable = Maps.newHashMap();
-        idToPartition = Maps.newHashMap();
-        idToRecycleTime = Maps.newHashMap();
+        idToDatabase = new ConcurrentHashMap<>();
+        idToTable = new ConcurrentHashMap<>();
+        idToPartition = new ConcurrentHashMap<>();
+        idToRecycleTime = new ConcurrentHashMap<>();
     }
 
-    public synchronized boolean allTabletsInRecycledStatus(List<Long> 
backendTabletIds) {
-        Set<Long> recycledTabletSet = Sets.newHashSet();
-
-        Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = 
idToPartition.entrySet().iterator();
-        while (iterator.hasNext()) {
-            Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
-            RecyclePartitionInfo partitionInfo = entry.getValue();
-            Partition partition = partitionInfo.getPartition();
-            addRecycledTabletsForPartition(recycledTabletSet, partition);
-        }
+    public boolean allTabletsInRecycledStatus(List<Long> backendTabletIds) {
+        readLock();
+        try {
+            Set<Long> recycledTabletSet = Sets.newHashSet();
 
-        Iterator<Map.Entry<Long, RecycleTableInfo>> tableIter = 
idToTable.entrySet().iterator();
-        while (tableIter.hasNext()) {
-            Map.Entry<Long, RecycleTableInfo> entry = tableIter.next();
-            RecycleTableInfo tableInfo = entry.getValue();
-            Table table = tableInfo.getTable();
-            addRecycledTabletsForTable(recycledTabletSet, table);
-        }
+            Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = 
idToPartition.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
+                RecyclePartitionInfo partitionInfo = entry.getValue();
+                Partition partition = partitionInfo.getPartition();
+                addRecycledTabletsForPartition(recycledTabletSet, partition);
+            }
 
-        Iterator<Map.Entry<Long, RecycleDatabaseInfo>> dbIterator = 
idToDatabase.entrySet().iterator();
-        while (dbIterator.hasNext()) {
-            Map.Entry<Long, RecycleDatabaseInfo> entry = dbIterator.next();
-            RecycleDatabaseInfo dbInfo = entry.getValue();
-            Database db = dbInfo.getDb();
-            for (Table table : db.getTables()) {
+            Iterator<Map.Entry<Long, RecycleTableInfo>> tableIter = 
idToTable.entrySet().iterator();
+            while (tableIter.hasNext()) {
+                Map.Entry<Long, RecycleTableInfo> entry = tableIter.next();
+                RecycleTableInfo tableInfo = entry.getValue();
+                Table table = tableInfo.getTable();
                 addRecycledTabletsForTable(recycledTabletSet, table);
             }
-        }
 
-        return recycledTabletSet.size() >= backendTabletIds.size() && 
recycledTabletSet.containsAll(backendTabletIds);
+            Iterator<Map.Entry<Long, RecycleDatabaseInfo>> dbIterator = 
idToDatabase.entrySet().iterator();
+            while (dbIterator.hasNext()) {
+                Map.Entry<Long, RecycleDatabaseInfo> entry = dbIterator.next();
+                RecycleDatabaseInfo dbInfo = entry.getValue();
+                Database db = dbInfo.getDb();
+                for (Table table : db.getTables()) {
+                    addRecycledTabletsForTable(recycledTabletSet, table);
+                }
+            }
+
+            return recycledTabletSet.size() >= backendTabletIds.size()
+                    && recycledTabletSet.containsAll(backendTabletIds);
+        } finally {
+            readUnlock();
+        }
     }
 
     private void addRecycledTabletsForTable(Set<Long> recycledTabletSet, Table 
table) {
@@ -141,130 +166,158 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
         }
     }
 
-    public synchronized boolean recycleDatabase(Database db, Set<String> 
tableNames, Set<Long> tableIds,
+    public boolean recycleDatabase(Database db, Set<String> tableNames, 
Set<Long> tableIds,
                                                 boolean isReplay, boolean 
isForceDrop, long replayRecycleTime) {
-        long recycleTime = 0;
-        if (idToDatabase.containsKey(db.getId())) {
-            LOG.error("db[{}] already in recycle bin.", db.getId());
-            return false;
-        }
-
-        // db should be empty. all tables are recycled before
-        if (!db.getTableIds().isEmpty()) {
-            throw new IllegalStateException("Database " + db.getFullName() + " 
is not empty. Contains tables: "
-                                            + 
db.getTableIds().stream().collect(Collectors.toSet()));
-        }
-
-        // recycle db
-        RecycleDatabaseInfo databaseInfo = new RecycleDatabaseInfo(db, 
tableNames, tableIds);
-        idToDatabase.put(db.getId(), databaseInfo);
-        if (isForceDrop) {
-            // The 'force drop' database should be recycle immediately.
-            recycleTime = 0;
-        } else if (!isReplay || replayRecycleTime == 0) {
-            recycleTime = System.currentTimeMillis();
-        } else {
-            recycleTime = replayRecycleTime;
-        }
-        idToRecycleTime.put(db.getId(), recycleTime);
-        dbNameToIds.computeIfAbsent(db.getFullName(), k -> 
ConcurrentHashMap.newKeySet()).add(db.getId());
-        LOG.info("recycle db[{}-{}], is force drop: {}", db.getId(), 
db.getFullName(), isForceDrop);
-        return true;
+        writeLock();
+        try {
+            long recycleTime = 0;
+            if (idToDatabase.containsKey(db.getId())) {
+                LOG.error("db[{}] already in recycle bin.", db.getId());
+                return false;
+            }
+
+            // db should be empty. all tables are recycled before
+            if (!db.getTableIds().isEmpty()) {
+                throw new IllegalStateException("Database " + db.getFullName() 
+ " is not empty. Contains tables: "
+                                                + 
db.getTableIds().stream().collect(Collectors.toSet()));
+            }
+
+            // recycle db
+            RecycleDatabaseInfo databaseInfo = new RecycleDatabaseInfo(db, 
tableNames, tableIds);
+            idToDatabase.put(db.getId(), databaseInfo);
+            if (isForceDrop) {
+                // The 'force drop' database should be recycle immediately.
+                recycleTime = 0;
+            } else if (!isReplay || replayRecycleTime == 0) {
+                recycleTime = System.currentTimeMillis();
+            } else {
+                recycleTime = replayRecycleTime;
+            }
+            idToRecycleTime.put(db.getId(), recycleTime);
+            dbNameToIds.computeIfAbsent(db.getFullName(), k -> 
ConcurrentHashMap.newKeySet()).add(db.getId());
+            LOG.info("recycle db[{}-{}], is force drop: {}", db.getId(), 
db.getFullName(), isForceDrop);
+            return true;
+        } finally {
+            writeUnlock();
+        }
     }
 
-    public synchronized boolean recycleTable(long dbId, Table table, boolean 
isReplay,
+    public boolean recycleTable(long dbId, Table table, boolean isReplay,
                                              boolean isForceDrop, long 
replayRecycleTime) {
-        long recycleTime = 0;
-        if (idToTable.containsKey(table.getId())) {
-            LOG.error("table[{}] already in recycle bin.", table.getId());
-            return false;
-        }
-
-        // recycle table
-        RecycleTableInfo tableInfo = new RecycleTableInfo(dbId, table);
-        if (isForceDrop) {
-            // The 'force drop' table should be recycle immediately.
-            recycleTime = 0;
-        } else if (!isReplay || replayRecycleTime == 0) {
-            recycleTime = System.currentTimeMillis();
-        } else {
-            recycleTime = replayRecycleTime;
-        }
-        idToRecycleTime.put(table.getId(), recycleTime);
-        idToTable.put(table.getId(), tableInfo);
-        dbIdTableNameToIds.computeIfAbsent(Pair.of(dbId, table.getName()),
-                k -> ConcurrentHashMap.newKeySet()).add(table.getId());
-        LOG.info("recycle table[{}-{}], is force drop: {}", table.getId(), 
table.getName(), isForceDrop);
-        return true;
+        writeLock();
+        try {
+            long recycleTime = 0;
+            if (idToTable.containsKey(table.getId())) {
+                LOG.error("table[{}] already in recycle bin.", table.getId());
+                return false;
+            }
+
+            // recycle table
+            RecycleTableInfo tableInfo = new RecycleTableInfo(dbId, table);
+            if (isForceDrop) {
+                // The 'force drop' table should be recycle immediately.
+                recycleTime = 0;
+            } else if (!isReplay || replayRecycleTime == 0) {
+                recycleTime = System.currentTimeMillis();
+            } else {
+                recycleTime = replayRecycleTime;
+            }
+            idToRecycleTime.put(table.getId(), recycleTime);
+            idToTable.put(table.getId(), tableInfo);
+            dbIdTableNameToIds.computeIfAbsent(Pair.of(dbId, table.getName()),
+                    k -> ConcurrentHashMap.newKeySet()).add(table.getId());
+            LOG.info("recycle table[{}-{}], is force drop: {}", table.getId(), 
table.getName(), isForceDrop);
+            return true;
+        } finally {
+            writeUnlock();
+        }
     }
 
-    public synchronized boolean recyclePartition(long dbId, long tableId, 
String tableName, Partition partition,
+    public boolean recyclePartition(long dbId, long tableId, String tableName, 
Partition partition,
                                                  Range<PartitionKey> range, 
PartitionItem listPartitionItem,
                                                  DataProperty dataProperty, 
ReplicaAllocation replicaAlloc,
                                                  boolean isInMemory, boolean 
isMutable) {
-        if (idToPartition.containsKey(partition.getId())) {
-            LOG.error("partition[{}] already in recycle bin.", 
partition.getId());
-            return false;
-        }
-
-        // recycle partition
-        RecyclePartitionInfo partitionInfo = new RecyclePartitionInfo(dbId, 
tableId, partition,
-                range, listPartitionItem, dataProperty, replicaAlloc, 
isInMemory, isMutable);
-        idToRecycleTime.put(partition.getId(), System.currentTimeMillis());
-        idToPartition.put(partition.getId(), partitionInfo);
-        dbTblIdPartitionNameToIds.computeIfAbsent(Pair.of(dbId, tableId), k -> 
new ConcurrentHashMap<>())
-                .computeIfAbsent(partition.getName(), k -> 
ConcurrentHashMap.newKeySet()).add(partition.getId());
-        LOG.info("recycle partition[{}-{}] of table [{}-{}]", 
partition.getId(), partition.getName(),
-                tableId, tableName);
-        return true;
+        writeLock();
+        try {
+            if (idToPartition.containsKey(partition.getId())) {
+                LOG.error("partition[{}] already in recycle bin.", 
partition.getId());
+                return false;
+            }
+
+            // recycle partition
+            RecyclePartitionInfo partitionInfo = new 
RecyclePartitionInfo(dbId, tableId, partition,
+                    range, listPartitionItem, dataProperty, replicaAlloc, 
isInMemory, isMutable);
+            idToRecycleTime.put(partition.getId(), System.currentTimeMillis());
+            idToPartition.put(partition.getId(), partitionInfo);
+            dbTblIdPartitionNameToIds.computeIfAbsent(Pair.of(dbId, tableId), 
k -> new ConcurrentHashMap<>())
+                    .computeIfAbsent(partition.getName(), k -> 
ConcurrentHashMap.newKeySet()).add(partition.getId());
+            LOG.info("recycle partition[{}-{}] of table [{}-{}]", 
partition.getId(), partition.getName(),
+                    tableId, tableName);
+            return true;
+        } finally {
+            writeUnlock();
+        }
     }
 
-    public synchronized Long getRecycleTimeById(long id) {
+    public Long getRecycleTimeById(long id) {
         return idToRecycleTime.get(id);
     }
 
-    public synchronized void setRecycleTimeByIdForReplay(long id, Long 
recycleTime) {
+    public void setRecycleTimeByIdForReplay(long id, Long recycleTime) {
         idToRecycleTime.put(id, recycleTime);
     }
 
-    public synchronized boolean isRecycleDatabase(long dbId) {
+    public boolean isRecycleDatabase(long dbId) {
         return idToDatabase.containsKey(dbId);
     }
 
-    public synchronized boolean isRecycleTable(long dbId, long tableId) {
+    public boolean isRecycleTable(long dbId, long tableId) {
         return isRecycleDatabase(dbId) || idToTable.containsKey(tableId);
     }
 
-    public synchronized boolean isRecyclePartition(long dbId, long tableId, 
long partitionId) {
+    public boolean isRecyclePartition(long dbId, long tableId, long 
partitionId) {
         return isRecycleTable(dbId, tableId) || 
idToPartition.containsKey(partitionId);
     }
 
-    public synchronized void getRecycleIds(Set<Long> dbIds, Set<Long> 
tableIds, Set<Long> partitionIds) {
+    public void getRecycleIds(Set<Long> dbIds, Set<Long> tableIds, Set<Long> 
partitionIds) {
         dbIds.addAll(idToDatabase.keySet());
         tableIds.addAll(idToTable.keySet());
         partitionIds.addAll(idToPartition.keySet());
     }
 
-    private synchronized boolean isExpire(long id, long currentTimeMs) {
+    private boolean isExpire(long id, long currentTimeMs) {
         long latency = currentTimeMs - idToRecycleTime.get(id);
         return (Config.catalog_trash_ignore_min_erase_latency || latency > 
minEraseLatency)
                 && latency > Config.catalog_trash_expire_second * 1000L;
     }
 
-    private synchronized void eraseDatabase(long currentTimeMs, int keepNum) {
+    private void eraseDatabase(long currentTimeMs, int keepNum) {
         int eraseNum = 0;
         StopWatch watch = StopWatch.createStarted();
         try {
-            // 1. erase expired database
-            Iterator<Map.Entry<Long, RecycleDatabaseInfo>> dbIter = 
idToDatabase.entrySet().iterator();
-            while (dbIter.hasNext()) {
-                Map.Entry<Long, RecycleDatabaseInfo> entry = dbIter.next();
-                RecycleDatabaseInfo dbInfo = entry.getValue();
-                Database db = dbInfo.getDb();
-                if (isExpire(db.getId(), currentTimeMs)) {
-                    // erase db
-                    dbIter.remove();
-                    idToRecycleTime.remove(entry.getKey());
+            // 1. collect expired database IDs under read lock
+            List<Long> expiredIds = new ArrayList<>();
+            readLock();
+            try {
+                for (Map.Entry<Long, RecycleDatabaseInfo> entry : 
idToDatabase.entrySet()) {
+                    if (isExpire(entry.getKey(), currentTimeMs)) {
+                        expiredIds.add(entry.getKey());
+                    }
+                }
+            } finally {
+                readUnlock();
+            }
+
+            // 2. erase each expired database one at a time
+            for (Long dbId : expiredIds) {
+                writeLock();
+                try {
+                    RecycleDatabaseInfo dbInfo = idToDatabase.remove(dbId);
+                    if (dbInfo == null) {
+                        continue;
+                    }
+                    Database db = dbInfo.getDb();
+                    idToRecycleTime.remove(dbId);
 
                     dbNameToIds.computeIfPresent(db.getFullName(), (k, v) -> {
                         v.remove(db.getId());
@@ -274,13 +327,23 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
                     Env.getCurrentEnv().eraseDatabase(db.getId(), true);
                     LOG.info("erase db[{}]", db.getId());
                     eraseNum++;
+                } finally {
+                    writeUnlock();
                 }
             }
-            // 2. erase exceed number
+
+            // 3. erase exceed number
             if (keepNum < 0) {
                 return;
             }
-            for (Map.Entry<String, Set<Long>> entry : dbNameToIds.entrySet()) {
+            List<Map.Entry<String, Set<Long>>> groups;
+            readLock();
+            try {
+                groups = new ArrayList<>(dbNameToIds.entrySet());
+            } finally {
+                readUnlock();
+            }
+            for (Map.Entry<String, Set<Long>> entry : groups) {
                 String dbName = entry.getKey();
                 eraseDatabaseWithSameName(dbName, currentTimeMs, keepNum, 
Lists.newArrayList(entry.getValue()));
             }
@@ -290,29 +353,40 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
         }
     }
 
-    private synchronized void eraseDatabaseWithSameName(String dbName, long 
currentTimeMs,
+    private void eraseDatabaseWithSameName(String dbName, long currentTimeMs,
                                                         int 
maxSameNameTrashNum, List<Long> sameNameDbIdList) {
-        List<Long> dbIdToErase = 
getIdListToEraseByRecycleTime(sameNameDbIdList, maxSameNameTrashNum);
+        List<Long> dbIdToErase;
+        readLock();
+        try {
+            dbIdToErase = getIdListToEraseByRecycleTime(sameNameDbIdList, 
maxSameNameTrashNum);
+        } finally {
+            readUnlock();
+        }
         for (Long dbId : dbIdToErase) {
-            RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
-            if (!isExpireMinLatency(dbId, currentTimeMs)) {
-                continue;
-            }
-            eraseAllTables(dbInfo);
-            idToDatabase.remove(dbId);
-            idToRecycleTime.remove(dbId);
+            writeLock();
+            try {
+                RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
+                if (dbInfo == null || !isExpireMinLatency(dbId, 
currentTimeMs)) {
+                    continue;
+                }
+                eraseAllTables(dbInfo);
+                idToDatabase.remove(dbId);
+                idToRecycleTime.remove(dbId);
 
-            dbNameToIds.computeIfPresent(dbName, (k, v) -> {
-                v.remove(dbId);
-                return v.isEmpty() ? null : v;
-            });
+                dbNameToIds.computeIfPresent(dbName, (k, v) -> {
+                    v.remove(dbId);
+                    return v.isEmpty() ? null : v;
+                });
 
-            Env.getCurrentEnv().eraseDatabase(dbId, true);
-            LOG.info("erase database[{}] name: {}", dbId, dbName);
+                Env.getCurrentEnv().eraseDatabase(dbId, true);
+                LOG.info("erase database[{}] name: {}", dbId, dbName);
+            } finally {
+                writeUnlock();
+            }
         }
     }
 
-    private synchronized boolean isExpireMinLatency(long id, long 
currentTimeMs) {
+    private boolean isExpireMinLatency(long id, long currentTimeMs) {
         return (currentTimeMs - idToRecycleTime.get(id)) > minEraseLatency || 
FeConstants.runningUnitTest;
     }
 
@@ -348,40 +422,56 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
         }
     }
 
-    public synchronized void replayEraseDatabase(long dbId) {
-        RecycleDatabaseInfo dbInfo = idToDatabase.remove(dbId);
-        idToRecycleTime.remove(dbId);
+    public void replayEraseDatabase(long dbId) {
+        writeLock();
+        try {
+            RecycleDatabaseInfo dbInfo = idToDatabase.remove(dbId);
+            idToRecycleTime.remove(dbId);
 
-        if (dbInfo != null) {
-            dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k, v) 
-> {
-                v.remove(dbId);
-                return v.isEmpty() ? null : v;
-            });
-        }
+            if (dbInfo != null) {
+                dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k, 
v) -> {
+                    v.remove(dbId);
+                    return v.isEmpty() ? null : v;
+                });
+            }
 
-        Env.getCurrentEnv().eraseDatabase(dbId, false);
-        LOG.info("replay erase db[{}]", dbId);
+            Env.getCurrentEnv().eraseDatabase(dbId, false);
+            LOG.info("replay erase db[{}]", dbId);
+        } finally {
+            writeUnlock();
+        }
     }
 
-    private synchronized void eraseTable(long currentTimeMs, int keepNum) {
+    private void eraseTable(long currentTimeMs, int keepNum) {
         int eraseNum = 0;
         StopWatch watch = StopWatch.createStarted();
         try {
-            // 1. erase expired tables
-            Iterator<Map.Entry<Long, RecycleTableInfo>> tableIter = 
idToTable.entrySet().iterator();
-            while (tableIter.hasNext()) {
-                Map.Entry<Long, RecycleTableInfo> entry = tableIter.next();
-                RecycleTableInfo tableInfo = entry.getValue();
-                Table table = tableInfo.getTable();
-                long tableId = table.getId();
+            // 1. collect expired table IDs under read lock
+            List<Long> expiredIds = new ArrayList<>();
+            readLock();
+            try {
+                for (Map.Entry<Long, RecycleTableInfo> entry : 
idToTable.entrySet()) {
+                    if (isExpire(entry.getKey(), currentTimeMs)) {
+                        expiredIds.add(entry.getKey());
+                    }
+                }
+            } finally {
+                readUnlock();
+            }
 
-                if (isExpire(tableId, currentTimeMs)) {
+            // 2. erase each expired table one at a time
+            for (Long tableId : expiredIds) {
+                writeLock();
+                try {
+                    RecycleTableInfo tableInfo = idToTable.remove(tableId);
+                    if (tableInfo == null) {
+                        continue;
+                    }
+                    Table table = tableInfo.getTable();
                     if (table.isManagedTable()) {
                         Env.getCurrentEnv().onEraseOlapTable(tableInfo.dbId, 
(OlapTable) table, false);
                     }
 
-                    // erase table
-                    tableIter.remove();
                     idToRecycleTime.remove(tableId);
 
                     
dbIdTableNameToIds.computeIfPresent(Pair.of(tableInfo.getDbId(), 
table.getName()),
@@ -390,18 +480,26 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
                             return v.isEmpty() ? null : v;
                         });
 
-                    // log
                     Env.getCurrentEnv().getEditLog().logEraseTable(tableId);
                     LOG.info("erase table[{}]", tableId);
                     eraseNum++;
+                } finally {
+                    writeUnlock();
                 }
-            } // end for tables
+            }
 
-            // 2. erase exceed num
+            // 3. erase exceed num
             if (keepNum < 0) {
                 return;
             }
-            for (Map.Entry<Pair<Long, String>, Set<Long>> entry : 
dbIdTableNameToIds.entrySet()) {
+            List<Map.Entry<Pair<Long, String>, Set<Long>>> groups;
+            readLock();
+            try {
+                groups = new ArrayList<>(dbIdTableNameToIds.entrySet());
+            } finally {
+                readUnlock();
+            }
+            for (Map.Entry<Pair<Long, String>, Set<Long>> entry : groups) {
                 eraseTableWithSameName(entry.getKey().first, 
entry.getKey().second, currentTimeMs, keepNum,
                         Lists.newArrayList(entry.getValue()));
             }
@@ -411,71 +509,98 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
         }
     }
 
-    private synchronized void eraseTableWithSameName(long dbId, String 
tableName, long currentTimeMs,
+    private void eraseTableWithSameName(long dbId, String tableName, long 
currentTimeMs,
             int maxSameNameTrashNum, List<Long> sameNameTableIdList) {
-        List<Long> tableIdToErase = 
getIdListToEraseByRecycleTime(sameNameTableIdList, maxSameNameTrashNum);
+        List<Long> tableIdToErase;
+        readLock();
+        try {
+            tableIdToErase = 
getIdListToEraseByRecycleTime(sameNameTableIdList, maxSameNameTrashNum);
+        } finally {
+            readUnlock();
+        }
         for (Long tableId : tableIdToErase) {
-            RecycleTableInfo tableInfo = idToTable.get(tableId);
-            if (!isExpireMinLatency(tableId, currentTimeMs)) {
-                continue;
-            }
-            Table table = tableInfo.getTable();
-            if (table.isManagedTable()) {
-                Env.getCurrentEnv().onEraseOlapTable(dbId, (OlapTable) table, 
false);
-            }
+            writeLock();
+            try {
+                RecycleTableInfo tableInfo = idToTable.get(tableId);
+                if (tableInfo == null || !isExpireMinLatency(tableId, 
currentTimeMs)) {
+                    continue;
+                }
+                Table table = tableInfo.getTable();
+                if (table.isManagedTable()) {
+                    Env.getCurrentEnv().onEraseOlapTable(dbId, (OlapTable) 
table, false);
+                }
 
-            idToTable.remove(tableId);
-            idToRecycleTime.remove(tableId);
+                idToTable.remove(tableId);
+                idToRecycleTime.remove(tableId);
 
-            dbIdTableNameToIds.computeIfPresent(Pair.of(dbId, tableName), (k, 
v) -> {
-                v.remove(tableId);
-                return v.isEmpty() ? null : v;
-            });
+                dbIdTableNameToIds.computeIfPresent(Pair.of(dbId, tableName), 
(k, v) -> {
+                    v.remove(tableId);
+                    return v.isEmpty() ? null : v;
+                });
 
-            Env.getCurrentEnv().getEditLog().logEraseTable(tableId);
-            LOG.info("erase table[{}] name: {} from db[{}]", tableId, 
tableName, dbId);
+                Env.getCurrentEnv().getEditLog().logEraseTable(tableId);
+                LOG.info("erase table[{}] name: {} from db[{}]", tableId, 
tableName, dbId);
+            } finally {
+                writeUnlock();
+            }
         }
     }
 
-    public synchronized void replayEraseTable(long tableId) {
-        LOG.info("before replay erase table[{}]", tableId);
-        RecycleTableInfo tableInfo = idToTable.remove(tableId);
-        idToRecycleTime.remove(tableId);
-        if (tableInfo == null) {
-            // FIXME(walter): Sometimes `eraseTable` in 'DROP DB ... FORCE' 
may be executed earlier than
-            // finish drop db, especially in the case of drop db with many 
tables.
-            return;
-        }
+    public void replayEraseTable(long tableId) {
+        writeLock();
+        try {
+            LOG.info("before replay erase table[{}]", tableId);
+            RecycleTableInfo tableInfo = idToTable.remove(tableId);
+            idToRecycleTime.remove(tableId);
+            if (tableInfo == null) {
+                // FIXME(walter): Sometimes `eraseTable` in 'DROP DB ... 
FORCE' may be executed earlier than
+                // finish drop db, especially in the case of drop db with many 
tables.
+                return;
+            }
 
-        dbIdTableNameToIds.computeIfPresent(Pair.of(tableInfo.getDbId(), 
tableInfo.getTable().getName()),
-                (k, v) -> {
-                v.remove(tableId);
-                return v.isEmpty() ? null : v;
-            });
+            dbIdTableNameToIds.computeIfPresent(Pair.of(tableInfo.getDbId(), 
tableInfo.getTable().getName()),
+                    (k, v) -> {
+                    v.remove(tableId);
+                    return v.isEmpty() ? null : v;
+                });
 
-        Table table = tableInfo.getTable();
-        if (table.isManagedTable()) {
-            Env.getCurrentEnv().onEraseOlapTable(tableInfo.dbId, (OlapTable) 
table, true);
+            Table table = tableInfo.getTable();
+            if (table.isManagedTable()) {
+                Env.getCurrentEnv().onEraseOlapTable(tableInfo.dbId, 
(OlapTable) table, true);
+            }
+            LOG.info("replay erase table[{}]", tableId);
+        } finally {
+            writeUnlock();
         }
-        LOG.info("replay erase table[{}]", tableId);
     }
 
-    private synchronized void erasePartition(long currentTimeMs, int keepNum) {
+    private void erasePartition(long currentTimeMs, int keepNum) {
         int eraseNum = 0;
         StopWatch watch = StopWatch.createStarted();
         try {
-            // 1. erase expired partitions
-            Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = 
idToPartition.entrySet().iterator();
-            while (iterator.hasNext()) {
-                Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
-                RecyclePartitionInfo partitionInfo = entry.getValue();
-                Partition partition = partitionInfo.getPartition();
+            // 1. collect expired partition IDs under read lock
+            List<Long> expiredIds = new ArrayList<>();
+            readLock();
+            try {
+                for (Map.Entry<Long, RecyclePartitionInfo> entry : 
idToPartition.entrySet()) {
+                    if (isExpire(entry.getKey(), currentTimeMs)) {
+                        expiredIds.add(entry.getKey());
+                    }
+                }
+            } finally {
+                readUnlock();
+            }
 
-                long partitionId = entry.getKey();
-                if (isExpire(partitionId, currentTimeMs)) {
+            // 2. erase each expired partition one at a time (microbatch)
+            for (Long partitionId : expiredIds) {
+                writeLock();
+                try {
+                    RecyclePartitionInfo partitionInfo = 
idToPartition.remove(partitionId);
+                    if (partitionInfo == null) {
+                        continue;
+                    }
+                    Partition partition = partitionInfo.getPartition();
                     Env.getCurrentEnv().onErasePartition(partition);
-                    // erase partition
-                    iterator.remove();
                     idToRecycleTime.remove(partitionId);
 
                     dbTblIdPartitionNameToIds.computeIfPresent(
@@ -486,18 +611,28 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
                                 });
                                 return partitionMap.isEmpty() ? null : 
partitionMap;
                             });
-                    // log
+
                     
Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
                     LOG.info("erase partition[{}]. reason: expired", 
partitionId);
                     eraseNum++;
+                } finally {
+                    writeUnlock();
                 }
-            } // end for partitions
+            }
 
-            // 2. erase exceed number
+            // 3. erase exceed number
             if (keepNum < 0) {
                 return;
             }
-            for (Map.Entry<Pair<Long, Long>, Map<String, Set<Long>>> entry : 
dbTblIdPartitionNameToIds.entrySet()) {
+            // Collect same-name groups under read lock
+            List<Map.Entry<Pair<Long, Long>, Map<String, Set<Long>>>> groups;
+            readLock();
+            try {
+                groups = new ArrayList<>(dbTblIdPartitionNameToIds.entrySet());
+            } finally {
+                readUnlock();
+            }
+            for (Map.Entry<Pair<Long, Long>, Map<String, Set<Long>>> entry : 
groups) {
                 long dbId = entry.getKey().first;
                 long tableId = entry.getKey().second;
                 for (Map.Entry<String, Set<Long>> partitionEntry : 
entry.getValue().entrySet()) {
@@ -511,66 +646,84 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
         }
     }
 
-    private synchronized void erasePartitionWithSameName(long dbId, long 
tableId, String partitionName,
+    private void erasePartitionWithSameName(long dbId, long tableId, String 
partitionName,
             long currentTimeMs, int maxSameNameTrashNum, List<Long> 
sameNamePartitionIdList) {
-        List<Long> partitionIdToErase = 
getIdListToEraseByRecycleTime(sameNamePartitionIdList,
-                maxSameNameTrashNum);
+        List<Long> partitionIdToErase;
+        readLock();
+        try {
+            partitionIdToErase = 
getIdListToEraseByRecycleTime(sameNamePartitionIdList, maxSameNameTrashNum);
+        } finally {
+            readUnlock();
+        }
         for (Long partitionId : partitionIdToErase) {
-            RecyclePartitionInfo partitionInfo = 
idToPartition.get(partitionId);
-            if (!isExpireMinLatency(partitionId, currentTimeMs)) {
-                continue;
-            }
-            Partition partition = partitionInfo.getPartition();
+            writeLock();
+            try {
+                RecyclePartitionInfo partitionInfo = 
idToPartition.get(partitionId);
+                if (partitionInfo == null || !isExpireMinLatency(partitionId, 
currentTimeMs)) {
+                    continue;
+                }
+                Partition partition = partitionInfo.getPartition();
 
-            Env.getCurrentEnv().onErasePartition(partition);
-            idToPartition.remove(partitionId);
-            idToRecycleTime.remove(partitionId);
+                Env.getCurrentEnv().onErasePartition(partition);
+                idToPartition.remove(partitionId);
+                idToRecycleTime.remove(partitionId);
 
-            dbTblIdPartitionNameToIds.computeIfPresent(Pair.of(dbId, tableId), 
(pair, partitionMap) -> {
-                partitionMap.computeIfPresent(partitionName, (name, idSet) -> {
-                    idSet.remove(partitionId);
-                    return idSet.isEmpty() ? null : idSet;
+                dbTblIdPartitionNameToIds.computeIfPresent(Pair.of(dbId, 
tableId), (pair, partitionMap) -> {
+                    partitionMap.computeIfPresent(partitionName, (name, idSet) 
-> {
+                        idSet.remove(partitionId);
+                        return idSet.isEmpty() ? null : idSet;
+                    });
+                    return partitionMap.isEmpty() ? null : partitionMap;
                 });
-                return partitionMap.isEmpty() ? null : partitionMap;
-            });
 
-            Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
-            LOG.info("erase partition[{}] name: {} from table[{}] from 
db[{}]", partitionId,
-                    partitionName, tableId, dbId);
+                
Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
+                LOG.info("erase partition[{}] name: {} from table[{}] from 
db[{}]", partitionId,
+                        partitionName, tableId, dbId);
+            } finally {
+                writeUnlock();
+            }
         }
     }
 
-    public synchronized void replayErasePartition(long partitionId) {
-        RecyclePartitionInfo partitionInfo = idToPartition.remove(partitionId);
-        idToRecycleTime.remove(partitionId);
+    public void replayErasePartition(long partitionId) {
+        writeLock();
+        try {
+            RecyclePartitionInfo partitionInfo = 
idToPartition.remove(partitionId);
+            idToRecycleTime.remove(partitionId);
 
-        if (partitionInfo == null) {
-            LOG.warn("replayErasePartition: partitionInfo is null for 
partitionId[{}]", partitionId);
-            return;
-        }
+            if (partitionInfo == null) {
+                LOG.warn("replayErasePartition: partitionInfo is null for 
partitionId[{}]", partitionId);
+                return;
+            }
 
-        dbTblIdPartitionNameToIds.computeIfPresent(
-                Pair.of(partitionInfo.getDbId(), partitionInfo.getTableId()), 
(pair, partitionMap) -> {
-                    
partitionMap.computeIfPresent(partitionInfo.getPartition().getName(), (name, 
idSet) -> {
-                        idSet.remove(partitionId);
-                        return idSet.isEmpty() ? null : idSet;
+            dbTblIdPartitionNameToIds.computeIfPresent(
+                    Pair.of(partitionInfo.getDbId(), 
partitionInfo.getTableId()), (pair, partitionMap) -> {
+                        
partitionMap.computeIfPresent(partitionInfo.getPartition().getName(), (name, 
idSet) -> {
+                            idSet.remove(partitionId);
+                            return idSet.isEmpty() ? null : idSet;
+                        });
+                        return partitionMap.isEmpty() ? null : partitionMap;
                     });
-                    return partitionMap.isEmpty() ? null : partitionMap;
-                });
 
-        Partition partition = partitionInfo.getPartition();
-        Env.getCurrentEnv().onErasePartition(partition);
+            Partition partition = partitionInfo.getPartition();
+            Env.getCurrentEnv().onErasePartition(partition);
 
-        LOG.info("replay erase partition[{}]", partitionId);
+            LOG.info("replay erase partition[{}]", partitionId);
+        } finally {
+            writeUnlock();
+        }
     }
 
-    private synchronized List<Long> getIdListToEraseByRecycleTime(List<Long> 
ids, int maxTrashNum) {
+    private List<Long> getIdListToEraseByRecycleTime(List<Long> ids, int 
maxTrashNum) {
         List<Long> idToErase = Lists.newArrayList();
         if (ids.size() <= maxTrashNum) {
             return idToErase;
         }
-        // order by recycle time desc
-        ids.sort((x, y) -> Long.compare(idToRecycleTime.get(y), 
idToRecycleTime.get(x)));
+        // order by recycle time desc; use getOrDefault to handle stale IDs
+        // that may have been removed between snapshot and read lock 
acquisition
+        ids.sort((x, y) -> Long.compare(
+                idToRecycleTime.getOrDefault(y, 0L),
+                idToRecycleTime.getOrDefault(x, 0L)));
 
         for (int i = maxTrashNum; i < ids.size(); i++) {
             idToErase.add(ids.get(i));
@@ -578,66 +731,76 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
         return idToErase;
     }
 
-    public synchronized Database recoverDatabase(String dbName, long dbId) 
throws DdlException {
-        RecycleDatabaseInfo dbInfo = null;
-        // The recycle time of the force dropped tables and databases will be 
set to zero, use 1 here to
-        // skip these databases and tables.
-        long recycleTime = 1;
-        Iterator<Map.Entry<Long, RecycleDatabaseInfo>> iterator = 
idToDatabase.entrySet().iterator();
-        while (iterator.hasNext()) {
-            Map.Entry<Long, RecycleDatabaseInfo> entry = iterator.next();
-            if (dbName.equals(entry.getValue().getDb().getFullName())) {
-                if (dbId == -1) {
-                    if (recycleTime <= idToRecycleTime.get(entry.getKey())) {
-                        recycleTime = idToRecycleTime.get(entry.getKey());
+    public Database recoverDatabase(String dbName, long dbId) throws 
DdlException {
+        writeLock();
+        try {
+            RecycleDatabaseInfo dbInfo = null;
+            // The recycle time of the force dropped tables and databases will 
be set to zero, use 1 here to
+            // skip these databases and tables.
+            long recycleTime = 1;
+            Iterator<Map.Entry<Long, RecycleDatabaseInfo>> iterator = 
idToDatabase.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Long, RecycleDatabaseInfo> entry = iterator.next();
+                if (dbName.equals(entry.getValue().getDb().getFullName())) {
+                    if (dbId == -1) {
+                        if (recycleTime <= 
idToRecycleTime.get(entry.getKey())) {
+                            recycleTime = idToRecycleTime.get(entry.getKey());
+                            dbInfo = entry.getValue();
+                        }
+                    } else if (entry.getKey() == dbId) {
                         dbInfo = entry.getValue();
+                        break;
                     }
-                } else if (entry.getKey() == dbId) {
-                    dbInfo = entry.getValue();
-                    break;
                 }
             }
-        }
 
-        if (dbInfo == null) {
-            throw new DdlException("Unknown database '" + dbName + "' or 
database id '" + dbId + "'");
-        }
+            if (dbInfo == null) {
+                throw new DdlException("Unknown database '" + dbName + "' or 
database id '" + dbId + "'");
+            }
 
-        // 1. recover all tables in this db
-        recoverAllTables(dbInfo);
+            // 1. recover all tables in this db
+            recoverAllTables(dbInfo);
 
-        Database db = dbInfo.getDb();
-        // 2. remove db from idToDatabase and idToRecycleTime
-        idToDatabase.remove(db.getId());
-        idToRecycleTime.remove(db.getId());
+            Database db = dbInfo.getDb();
+            // 2. remove db from idToDatabase and idToRecycleTime
+            idToDatabase.remove(db.getId());
+            idToRecycleTime.remove(db.getId());
 
-        dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k, v) -> {
-            v.remove(dbId);
-            return v.isEmpty() ? null : v;
-        });
+            dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k, v) 
-> {
+                v.remove(dbId);
+                return v.isEmpty() ? null : v;
+            });
 
-        return db;
+            return db;
+        } finally {
+            writeUnlock();
+        }
     }
 
-    public synchronized Database replayRecoverDatabase(long dbId) {
-        RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
-
+    public Database replayRecoverDatabase(long dbId) {
+        writeLock();
         try {
-            recoverAllTables(dbInfo);
-        } catch (DdlException e) {
-            // should not happened
-            LOG.error("failed replay recover database: {}", dbId, e);
-        }
+            RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
 
-        idToDatabase.remove(dbId);
-        idToRecycleTime.remove(dbId);
+            try {
+                recoverAllTables(dbInfo);
+            } catch (DdlException e) {
+                // should not happen
+                LOG.error("failed replay recover database: {}", dbId, e);
+            }
 
-        dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k, v) -> {
-            v.remove(dbId);
-            return v.isEmpty() ? null : v;
-        });
+            idToDatabase.remove(dbId);
+            idToRecycleTime.remove(dbId);
 
-        return dbInfo.getDb();
+            dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k, v) 
-> {
+                v.remove(dbId);
+                return v.isEmpty() ? null : v;
+            });
+
+            return dbInfo.getDb();
+        } finally {
+            writeUnlock();
+        }
     }
 
     private void recoverAllTables(RecycleDatabaseInfo dbInfo) throws 
DdlException {
@@ -676,71 +839,81 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
         }
     }
 
-    public synchronized boolean recoverTable(Database db, String tableName, 
long tableId,
+    public boolean recoverTable(Database db, String tableName, long tableId,
                                              String newTableName) throws 
DdlException {
-        // make sure to get db lock
-        Table table = null;
-        // The recycle time of the force dropped tables and databases will be 
set to zero, use 1 here to
-        // skip these databases and tables.
-        long recycleTime = 1;
-        long dbId = db.getId();
-        Iterator<Map.Entry<Long, RecycleTableInfo>> iterator = 
idToTable.entrySet().iterator();
-        while (iterator.hasNext()) {
-            Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
-            RecycleTableInfo tableInfo = entry.getValue();
-            if (tableInfo.getDbId() != dbId) {
-                continue;
-            }
+        writeLock();
+        try {
+            // make sure to get db lock
+            Table table = null;
+            // The recycle time of the force dropped tables and databases will 
be set to zero, use 1 here to
+            // skip these databases and tables.
+            long recycleTime = 1;
+            long dbId = db.getId();
+            Iterator<Map.Entry<Long, RecycleTableInfo>> iterator = 
idToTable.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
+                RecycleTableInfo tableInfo = entry.getValue();
+                if (tableInfo.getDbId() != dbId) {
+                    continue;
+                }
 
-            if (!tableInfo.getTable().getName().equals(tableName)) {
-                continue;
-            }
+                if (!tableInfo.getTable().getName().equals(tableName)) {
+                    continue;
+                }
 
-            if (tableId == -1) {
-                if (recycleTime <= idToRecycleTime.get(entry.getKey())) {
-                    recycleTime = idToRecycleTime.get(entry.getKey());
+                if (tableId == -1) {
+                    if (recycleTime <= idToRecycleTime.get(entry.getKey())) {
+                        recycleTime = idToRecycleTime.get(entry.getKey());
+                        table = tableInfo.getTable();
+                    }
+                } else if (entry.getKey() == tableId) {
                     table = tableInfo.getTable();
+                    break;
                 }
-            } else if (entry.getKey() == tableId) {
-                table = tableInfo.getTable();
-                break;
             }
-        }
 
-        if (table == null) {
-            throw new DdlException("Unknown table '" + tableName + "' or table 
id '" + tableId + "' in "
-                + db.getFullName());
-        }
+            if (table == null) {
+                throw new DdlException("Unknown table '" + tableName + "' or 
table id '" + tableId + "' in "
+                    + db.getFullName());
+            }
 
-        if (table.getType() == TableType.MATERIALIZED_VIEW) {
-            throw new DdlException("Can not recover materialized view '" + 
tableName + "' or table id '"
-                    + tableId + "' in " + db.getFullName());
-        }
+            if (table.getType() == TableType.MATERIALIZED_VIEW) {
+                throw new DdlException("Can not recover materialized view '" + 
tableName + "' or table id '"
+                        + tableId + "' in " + db.getFullName());
+            }
 
-        innerRecoverTable(db, table, tableName, newTableName, null, false);
-        LOG.info("recover db[{}] with table[{}]: {}", dbId, table.getId(), 
table.getName());
-        return true;
+            innerRecoverTable(db, table, tableName, newTableName, null, false);
+            LOG.info("recover db[{}] with table[{}]: {}", dbId, table.getId(), 
table.getName());
+            return true;
+        } finally {
+            writeUnlock();
+        }
     }
 
-    public synchronized void replayRecoverTable(Database db, long tableId, 
String newTableName) throws DdlException {
-        // make sure to get db write lock
-        Iterator<Map.Entry<Long, RecycleTableInfo>> iterator = 
idToTable.entrySet().iterator();
-        while (iterator.hasNext()) {
-            Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
-            RecycleTableInfo tableInfo = entry.getValue();
-            if (tableInfo.getTable().getId() != tableId) {
-                continue;
-            }
-            Preconditions.checkState(tableInfo.getDbId() == db.getId());
-            Table table = tableInfo.getTable();
-            String tableName = table.getName();
-            if (innerRecoverTable(db, table, tableName, newTableName, 
iterator, true)) {
-                break;
+    public void replayRecoverTable(Database db, long tableId, String 
newTableName) throws DdlException {
+        writeLock();
+        try {
+            // make sure to get db write lock
+            Iterator<Map.Entry<Long, RecycleTableInfo>> iterator = 
idToTable.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
+                RecycleTableInfo tableInfo = entry.getValue();
+                if (tableInfo.getTable().getId() != tableId) {
+                    continue;
+                }
+                Preconditions.checkState(tableInfo.getDbId() == db.getId());
+                Table table = tableInfo.getTable();
+                String tableName = table.getName();
+                if (innerRecoverTable(db, table, tableName, newTableName, 
iterator, true)) {
+                    break;
+                }
             }
+        } finally {
+            writeUnlock();
         }
     }
 
-    private synchronized boolean innerRecoverTable(Database db, Table table, 
String tableName, String newTableName,
+    private boolean innerRecoverTable(Database db, Table table, String 
tableName, String newTableName,
                                                 Iterator<Map.Entry<Long, 
RecycleTableInfo>> iterator,
                                                 boolean isReplay) throws 
DdlException {
         table.writeLock();
@@ -795,217 +968,245 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
         return true;
     }
 
-    public synchronized void recoverPartition(long dbId, OlapTable table, 
String partitionName,
+    public void recoverPartition(long dbId, OlapTable table, String 
partitionName,
             long partitionIdToRecover, String newPartitionName) throws 
DdlException {
-        if (table.getType() == TableType.MATERIALIZED_VIEW) {
-            throw new DdlException("Can not recover partition in materialized 
view: " + table.getName());
-        }
+        writeLock();
+        try {
+            if (table.getType() == TableType.MATERIALIZED_VIEW) {
+                throw new DdlException("Can not recover partition in 
materialized view: " + table.getName());
+            }
 
-        long recycleTime = -1;
-        // make sure to get db write lock
-        RecyclePartitionInfo recoverPartitionInfo = null;
+            long recycleTime = -1;
+            // make sure to get db write lock
+            RecyclePartitionInfo recoverPartitionInfo = null;
 
-        Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = 
idToPartition.entrySet().iterator();
-        while (iterator.hasNext()) {
-            Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
-            RecyclePartitionInfo partitionInfo = entry.getValue();
+            Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = 
idToPartition.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
+                RecyclePartitionInfo partitionInfo = entry.getValue();
 
-            if (partitionInfo.getTableId() != table.getId()) {
-                continue;
-            }
+                if (partitionInfo.getTableId() != table.getId()) {
+                    continue;
+                }
 
-            if 
(!partitionInfo.getPartition().getName().equalsIgnoreCase(partitionName)) {
-                continue;
-            }
+                if 
(!partitionInfo.getPartition().getName().equalsIgnoreCase(partitionName)) {
+                    continue;
+                }
 
-            if (partitionIdToRecover == -1) {
-                if (recycleTime <= idToRecycleTime.get(entry.getKey())) {
-                    recycleTime = idToRecycleTime.get(entry.getKey());
+                if (partitionIdToRecover == -1) {
+                    if (recycleTime <= idToRecycleTime.get(entry.getKey())) {
+                        recycleTime = idToRecycleTime.get(entry.getKey());
+                        recoverPartitionInfo = partitionInfo;
+                    }
+                } else if (entry.getKey() == partitionIdToRecover) {
                     recoverPartitionInfo = partitionInfo;
+                    break;
                 }
-            } else if (entry.getKey() == partitionIdToRecover) {
-                recoverPartitionInfo = partitionInfo;
-                break;
             }
-        }
-
-        if (recoverPartitionInfo == null) {
-            throw new DdlException("No partition named '" + partitionName + "' 
or partition id '" + partitionIdToRecover
-                    + "' in table " + table.getName());
-        }
-
-        PartitionInfo partitionInfo = table.getPartitionInfo();
-        PartitionItem recoverItem = null;
-        if (partitionInfo.getType() == PartitionType.RANGE) {
-            recoverItem = new 
RangePartitionItem(recoverPartitionInfo.getRange());
-        } else if (partitionInfo.getType() == PartitionType.LIST) {
-            recoverItem = recoverPartitionInfo.getListPartitionItem();
-        }
-        // check if partition item is invalid
-        if (partitionInfo.getAnyIntersectItem(recoverItem, false) != null) {
-            throw new DdlException("Can not recover partition[" + 
partitionName + "]. Partition item conflict.");
-        }
-
-        // check if schema change
-        Partition recoverPartition = recoverPartitionInfo.getPartition();
-        Set<Long> tableIndex = table.getIndexIdToMeta().keySet();
-        Set<Long> partitionIndex = 
recoverPartition.getMaterializedIndices(IndexExtState.ALL).stream()
-                .map(i -> i.getId()).collect(Collectors.toSet());
-        if (!tableIndex.equals(partitionIndex)) {
-            throw new DdlException("table's index not equal with partition's 
index. table's index=" + tableIndex
-                    + ", partition's index=" + partitionIndex);
-        }
 
-        // check if partition name exists
-        
Preconditions.checkState(recoverPartition.getName().equalsIgnoreCase(partitionName));
-        if (!Strings.isNullOrEmpty(newPartitionName)) {
-            if (table.checkPartitionNameExist(newPartitionName)) {
-                throw new DdlException("Partition name[" + newPartitionName + 
"] is already used");
+            if (recoverPartitionInfo == null) {
+                throw new DdlException("No partition named '" + partitionName
+                        + "' or partition id '" + partitionIdToRecover
+                        + "' in table " + table.getName());
             }
-            recoverPartition.setName(newPartitionName);
-        }
-
-        // recover partition
-        table.addPartition(recoverPartition);
-
-        // recover partition info
-        long partitionId = recoverPartition.getId();
-        partitionInfo.setItem(partitionId, false, recoverItem);
-        partitionInfo.setDataProperty(partitionId, 
recoverPartitionInfo.getDataProperty());
-        partitionInfo.setReplicaAllocation(partitionId, 
recoverPartitionInfo.getReplicaAlloc());
-        partitionInfo.setIsInMemory(partitionId, 
recoverPartitionInfo.isInMemory());
-        partitionInfo.setIsMutable(partitionId, 
recoverPartitionInfo.isMutable());
-
-        // remove from recycle bin
-        idToPartition.remove(partitionId);
-        idToRecycleTime.remove(partitionId);
 
-        if (!Env.getCurrentEnv().invalidCacheForCloud()) {
-            long version = table.getNextVersion();
-            table.updateVisibleVersionAndTime(version, 
System.currentTimeMillis());
-        }
-
-        dbTblIdPartitionNameToIds.computeIfPresent(
-                Pair.of(recoverPartitionInfo.getDbId(), 
recoverPartitionInfo.getTableId()), (pair, partitionMap) -> {
-                    partitionMap.computeIfPresent(partitionName, (name, idSet) 
-> {
-                        idSet.remove(recoverPartition.getId());
-                        return idSet.isEmpty() ? null : idSet;
-                    });
-                    return partitionMap.isEmpty() ? null : partitionMap;
-                });
-
-        // log
-        RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), 
partitionId, "",
-                                                    table.getName(), "", 
partitionName, newPartitionName);
-        Env.getCurrentEnv().getEditLog().logRecoverPartition(recoverInfo);
-        LOG.info("recover partition[{}]", partitionId);
-    }
+            PartitionInfo partitionInfo = table.getPartitionInfo();
+            PartitionItem recoverItem = null;
+            if (partitionInfo.getType() == PartitionType.RANGE) {
+                recoverItem = new 
RangePartitionItem(recoverPartitionInfo.getRange());
+            } else if (partitionInfo.getType() == PartitionType.LIST) {
+                recoverItem = recoverPartitionInfo.getListPartitionItem();
+            }
+            // check if partition item is invalid
+            if (partitionInfo.getAnyIntersectItem(recoverItem, false) != null) 
{
+                throw new DdlException("Can not recover partition[" + 
partitionName + "]. Partition item conflict.");
+            }
 
-    // The caller should keep table write lock
-    public synchronized void replayRecoverPartition(OlapTable table, long 
partitionId,
-                                                    String newPartitionName) 
throws DdlException {
-        Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = 
idToPartition.entrySet().iterator();
-        Env currentEnv = Env.getCurrentEnv();
-        while (iterator.hasNext()) {
-            Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
-            RecyclePartitionInfo recyclePartitionInfo = entry.getValue();
-            if (recyclePartitionInfo.getPartition().getId() != partitionId) {
-                continue;
+            // check if schema change
+            Partition recoverPartition = recoverPartitionInfo.getPartition();
+            Set<Long> tableIndex = table.getIndexIdToMeta().keySet();
+            Set<Long> partitionIndex = 
recoverPartition.getMaterializedIndices(IndexExtState.ALL).stream()
+                    .map(i -> i.getId()).collect(Collectors.toSet());
+            if (!tableIndex.equals(partitionIndex)) {
+                throw new DdlException("table's index not equal with 
partition's index. table's index=" + tableIndex
+                        + ", partition's index=" + partitionIndex);
             }
 
-            Preconditions.checkState(recyclePartitionInfo.getTableId() == 
table.getId());
+            // check if partition name exists
+            
Preconditions.checkState(recoverPartition.getName().equalsIgnoreCase(partitionName));
             if (!Strings.isNullOrEmpty(newPartitionName)) {
                 if (table.checkPartitionNameExist(newPartitionName)) {
                     throw new DdlException("Partition name[" + 
newPartitionName + "] is already used");
                 }
+                recoverPartition.setName(newPartitionName);
             }
-            table.addPartition(recyclePartitionInfo.getPartition());
-            if (!Strings.isNullOrEmpty(newPartitionName)) {
-                
table.renamePartition(recyclePartitionInfo.getPartition().getName(), 
newPartitionName);
-            }
-            PartitionInfo partitionInfo = table.getPartitionInfo();
-            PartitionItem recoverItem = null;
-            if (partitionInfo.getType() == PartitionType.RANGE) {
-                recoverItem = new 
RangePartitionItem(recyclePartitionInfo.getRange());
-            } else if (partitionInfo.getType() == PartitionType.LIST) {
-                recoverItem = recyclePartitionInfo.getListPartitionItem();
-            }
+
+            // recover partition
+            table.addPartition(recoverPartition);
+
+            // recover partition info
+            long partitionId = recoverPartition.getId();
             partitionInfo.setItem(partitionId, false, recoverItem);
-            partitionInfo.setDataProperty(partitionId, 
recyclePartitionInfo.getDataProperty());
-            partitionInfo.setReplicaAllocation(partitionId, 
recyclePartitionInfo.getReplicaAlloc());
-            partitionInfo.setIsInMemory(partitionId, 
recyclePartitionInfo.isInMemory());
-            partitionInfo.setIsMutable(partitionId, 
recyclePartitionInfo.isMutable());
+            partitionInfo.setDataProperty(partitionId, 
recoverPartitionInfo.getDataProperty());
+            partitionInfo.setReplicaAllocation(partitionId, 
recoverPartitionInfo.getReplicaAlloc());
+            partitionInfo.setIsInMemory(partitionId, 
recoverPartitionInfo.isInMemory());
+            partitionInfo.setIsMutable(partitionId, 
recoverPartitionInfo.isMutable());
 
-            iterator.remove();
+            // remove from recycle bin
+            idToPartition.remove(partitionId);
             idToRecycleTime.remove(partitionId);
 
-            if (!currentEnv.invalidCacheForCloud()) {
+            if (!Env.getCurrentEnv().invalidCacheForCloud()) {
                 long version = table.getNextVersion();
                 table.updateVisibleVersionAndTime(version, 
System.currentTimeMillis());
             }
 
             dbTblIdPartitionNameToIds.computeIfPresent(
-                    Pair.of(recyclePartitionInfo.getDbId(), table.getId()), 
(pair, partitionMap) -> {
-                        
partitionMap.computeIfPresent(recyclePartitionInfo.getPartition().getName(), 
(name, idSet) -> {
-                            idSet.remove(partitionId);
+                    Pair.of(recoverPartitionInfo.getDbId(),
+                            recoverPartitionInfo.getTableId()), (pair, 
partitionMap) -> {
+                        partitionMap.computeIfPresent(partitionName, (name, 
idSet) -> {
+                            idSet.remove(recoverPartition.getId());
                             return idSet.isEmpty() ? null : idSet;
                         });
                         return partitionMap.isEmpty() ? null : partitionMap;
                     });
 
-            LOG.info("replay recover partition[{}]", partitionId);
-            break;
+            // log
+            RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), 
partitionId, "",
+                                                        table.getName(), "", 
partitionName, newPartitionName);
+            Env.getCurrentEnv().getEditLog().logRecoverPartition(recoverInfo);
+            LOG.info("recover partition[{}]", partitionId);
+        } finally {
+            writeUnlock();
         }
     }
 
-    // erase database in catalog recycle bin instantly
-    public synchronized void eraseDatabaseInstantly(long dbId) throws 
DdlException {
-        // 1. find dbInfo and erase db
-        RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
-        if (dbInfo != null) {
-            // erase db
-            Env.getCurrentEnv().eraseDatabase(dbId, true);
-
-            // erase db from idToDatabase and idToRecycleTime
-            idToDatabase.remove(dbId);
-            idToRecycleTime.remove(dbId);
+    // The caller should keep table write lock
+    public void replayRecoverPartition(OlapTable table, long partitionId,
+                                                    String newPartitionName) 
throws DdlException {
+        writeLock();
+        try {
+            Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = 
idToPartition.entrySet().iterator();
+            Env currentEnv = Env.getCurrentEnv();
+            while (iterator.hasNext()) {
+                Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
+                RecyclePartitionInfo recyclePartitionInfo = entry.getValue();
+                if (recyclePartitionInfo.getPartition().getId() != 
partitionId) {
+                    continue;
+                }
 
-            dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k, v) 
-> {
-                v.remove(dbId);
-                return v.isEmpty() ? null : v;
-            });
+                Preconditions.checkState(recyclePartitionInfo.getTableId() == 
table.getId());
+                if (!Strings.isNullOrEmpty(newPartitionName)) {
+                    if (table.checkPartitionNameExist(newPartitionName)) {
+                        throw new DdlException("Partition name[" + 
newPartitionName + "] is already used");
+                    }
+                }
+                table.addPartition(recyclePartitionInfo.getPartition());
+                if (!Strings.isNullOrEmpty(newPartitionName)) {
+                    
table.renamePartition(recyclePartitionInfo.getPartition().getName(), 
newPartitionName);
+                }
+                PartitionInfo partitionInfo = table.getPartitionInfo();
+                PartitionItem recoverItem = null;
+                if (partitionInfo.getType() == PartitionType.RANGE) {
+                    recoverItem = new 
RangePartitionItem(recyclePartitionInfo.getRange());
+                } else if (partitionInfo.getType() == PartitionType.LIST) {
+                    recoverItem = recyclePartitionInfo.getListPartitionItem();
+                }
+                partitionInfo.setItem(partitionId, false, recoverItem);
+                partitionInfo.setDataProperty(partitionId, 
recyclePartitionInfo.getDataProperty());
+                partitionInfo.setReplicaAllocation(partitionId, 
recyclePartitionInfo.getReplicaAlloc());
+                partitionInfo.setIsInMemory(partitionId, 
recyclePartitionInfo.isInMemory());
+                partitionInfo.setIsMutable(partitionId, 
recyclePartitionInfo.isMutable());
+
+                iterator.remove();
+                idToRecycleTime.remove(partitionId);
+
+                if (!currentEnv.invalidCacheForCloud()) {
+                    long version = table.getNextVersion();
+                    table.updateVisibleVersionAndTime(version, 
System.currentTimeMillis());
+                }
+
+                dbTblIdPartitionNameToIds.computeIfPresent(
+                        Pair.of(recyclePartitionInfo.getDbId(), 
table.getId()), (pair, partitionMap) -> {
+                            partitionMap.computeIfPresent(
+                                    
recyclePartitionInfo.getPartition().getName(),
+                                    (name, idSet) -> {
+                                        idSet.remove(partitionId);
+                                        return idSet.isEmpty() ? null : idSet;
+                                    });
+                            return partitionMap.isEmpty() ? null : 
partitionMap;
+                        });
 
-            // log for erase db
-            String dbName = dbInfo.getDb().getName();
-            LOG.info("erase db[{}]: {}", dbId, dbName);
+                LOG.info("replay recover partition[{}]", partitionId);
+                break;
+            }
+        } finally {
+            writeUnlock();
         }
+    }
 
-        // 2. remove all tables with the same dbId
-        List<Long> tableIdToErase = Lists.newArrayList();
-        Iterator<Map.Entry<Long, RecycleTableInfo>> tableIterator = 
idToTable.entrySet().iterator();
-        while (tableIterator.hasNext()) {
-            Map.Entry<Long, RecycleTableInfo> entry = tableIterator.next();
-            RecycleTableInfo tableInfo = entry.getValue();
-            if (tableInfo.getDbId() == dbId) {
-                tableIdToErase.add(entry.getKey());
+    // erase database in catalog recycle bin instantly
+    public void eraseDatabaseInstantly(long dbId) throws DdlException {
+        // 1. erase db
+        RecycleDatabaseInfo dbInfo;
+        writeLock();
+        try {
+            dbInfo = idToDatabase.get(dbId);
+            if (dbInfo != null) {
+                Env.getCurrentEnv().eraseDatabase(dbId, true);
+                idToDatabase.remove(dbId);
+                idToRecycleTime.remove(dbId);
+
+                dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k, 
v) -> {
+                    v.remove(dbId);
+                    return v.isEmpty() ? null : v;
+                });
+
+                String dbName = dbInfo.getDb().getName();
+                LOG.info("erase db[{}]: {}", dbId, dbName);
+            }
+        } finally {
+            writeUnlock();
+        }
+
+        // 2. collect tables with same dbId
+        List<Long> tableIdToErase = new ArrayList<>();
+        readLock();
+        try {
+            for (Map.Entry<Long, RecycleTableInfo> entry : 
idToTable.entrySet()) {
+                if (entry.getValue().getDbId() == dbId) {
+                    tableIdToErase.add(entry.getKey());
+                }
             }
+        } finally {
+            readUnlock();
         }
         for (Long tableId : tableIdToErase) {
-            eraseTableInstantly(tableId);
+            try {
+                eraseTableInstantly(tableId);
+            } catch (DdlException e) {
+                LOG.info("table[{}] already erased by concurrent operation, 
skip", tableId);
+            }
         }
 
-        // 3. remove all partitions with the same dbId
-        List<Long> partitionIdToErase = Lists.newArrayList();
-        Iterator<Map.Entry<Long, RecyclePartitionInfo>> partitionIterator = 
idToPartition.entrySet().iterator();
-        while (partitionIterator.hasNext()) {
-            Map.Entry<Long, RecyclePartitionInfo> entry = 
partitionIterator.next();
-            RecyclePartitionInfo partitionInfo = entry.getValue();
-            if (partitionInfo.getDbId() == dbId) {
-                partitionIdToErase.add(entry.getKey());
+        // 3. collect partitions with same dbId
+        List<Long> partitionIdToErase = new ArrayList<>();
+        readLock();
+        try {
+            for (Map.Entry<Long, RecyclePartitionInfo> entry : 
idToPartition.entrySet()) {
+                if (entry.getValue().getDbId() == dbId) {
+                    partitionIdToErase.add(entry.getKey());
+                }
             }
+        } finally {
+            readUnlock();
         }
         for (Long partitionId : partitionIdToErase) {
-            erasePartitionInstantly(partitionId);
+            try {
+                erasePartitionInstantly(partitionId);
+            } catch (DdlException e) {
+                LOG.info("partition[{}] already erased by concurrent 
operation, skip", partitionId);
+            }
         }
 
         // 4. determine if nothing is deleted
@@ -1015,44 +1216,53 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
     }
 
     // erase table in catalog recycle bin instantly
-    public synchronized void eraseTableInstantly(long tableId) throws 
DdlException {
-        // 1. find tableInfo and erase table
-        RecycleTableInfo tableInfo = idToTable.get(tableId);
-        if (tableInfo != null) {
-            // erase table
-            long dbId = tableInfo.getDbId();
-            Table table = tableInfo.getTable();
-            if (table.getType() == TableType.OLAP || table.getType() == 
TableType.MATERIALIZED_VIEW) {
-                Env.getCurrentEnv().onEraseOlapTable(dbId, (OlapTable) table, 
false);
-            }
+    public void eraseTableInstantly(long tableId) throws DdlException {
+        // 1. erase table
+        RecycleTableInfo tableInfo;
+        writeLock();
+        try {
+            tableInfo = idToTable.get(tableId);
+            if (tableInfo != null) {
+                long dbId = tableInfo.getDbId();
+                Table table = tableInfo.getTable();
+                if (table.getType() == TableType.OLAP || table.getType() == 
TableType.MATERIALIZED_VIEW) {
+                    Env.getCurrentEnv().onEraseOlapTable(dbId, (OlapTable) 
table, false);
+                }
 
-            // erase table from idToTable and idToRecycleTime
-            idToTable.remove(tableId);
-            idToRecycleTime.remove(tableId);
+                idToTable.remove(tableId);
+                idToRecycleTime.remove(tableId);
 
-            dbIdTableNameToIds.computeIfPresent(Pair.of(dbId, 
table.getName()), (k, v) -> {
-                v.remove(tableId);
-                return v.isEmpty() ? null : v;
-            });
+                dbIdTableNameToIds.computeIfPresent(Pair.of(dbId, 
table.getName()), (k, v) -> {
+                    v.remove(tableId);
+                    return v.isEmpty() ? null : v;
+                });
 
-            // log for erase table
-            String tableName = table.getName();
-            Env.getCurrentEnv().getEditLog().logEraseTable(tableId);
-            LOG.info("erase db[{}]'s table[{}]: {}", dbId, tableId, tableName);
+                String tableName = table.getName();
+                Env.getCurrentEnv().getEditLog().logEraseTable(tableId);
+                LOG.info("erase db[{}]'s table[{}]: {}", dbId, tableId, 
tableName);
+            }
+        } finally {
+            writeUnlock();
         }
 
-        // 2. erase all partitions with the same tableId
-        List<Long> partitionIdToErase = Lists.newArrayList();
-        Iterator<Map.Entry<Long, RecyclePartitionInfo>> partitionIterator = 
idToPartition.entrySet().iterator();
-        while (partitionIterator.hasNext()) {
-            Map.Entry<Long, RecyclePartitionInfo> entry = 
partitionIterator.next();
-            RecyclePartitionInfo partitionInfo = entry.getValue();
-            if (partitionInfo.getTableId() == tableId) {
-                partitionIdToErase.add(entry.getKey());
+        // 2. collect partitions with same tableId
+        List<Long> partitionIdToErase = new ArrayList<>();
+        readLock();
+        try {
+            for (Map.Entry<Long, RecyclePartitionInfo> entry : 
idToPartition.entrySet()) {
+                if (entry.getValue().getTableId() == tableId) {
+                    partitionIdToErase.add(entry.getKey());
+                }
             }
+        } finally {
+            readUnlock();
         }
         for (Long partitionId : partitionIdToErase) {
-            erasePartitionInstantly(partitionId);
+            try {
+                erasePartitionInstantly(partitionId);
+            } catch (DdlException e) {
+                LOG.info("partition[{}] already erased by concurrent 
operation, skip", partitionId);
+            }
         }
 
         // 3. determine if nothing is deleted
@@ -1062,35 +1272,36 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
     }
 
     // erase partition in catalog recycle bin instantly
-    public synchronized void erasePartitionInstantly(long partitionId) throws 
DdlException {
-        // 1. find partitionInfo to erase
-        RecyclePartitionInfo partitionInfo = idToPartition.get(partitionId);
-        if (partitionInfo == null) {
-            throw new DdlException("No partition id '" + partitionId + "'");
-        }
+    public void erasePartitionInstantly(long partitionId) throws DdlException {
+        writeLock();
+        try {
+            RecyclePartitionInfo partitionInfo = 
idToPartition.get(partitionId);
+            if (partitionInfo == null) {
+                throw new DdlException("No partition id '" + partitionId + 
"'");
+            }
 
-        // 2. erase partition
-        Partition partition = partitionInfo.getPartition();
-        Env.getCurrentEnv().onErasePartition(partition);
+            Partition partition = partitionInfo.getPartition();
+            Env.getCurrentEnv().onErasePartition(partition);
 
-        // 3. erase partition in idToPartition and idToRecycleTime
-        idToPartition.remove(partitionId);
-        idToRecycleTime.remove(partitionId);
+            idToPartition.remove(partitionId);
+            idToRecycleTime.remove(partitionId);
 
-        dbTblIdPartitionNameToIds.computeIfPresent(
-                Pair.of(partitionInfo.getDbId(), partitionInfo.getTableId()), 
(pair, partitionMap) -> {
-                    partitionMap.computeIfPresent(partition.getName(), (name, 
idSet) -> {
-                        idSet.remove(partitionId);
-                        return idSet.isEmpty() ? null : idSet;
+            dbTblIdPartitionNameToIds.computeIfPresent(
+                    Pair.of(partitionInfo.getDbId(), 
partitionInfo.getTableId()), (pair, partitionMap) -> {
+                        partitionMap.computeIfPresent(partition.getName(), 
(name, idSet) -> {
+                            idSet.remove(partitionId);
+                            return idSet.isEmpty() ? null : idSet;
+                        });
+                        return partitionMap.isEmpty() ? null : partitionMap;
                     });
-                    return partitionMap.isEmpty() ? null : partitionMap;
-                });
 
-        // 4. log for erase partition
-        long tableId = partitionInfo.getTableId();
-        String partitionName = partition.getName();
-        Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
-        LOG.info("erase table[{}]'s partition[{}]: {}", tableId, partitionId, 
partitionName);
+            long tableId = partitionInfo.getTableId();
+            String partitionName = partition.getName();
+            Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
+            LOG.info("erase table[{}]'s partition[{}]: {}", tableId, 
partitionId, partitionName);
+        } finally {
+            writeUnlock();
+        }
     }
 
     // no need to use synchronized.
@@ -1191,187 +1402,203 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
         eraseDatabase(currentTimeMs, keepNum);
     }
 
-    public synchronized List<List<String>> getInfo() {
-        Map<Long, Pair<Long, Long>> dbToDataSize = new HashMap<>();
-        List<List<String>> tableInfos = Lists.newArrayList();
-        for (Map.Entry<Long, RecycleTableInfo> entry : idToTable.entrySet()) {
-            List<String> info = Lists.newArrayList();
-            info.add("Table");
-            RecycleTableInfo tableInfo = entry.getValue();
-            Table table = tableInfo.getTable();
-            info.add(table.getName());
-            info.add(String.valueOf(tableInfo.getDbId()));
-            info.add(String.valueOf(entry.getKey()));
-            info.add("");
-            //info.add(String.valueOf(idToRecycleTime.get(entry.getKey())));
-            
info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
-            // data size
-            long dataSize = table.getDataSize(false);
-            info.add(DebugUtil.printByteWithUnit(dataSize));
-            // remote data size
-            long remoteDataSize = table instanceof OlapTable ? ((OlapTable) 
table).getRemoteDataSize() : 0L;
-            info.add(DebugUtil.printByteWithUnit(remoteDataSize));
-            // calculate database data size
-            dbToDataSize.compute(tableInfo.getDbId(), (k, v) -> {
-                if (v == null) {
-                    return Pair.of(dataSize, remoteDataSize);
-                } else {
-                    v.first += dataSize;
-                    v.second += remoteDataSize;
-                    return v;
-                }
-            });
+    public List<List<String>> getInfo() {
+        readLock();
+        try {
+            Map<Long, Pair<Long, Long>> dbToDataSize = new HashMap<>();
+            List<List<String>> tableInfos = Lists.newArrayList();
+            for (Map.Entry<Long, RecycleTableInfo> entry : 
idToTable.entrySet()) {
+                List<String> info = Lists.newArrayList();
+                info.add("Table");
+                RecycleTableInfo tableInfo = entry.getValue();
+                Table table = tableInfo.getTable();
+                info.add(table.getName());
+                info.add(String.valueOf(tableInfo.getDbId()));
+                info.add(String.valueOf(entry.getKey()));
+                info.add("");
+                //info.add(String.valueOf(idToRecycleTime.get(entry.getKey())));
+                info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
+                // data size
+                long dataSize = table.getDataSize(false);
+                info.add(DebugUtil.printByteWithUnit(dataSize));
+                // remote data size
+                long remoteDataSize = table instanceof OlapTable ? ((OlapTable) table).getRemoteDataSize() : 0L;
+                info.add(DebugUtil.printByteWithUnit(remoteDataSize));
+                // calculate database data size
+                dbToDataSize.compute(tableInfo.getDbId(), (k, v) -> {
+                    if (v == null) {
+                        return Pair.of(dataSize, remoteDataSize);
+                    } else {
+                        v.first += dataSize;
+                        v.second += remoteDataSize;
+                        return v;
+                    }
+                });
 
-            tableInfos.add(info);
-        }
-        // sort by Name, DropTime
-        tableInfos.sort((x, y) -> {
-            int nameRet = x.get(1).compareTo(y.get(1));
-            if (nameRet == 0) {
-                return x.get(5).compareTo(y.get(5));
-            } else {
-                return nameRet;
+                tableInfos.add(info);
             }
-        });
-
-        List<List<String>> partitionInfos = Lists.newArrayList();
-        for (Map.Entry<Long, RecyclePartitionInfo> entry : idToPartition.entrySet()) {
-            List<String> info = Lists.newArrayList();
-            info.add("Partition");
-            RecyclePartitionInfo partitionInfo = entry.getValue();
-            Partition partition = partitionInfo.getPartition();
-            info.add(partition.getName());
-            info.add(String.valueOf(partitionInfo.getDbId()));
-            info.add(String.valueOf(partitionInfo.getTableId()));
-            info.add(String.valueOf(entry.getKey()));
-            //info.add(String.valueOf(idToRecycleTime.get(entry.getKey())));
-            info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
-            // data size
-            long dataSize = partition.getDataSize(false);
-            info.add(DebugUtil.printByteWithUnit(dataSize));
-            // remote data size
-            long remoteDataSize = partition.getRemoteDataSize();
-            info.add(DebugUtil.printByteWithUnit(remoteDataSize));
-            // calculate database data size
-            dbToDataSize.compute(partitionInfo.getDbId(), (k, v) -> {
-                if (v == null) {
-                    return Pair.of(dataSize, remoteDataSize);
+            // sort by Name, DropTime
+            tableInfos.sort((x, y) -> {
+                int nameRet = x.get(1).compareTo(y.get(1));
+                if (nameRet == 0) {
+                    return x.get(5).compareTo(y.get(5));
                 } else {
-                    v.first += dataSize;
-                    v.second += remoteDataSize;
-                    return v;
+                    return nameRet;
                 }
             });
 
-            partitionInfos.add(info);
-        }
-        // sort by Name, DropTime
-        partitionInfos.sort((x, y) -> {
-            int nameRet = x.get(1).compareTo(y.get(1));
-            if (nameRet == 0) {
-                return x.get(5).compareTo(y.get(5));
-            } else {
-                return nameRet;
-            }
-        });
+            List<List<String>> partitionInfos = Lists.newArrayList();
+            for (Map.Entry<Long, RecyclePartitionInfo> entry : idToPartition.entrySet()) {
+                List<String> info = Lists.newArrayList();
+                info.add("Partition");
+                RecyclePartitionInfo partitionInfo = entry.getValue();
+                Partition partition = partitionInfo.getPartition();
+                info.add(partition.getName());
+                info.add(String.valueOf(partitionInfo.getDbId()));
+                info.add(String.valueOf(partitionInfo.getTableId()));
+                info.add(String.valueOf(entry.getKey()));
+                //info.add(String.valueOf(idToRecycleTime.get(entry.getKey())));
+                info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
+                // data size
+                long dataSize = partition.getDataSize(false);
+                info.add(DebugUtil.printByteWithUnit(dataSize));
+                // remote data size
+                long remoteDataSize = partition.getRemoteDataSize();
+                info.add(DebugUtil.printByteWithUnit(remoteDataSize));
+                // calculate database data size
+                dbToDataSize.compute(partitionInfo.getDbId(), (k, v) -> {
+                    if (v == null) {
+                        return Pair.of(dataSize, remoteDataSize);
+                    } else {
+                        v.first += dataSize;
+                        v.second += remoteDataSize;
+                        return v;
+                    }
+                });
 
-        List<List<String>> dbInfos = Lists.newArrayList();
-        for (Map.Entry<Long, RecycleDatabaseInfo> entry : idToDatabase.entrySet()) {
-            List<String> info = Lists.newArrayList();
-            info.add("Database");
-            RecycleDatabaseInfo dbInfo = entry.getValue();
-            Database db = dbInfo.getDb();
-            info.add(db.getFullName());
-            info.add(String.valueOf(entry.getKey()));
-            info.add("");
-            info.add("");
-            //info.add(String.valueOf(idToRecycleTime.get(entry.getKey())));
-            info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
-            // data size
-            Pair<Long, Long> dataSizePair = dbToDataSize.getOrDefault(entry.getKey(), Pair.of(0L, 0L));
-            info.add(DebugUtil.printByteWithUnit(dataSizePair.first));
-            // remote data size
-            info.add(DebugUtil.printByteWithUnit(dataSizePair.second));
-
-            dbInfos.add(info);
-        }
-        // sort by Name, DropTime
-        dbInfos.sort((x, y) -> {
-            int nameRet = x.get(1).compareTo(y.get(1));
-            if (nameRet == 0) {
-                return x.get(5).compareTo(y.get(5));
-            } else {
-                return nameRet;
+                partitionInfos.add(info);
             }
-        });
-
-        return Stream.of(dbInfos, tableInfos, partitionInfos).flatMap(Collection::stream).collect(Collectors.toList());
-    }
+            // sort by Name, DropTime
+            partitionInfos.sort((x, y) -> {
+                int nameRet = x.get(1).compareTo(y.get(1));
+                if (nameRet == 0) {
+                    return x.get(5).compareTo(y.get(5));
+                } else {
+                    return nameRet;
+                }
+            });
 
-    public synchronized Map<Long, Pair<Long, Long>> getDbToRecycleSize() {
-        Map<Long, Pair<Long, Long>> dbToRecycleSize = new HashMap<>();
-        for (Map.Entry<Long, RecycleTableInfo> entry : idToTable.entrySet()) {
-            RecycleTableInfo tableInfo = entry.getValue();
-            Table table = tableInfo.getTable();
-            if (!(table instanceof OlapTable)) {
-                continue;
+            List<List<String>> dbInfos = Lists.newArrayList();
+            for (Map.Entry<Long, RecycleDatabaseInfo> entry : idToDatabase.entrySet()) {
+                List<String> info = Lists.newArrayList();
+                info.add("Database");
+                RecycleDatabaseInfo dbInfo = entry.getValue();
+                Database db = dbInfo.getDb();
+                info.add(db.getFullName());
+                info.add(String.valueOf(entry.getKey()));
+                info.add("");
+                info.add("");
+                //info.add(String.valueOf(idToRecycleTime.get(entry.getKey())));
+                info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
+                // data size
+                Pair<Long, Long> dataSizePair = dbToDataSize.getOrDefault(entry.getKey(), Pair.of(0L, 0L));
+                info.add(DebugUtil.printByteWithUnit(dataSizePair.first));
+                // remote data size
+                info.add(DebugUtil.printByteWithUnit(dataSizePair.second));
+
+                dbInfos.add(info);
             }
-            long dataSize = table.getDataSize(false);
-            long remoteDataSize = ((OlapTable) table).getRemoteDataSize();
-            dbToRecycleSize.compute(tableInfo.getDbId(), (k, v) -> {
-                if (v == null) {
-                    return Pair.of(dataSize, remoteDataSize);
+            // sort by Name, DropTime
+            dbInfos.sort((x, y) -> {
+                int nameRet = x.get(1).compareTo(y.get(1));
+                if (nameRet == 0) {
+                    return x.get(5).compareTo(y.get(5));
                 } else {
-                    v.first += dataSize;
-                    v.second += remoteDataSize;
-                    return v;
+                    return nameRet;
                 }
             });
+
+            return Stream.of(dbInfos, tableInfos, partitionInfos)
+                    .flatMap(Collection::stream).collect(Collectors.toList());
+        } finally {
+            readUnlock();
         }
+    }
 
-        for (Map.Entry<Long, RecyclePartitionInfo> entry : idToPartition.entrySet()) {
-            RecyclePartitionInfo partitionInfo = entry.getValue();
-            Partition partition = partitionInfo.getPartition();
-            long dataSize = partition.getDataSize(false);
-            long remoteDataSize = partition.getRemoteDataSize();
-            dbToRecycleSize.compute(partitionInfo.getDbId(), (k, v) -> {
-                if (v == null) {
-                    return Pair.of(dataSize, remoteDataSize);
-                } else {
-                    v.first += dataSize;
-                    v.second += remoteDataSize;
-                    return v;
+    public Map<Long, Pair<Long, Long>> getDbToRecycleSize() {
+        readLock();
+        try {
+            Map<Long, Pair<Long, Long>> dbToRecycleSize = new HashMap<>();
+            for (Map.Entry<Long, RecycleTableInfo> entry : idToTable.entrySet()) {
+                RecycleTableInfo tableInfo = entry.getValue();
+                Table table = tableInfo.getTable();
+                if (!(table instanceof OlapTable)) {
+                    continue;
                 }
-            });
+                long dataSize = table.getDataSize(false);
+                long remoteDataSize = ((OlapTable) table).getRemoteDataSize();
+                dbToRecycleSize.compute(tableInfo.getDbId(), (k, v) -> {
+                    if (v == null) {
+                        return Pair.of(dataSize, remoteDataSize);
+                    } else {
+                        v.first += dataSize;
+                        v.second += remoteDataSize;
+                        return v;
+                    }
+                });
+            }
+
+            for (Map.Entry<Long, RecyclePartitionInfo> entry : idToPartition.entrySet()) {
+                RecyclePartitionInfo partitionInfo = entry.getValue();
+                Partition partition = partitionInfo.getPartition();
+                long dataSize = partition.getDataSize(false);
+                long remoteDataSize = partition.getRemoteDataSize();
+                dbToRecycleSize.compute(partitionInfo.getDbId(), (k, v) -> {
+                    if (v == null) {
+                        return Pair.of(dataSize, remoteDataSize);
+                    } else {
+                        v.first += dataSize;
+                        v.second += remoteDataSize;
+                        return v;
+                    }
+                });
+            }
+            return dbToRecycleSize;
+        } finally {
+            readUnlock();
         }
-        return dbToRecycleSize;
     }
 
-    // Need to add "synchronized", because when calling /dump api to dump 
image,
+    // Need to add read lock, because when calling /dump api to dump image,
     // this class is not protected by any lock, will throw ConcurrentModificationException.
     @Override
-    public synchronized void write(DataOutput out) throws IOException {
-        out.writeInt(idToDatabase.size());
-        for (Map.Entry<Long, RecycleDatabaseInfo> entry : idToDatabase.entrySet()) {
-            out.writeLong(entry.getKey());
-            entry.getValue().write(out);
-        }
-        out.writeInt(idToTable.size());
-        for (Map.Entry<Long, RecycleTableInfo> entry : idToTable.entrySet()) {
-            out.writeLong(entry.getKey());
-            entry.getValue().write(out);
-        }
-        out.writeInt(idToPartition.size());
-        for (Map.Entry<Long, RecyclePartitionInfo> entry : idToPartition.entrySet()) {
-            out.writeLong(entry.getKey());
-            entry.getValue().write(out);
-        }
-        out.writeInt(idToRecycleTime.size());
-        for (Map.Entry<Long, Long> entry : idToRecycleTime.entrySet()) {
-            out.writeLong(entry.getKey());
-            out.writeLong(entry.getValue());
-        }
-        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    public void write(DataOutput out) throws IOException {
+        readLock();
+        try {
+            out.writeInt(idToDatabase.size());
+            for (Map.Entry<Long, RecycleDatabaseInfo> entry : idToDatabase.entrySet()) {
+                out.writeLong(entry.getKey());
+                entry.getValue().write(out);
+            }
+            out.writeInt(idToTable.size());
+            for (Map.Entry<Long, RecycleTableInfo> entry : idToTable.entrySet()) {
+                out.writeLong(entry.getKey());
+                entry.getValue().write(out);
+            }
+            out.writeInt(idToPartition.size());
+            for (Map.Entry<Long, RecyclePartitionInfo> entry : idToPartition.entrySet()) {
+                out.writeLong(entry.getKey());
+                entry.getValue().write(out);
+            }
+            out.writeInt(idToRecycleTime.size());
+            for (Map.Entry<Long, Long> entry : idToRecycleTime.entrySet()) {
+                out.writeLong(entry.getKey());
+                out.writeLong(entry.getValue());
+            }
+            Text.writeString(out, GsonUtils.GSON.toJson(this));
+        } finally {
+            readUnlock();
+        }
     }
 
     public void readFieldsWithGson(DataInput in) throws IOException {
@@ -1608,14 +1835,19 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
     }
 
     // only for unit test
-    public synchronized void clearAll() {
-        idToDatabase.clear();
-        idToTable.clear();
-        idToPartition.clear();
-        idToRecycleTime.clear();
-        dbNameToIds.clear();
-        dbIdTableNameToIds.clear();
-        dbTblIdPartitionNameToIds.clear();
-        LOG.info("Cleared all objects in recycle bin");
+    public void clearAll() {
+        writeLock();
+        try {
+            idToDatabase.clear();
+            idToTable.clear();
+            idToPartition.clear();
+            idToRecycleTime.clear();
+            dbNameToIds.clear();
+            dbIdTableNameToIds.clear();
+            dbTblIdPartitionNameToIds.clear();
+            LOG.info("Cleared all objects in recycle bin");
+        } finally {
+            writeUnlock();
+        }
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogRecycleBinTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogRecycleBinTest.java
index 20f5c01bea2..3b642b5809f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogRecycleBinTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogRecycleBinTest.java
@@ -35,11 +35,19 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 public class CatalogRecycleBinTest {
@@ -835,5 +843,182 @@ public class CatalogRecycleBinTest {
         db.unregisterTable(CatalogTestUtil.testEsTableId1);
         recycleBin.recycleTable(CatalogTestUtil.testDbId1, esTable, false, false, 0);
     }
+
+    @Test
+    public void testConcurrentReadsDoNotBlock() throws Exception {
+        CatalogRecycleBin recycleBin = Env.getCurrentRecycleBin();
+
+        // Recycle several partitions
+        for (int i = 1; i <= 50; i++) {
+            MaterializedIndex index = new MaterializedIndex(2000 + i, IndexState.NORMAL);
+            RandomDistributionInfo dist = new RandomDistributionInfo(1);
+            Partition partition = new Partition(3000 + i, "part_" + i, index, dist);
+            recycleBin.recyclePartition(
+                    CatalogTestUtil.testDbId1, CatalogTestUtil.testTableId1,
+                    CatalogTestUtil.testTable1, partition, null, null,
+                    new DataProperty(TStorageMedium.HDD), new ReplicaAllocation((short) 3),
+                    false, false);
+        }
+
+        // Multiple reader threads should run concurrently without blocking each other
+        int numReaders = 10;
+        CyclicBarrier barrier = new CyclicBarrier(numReaders);
+        ExecutorService executor = Executors.newFixedThreadPool(numReaders);
+        List<Future<Boolean>> futures = new ArrayList<>();
+
+        for (int i = 0; i < numReaders; i++) {
+            futures.add(executor.submit(() -> {
+                barrier.await(5, TimeUnit.SECONDS);
+                // Perform various read operations concurrently
+                for (int j = 1; j <= 50; j++) {
+                    recycleBin.isRecyclePartition(CatalogTestUtil.testDbId1,
+                            CatalogTestUtil.testTableId1, 3000 + j);
+                    recycleBin.getRecycleTimeById(3000 + j);
+                }
+                Set<Long> dbIds = Sets.newHashSet();
+                Set<Long> tableIds = Sets.newHashSet();
+                Set<Long> partIds = Sets.newHashSet();
+                recycleBin.getRecycleIds(dbIds, tableIds, partIds);
+                return true;
+            }));
+        }
+
+        executor.shutdown();
+        Assert.assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS));
+        for (Future<Boolean> f : futures) {
+            Assert.assertTrue(f.get());
+        }
+    }
+
+    @Test
+    public void testConcurrentRecycleAndRead() throws Exception {
+        CatalogRecycleBin recycleBin = Env.getCurrentRecycleBin();
+
+        AtomicBoolean readerError = new AtomicBoolean(false);
+        AtomicBoolean writerDone = new AtomicBoolean(false);
+        CountDownLatch startLatch = new CountDownLatch(1);
+
+        // Writer thread: continuously recycles partitions
+        Thread writer = new Thread(() -> {
+            try {
+                startLatch.await();
+                for (int i = 1; i <= 100; i++) {
+                    MaterializedIndex index = new MaterializedIndex(4000 + i, IndexState.NORMAL);
+                    RandomDistributionInfo dist = new RandomDistributionInfo(1);
+                    Partition partition = new Partition(5000 + i, "cpart_" + i, index, dist);
+                    recycleBin.recyclePartition(
+                            CatalogTestUtil.testDbId1, CatalogTestUtil.testTableId1,
+                            CatalogTestUtil.testTable1, partition, null, null,
+                            new DataProperty(TStorageMedium.HDD), new ReplicaAllocation((short) 3),
+                            false, false);
+                }
+            } catch (Exception e) {
+                readerError.set(true);
+            } finally {
+                writerDone.set(true);
+            }
+        });
+
+        // Reader threads: continuously read while writer is active
+        List<Thread> readers = new ArrayList<>();
+        for (int r = 0; r < 5; r++) {
+            Thread reader = new Thread(() -> {
+                try {
+                    startLatch.await();
+                    while (!writerDone.get()) {
+                        // These should never throw ConcurrentModificationException
+                        Set<Long> dbIds = Sets.newHashSet();
+                        Set<Long> tableIds = Sets.newHashSet();
+                        Set<Long> partIds = Sets.newHashSet();
+                        recycleBin.getRecycleIds(dbIds, tableIds, partIds);
+                        recycleBin.isRecyclePartition(CatalogTestUtil.testDbId1,
+                                CatalogTestUtil.testTableId1, 5001);
+                    }
+                } catch (Exception e) {
+                    readerError.set(true);
+                }
+            });
+            readers.add(reader);
+        }
+
+        writer.start();
+        readers.forEach(Thread::start);
+        startLatch.countDown();
+
+        writer.join(30_000);
+        for (Thread reader : readers) {
+            reader.join(30_000);
+        }
+
+        Assert.assertFalse("Reader or writer thread encountered an error", readerError.get());
+        // Verify all 100 partitions were recycled
+        for (int i = 1; i <= 100; i++) {
+            Assert.assertTrue(recycleBin.isRecyclePartition(CatalogTestUtil.testDbId1,
+                    CatalogTestUtil.testTableId1, 5000 + i));
+        }
+    }
+
+    @Test
+    public void testMicrobatchEraseReleasesLockBetweenItems() throws Exception {
+        CatalogRecycleBin recycleBin = Env.getCurrentRecycleBin();
+
+        // Recycle many partitions
+        int numPartitions = 50;
+        for (int i = 1; i <= numPartitions; i++) {
+            MaterializedIndex index = new MaterializedIndex(6000 + i, IndexState.NORMAL);
+            RandomDistributionInfo dist = new RandomDistributionInfo(1);
+            Partition partition = new Partition(7000 + i, "epart_" + i, index, dist);
+            recycleBin.recyclePartition(
+                    CatalogTestUtil.testDbId1, CatalogTestUtil.testTableId1,
+                    CatalogTestUtil.testTable1, partition, null, null,
+                    new DataProperty(TStorageMedium.HDD), new ReplicaAllocation((short) 3),
+                    false, false);
+        }
+
+        // Verify all were recycled
+        Set<Long> dbIds = Sets.newHashSet();
+        Set<Long> tableIds = Sets.newHashSet();
+        Set<Long> partitionIds = Sets.newHashSet();
+        recycleBin.getRecycleIds(dbIds, tableIds, partitionIds);
+        Assert.assertEquals(numPartitions, partitionIds.size());
+
+        // Now run erase daemon which should process items one at a time
+        // While erase is running, a concurrent recyclePartition should be able to
+        // proceed between items (not blocked for the entire erase duration)
+        AtomicBoolean recycleCompleted = new AtomicBoolean(false);
+        AtomicBoolean eraseStarted = new AtomicBoolean(false);
+
+        Thread eraseThread = new Thread(() -> {
+            eraseStarted.set(true);
+            recycleBin.runAfterCatalogReady();
+        });
+
+        eraseThread.start();
+
+        // Wait briefly for erase to start, then try to recycle a new partition
+        Thread.sleep(50);
+        if (eraseStarted.get()) {
+            MaterializedIndex newIndex = new MaterializedIndex(8000, IndexState.NORMAL);
+            RandomDistributionInfo newDist = new RandomDistributionInfo(1);
+            Partition newPartition = new Partition(9000, "new_part", newIndex, newDist);
+            recycleBin.recyclePartition(
+                    CatalogTestUtil.testDbId1, CatalogTestUtil.testTableId1,
+                    CatalogTestUtil.testTable1, newPartition, null, null,
+                    new DataProperty(TStorageMedium.HDD), new ReplicaAllocation((short) 3),
+                    false, false);
+            recycleCompleted.set(true);
+        }
+
+        eraseThread.join(60_000);
+        Assert.assertFalse("Erase thread should have finished", eraseThread.isAlive());
+
+        // The new partition should have been recycled successfully
+        if (eraseStarted.get()) {
+            Assert.assertTrue("recyclePartition should succeed during erase",
+                    recycleCompleted.get());
+            Assert.assertTrue(recycleBin.isRecyclePartition(CatalogTestUtil.testDbId1,
+                    CatalogTestUtil.testTableId1, 9000));
+        }
+    }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to