This is an automated email from the ASF dual-hosted git repository. caiconghui pushed a commit to branch writeLock in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit e1c4b9ef919edf57d802b3356790048f2ca8bb3b Author: caiconghui <[email protected]> AuthorDate: Mon Sep 13 20:07:06 2021 +0800 fix --- .../main/java/org/apache/doris/alter/Alter.java | 75 ++++----- .../java/org/apache/doris/alter/AlterHandler.java | 2 +- .../java/org/apache/doris/alter/AlterJobV2.java | 2 +- .../doris/alter/MaterializedViewHandler.java | 10 +- .../java/org/apache/doris/alter/RollupJob.java | 5 +- .../java/org/apache/doris/alter/RollupJobV2.java | 4 +- .../apache/doris/alter/SchemaChangeHandler.java | 8 +- .../org/apache/doris/alter/SchemaChangeJob.java | 15 +- .../org/apache/doris/alter/SchemaChangeJobV2.java | 4 +- .../java/org/apache/doris/backup/RestoreJob.java | 22 ++- .../java/org/apache/doris/catalog/Catalog.java | 171 +++++++++++++-------- .../apache/doris/catalog/CatalogRecycleBin.java | 1 + .../java/org/apache/doris/catalog/Database.java | 52 ++++--- .../main/java/org/apache/doris/catalog/Table.java | 33 ++-- .../org/apache/doris/catalog/TabletStatMgr.java | 5 +- .../doris/clone/DynamicPartitionScheduler.java | 4 +- .../org/apache/doris/clone/TabletSchedCtx.java | 10 +- .../org/apache/doris/clone/TabletScheduler.java | 9 +- .../apache/doris/common/util/MetaLockUtils.java | 18 ++- .../doris/consistency/CheckConsistencyJob.java | 8 +- .../org/apache/doris/http/rest/RowCountAction.java | 2 +- .../doris/http/rest/TableRowCountAction.java | 4 +- .../apache/doris/journal/bdbje/BDBJEJournal.java | 7 +- .../java/org/apache/doris/load/LoadChecker.java | 7 +- .../apache/doris/load/loadv2/BrokerLoadJob.java | 2 +- .../org/apache/doris/load/loadv2/SparkLoadJob.java | 2 +- .../java/org/apache/doris/master/MasterImpl.java | 6 +- .../org/apache/doris/master/ReportHandler.java | 11 +- .../doris/transaction/DatabaseTransactionMgr.java | 2 +- .../org/apache/doris/catalog/InfoSchemaDbTest.java | 3 +- .../doris/common/util/MetaLockUtilsTest.java | 3 +- 31 files changed, 304 insertions(+), 203 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index bf1a0c8..542acdd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -148,7 +148,7 @@ public class Alter { } else if (currentAlterOps.hasPartitionOp()) { Preconditions.checkState(alterClauses.size() == 1); AlterClause alterClause = alterClauses.get(0); - olapTable.writeLock(); + olapTable.writeLockOrDdlException(); try { if (alterClause instanceof DropPartitionClause) { if (!((DropPartitionClause) alterClause).isTempPartition()) { @@ -207,7 +207,7 @@ public class Alter { private void processModifyTableComment(Database db, OlapTable tbl, AlterClause alterClause) throws DdlException { - tbl.writeLock(); + tbl.writeLockOrDdlException(); try { ModifyTableCommentClause clause = (ModifyTableCommentClause) alterClause; tbl.setComment(clause.getComment()); @@ -221,7 +221,7 @@ public class Alter { private void processModifyColumnComment(Database db, OlapTable tbl, List<AlterClause> alterClauses) throws DdlException { - tbl.writeLock(); + tbl.writeLockOrDdlException(); try { // check first Map<String, String> colToComment = Maps.newHashMap(); @@ -334,7 +334,7 @@ public class Alter { ((SchemaChangeHandler) schemaChangeHandler).updatePartitionsInMemoryMeta( db, tableName, partitionNames, properties); OlapTable olapTable = (OlapTable) table; - olapTable.writeLock(); + olapTable.writeLockOrDdlException(); try { modifyPartitionsProperty(db, olapTable, partitionNames, properties); } finally { @@ -356,27 +356,29 @@ public class Alter { ReplaceTableClause clause = (ReplaceTableClause) alterClauses.get(0); String newTblName = clause.getTblName(); boolean swapTable = clause.isSwapTable(); - Table newTbl = db.getTableOrMetaException(newTblName, TableType.OLAP); - OlapTable olapNewTbl = (OlapTable) newTbl; - db.writeLock(); - origTable.writeLock(); + db.writeLockOrDdlException(); try { - String oldTblName = origTable.getName(); - // First, we need to check whether the table to be operated on can be renamed - olapNewTbl.checkAndSetName(oldTblName, true); - if (swapTable) { - origTable.checkAndSetName(newTblName, true); + Table newTbl = db.getTableOrMetaException(newTblName, TableType.OLAP); + OlapTable olapNewTbl = (OlapTable) newTbl; + origTable.writeLock(); + try { + String oldTblName = origTable.getName(); + // First, we need to check whether the table to be operated on can be renamed + olapNewTbl.checkAndSetName(oldTblName, true); + if (swapTable) { + origTable.checkAndSetName(newTblName, true); + } + replaceTableInternal(db, origTable, olapNewTbl, swapTable, false); + // write edit log + ReplaceTableOperationLog log = new ReplaceTableOperationLog(db.getId(), origTable.getId(), olapNewTbl.getId(), swapTable); + Catalog.getCurrentCatalog().getEditLog().logReplaceTable(log); + LOG.info("finish replacing table {} with table {}, is swap: {}", oldTblName, newTblName, swapTable); + } finally { + origTable.writeUnlock(); } - replaceTableInternal(db, origTable, olapNewTbl, swapTable, false); - // write edit log - ReplaceTableOperationLog log = new ReplaceTableOperationLog(db.getId(), origTable.getId(), olapNewTbl.getId(), swapTable); - Catalog.getCurrentCatalog().getEditLog().logReplaceTable(log); - LOG.info("finish replacing table {} with table {}, is swap: {}", oldTblName, newTblName, swapTable); } finally { - origTable.writeUnlock(); db.writeUnlock(); } - } public void replayReplaceTable(ReplaceTableOperationLog log) throws MetaNotFoundException { @@ -446,25 +448,28 @@ public class Alter { } private void modifyViewDef(Database db, View view, String inlineViewDef, long sqlMode, List<Column> newFullSchema) throws DdlException { - db.writeLock(); - view.writeLock(); + db.writeLockOrDdlException(); try { - view.setInlineViewDefWithSqlMode(inlineViewDef, sqlMode); + view.writeLockOrDdlException(); try { - view.init(); - } catch (UserException e) { - throw new DdlException("failed to init view stmt", e); + view.setInlineViewDefWithSqlMode(inlineViewDef, sqlMode); + try { + view.init(); + } catch (UserException e) { + throw new DdlException("failed to init view stmt", e); + } + view.setNewFullSchema(newFullSchema); + String viewName = view.getName(); + db.dropTable(viewName); + db.createTable(view); + + AlterViewInfo alterViewInfo = new AlterViewInfo(db.getId(), view.getId(), inlineViewDef, newFullSchema, sqlMode); + Catalog.getCurrentCatalog().getEditLog().logModifyViewDef(alterViewInfo); + LOG.info("modify view[{}] definition to {}", viewName, inlineViewDef); + } finally { + view.writeUnlock(); } - view.setNewFullSchema(newFullSchema); - String viewName = view.getName(); - db.dropTable(viewName); - db.createTable(view); - - AlterViewInfo alterViewInfo = new AlterViewInfo(db.getId(), view.getId(), inlineViewDef, newFullSchema, sqlMode); - Catalog.getCurrentCatalog().getEditLog().logModifyViewDef(alterViewInfo); - LOG.info("modify view[{}] definition to {}", viewName, inlineViewDef); } finally { - view.writeUnlock(); db.writeUnlock(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java index 2f825b2..9aefdbb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -420,7 +420,7 @@ public abstract class AlterHandler extends MasterDaemon { Database db = Catalog.getCurrentCatalog().getDbOrMetaException(task.getDbId()); OlapTable tbl = db.getTableOrMetaException(task.getTableId(), Table.TableType.OLAP); - tbl.writeLock(); + tbl.writeLockOrMetaException(); try { Partition partition = tbl.getPartition(task.getPartitionId()); if (partition == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java index 6a31be1..c8c8815 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java @@ -194,7 +194,7 @@ public abstract class AlterJobV2 implements Writable { throw new AlterCancelException(e.getMessage()); } - tbl.writeLock(); + tbl.writeLockOrAlterCancelException(); try { boolean isStable = tbl.isStable(Catalog.getCurrentSystemInfo(), Catalog.getCurrentCatalog().getTabletScheduler(), db.getClusterName()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index 66c1c53..ccd5219 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -174,7 +174,7 @@ public class MaterializedViewHandler extends AlterHandler { */ public void processCreateMaterializedView(CreateMaterializedViewStmt addMVClause, Database db, OlapTable olapTable) throws DdlException, AnalysisException { - olapTable.writeLock(); + olapTable.writeLockOrDdlException(); try { olapTable.checkStableAndNormal(db.getClusterName()); if (olapTable.existTempPartitions()) { @@ -229,7 +229,7 @@ public class MaterializedViewHandler extends AlterHandler { Map<String, RollupJobV2> rollupNameJobMap = new LinkedHashMap<>(); // save job id for log Set<Long> logJobIdSet = new HashSet<>(); - olapTable.writeLock(); + olapTable.writeLockOrDdlException(); try { if (olapTable.existTempPartitions()) { throw new DdlException("Can not alter table when there are temp partitions in table"); @@ -708,7 +708,7 @@ public class MaterializedViewHandler extends AlterHandler { public void processBatchDropRollup(List<AlterClause> dropRollupClauses, Database db, OlapTable olapTable) throws DdlException, MetaNotFoundException { - olapTable.writeLock(); + olapTable.writeLockOrDdlException(); try { if (olapTable.existTempPartitions()) { throw new DdlException("Can not alter table when there are temp partitions in table"); @@ -744,7 +744,7 @@ public class MaterializedViewHandler extends AlterHandler { public void processDropMaterializedView(DropMaterializedViewStmt dropMaterializedViewStmt, Database db, OlapTable olapTable) throws DdlException, MetaNotFoundException { - olapTable.writeLock(); + olapTable.writeLockOrDdlException(); try { // check table state if (olapTable.getState() != OlapTableState.NORMAL) { @@ -881,7 +881,7 @@ public class MaterializedViewHandler extends AlterHandler { try { Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbId); OlapTable olapTable = db.getTableOrMetaException(tableId, Table.TableType.OLAP); - olapTable.writeLock(); + olapTable.writeLockOrMetaException(); try { if (olapTable.getState() == olapTableState) { return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJob.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJob.java index 9b83815..1ab6cd2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJob.java @@ -643,7 +643,10 @@ public class RollupJob extends AlterJob { return -1; } - olapTable.writeLock(); + if (!olapTable.writeLockIfExist()) { + LOG.warn("unknown table, tableName=" + olapTable.getName()); + return -1; + } try { // if all previous transaction has finished, then check base and rollup replica num synchronized (this) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 28fbd79..e074b3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -281,7 +281,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { // create all rollup replicas success. // add rollup index to catalog - tbl.writeLock(); + tbl.writeLockOrAlterCancelException(); try { Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); addRollupIndexToCatalog(tbl); @@ -427,7 +427,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { * all tasks are finished. check the integrity. * we just check whether all rollup replicas are healthy. */ - tbl.writeLock(); + tbl.writeLockOrAlterCancelException(); try { Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 27bfa69..465c333 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -1611,7 +1611,7 @@ public class SchemaChangeHandler extends AlterHandler { @Override public void process(List<AlterClause> alterClauses, String clusterName, Database db, OlapTable olapTable) throws UserException { - olapTable.writeLock(); + olapTable.writeLockOrDdlException(); try { // index id -> index schema Map<Long, LinkedList<Column>> indexSchemaMap = new HashMap<>(); @@ -1709,7 +1709,7 @@ public class SchemaChangeHandler extends AlterHandler { @Override public void processExternalTable(List<AlterClause> alterClauses, Database db, Table externalTable) throws UserException { - externalTable.writeLock(); + externalTable.writeLockOrDdlException(); try { // copy the external table schema columns List<Column> newSchema = Lists.newArrayList(); @@ -1790,7 +1790,7 @@ public class SchemaChangeHandler extends AlterHandler { updatePartitionInMemoryMeta(db, olapTable.getName(), partition.getName(), isInMemory); } - olapTable.writeLock(); + olapTable.writeLockOrDdlException(); try { Catalog.getCurrentCatalog().modifyTableInMemoryMeta(db, olapTable, properties); } finally { @@ -1918,7 +1918,7 @@ public class SchemaChangeHandler extends AlterHandler { AlterJobV2 schemaChangeJobV2 = null; OlapTable olapTable = db.getOlapTableOrDdlException(tableName); - olapTable.writeLock(); + olapTable.writeLockOrDdlException(); try { if (olapTable.getState() != OlapTableState.SCHEMA_CHANGE && olapTable.getState() != OlapTableState.WAITING_STABLE) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJob.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJob.java index f781d65..f2df913 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJob.java @@ -584,15 +584,9 @@ public class SchemaChangeJob extends AlterJob { long replicaId = schemaChangeTask.getReplicaId(); // update replica's info - OlapTable olapTable; - try { - Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbId); - olapTable = db.getTableOrMetaException(tableId, Table.TableType.OLAP); - } catch (MetaNotFoundException e) { - LOG.warn(e.getMessage()); - return; - } - olapTable.writeLock(); + Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbId); + OlapTable olapTable = db.getTableOrMetaException(tableId, Table.TableType.OLAP); + olapTable.writeLockOrMetaException(); try { Preconditions.checkState(olapTable.getState() == OlapTableState.SCHEMA_CHANGE); @@ -668,12 +662,11 @@ public class SchemaChangeJob extends AlterJob { OlapTable olapTable; try { olapTable = db.getTableOrMetaException(tableId, Table.TableType.OLAP); + olapTable.writeLockOrMetaException(); } catch (MetaNotFoundException e) { LOG.warn(e.getMessage()); return -1; } - - olapTable.writeLock(); try { synchronized (this) { boolean hasUnfinishedPartition = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 1ad1493..2d77da2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -307,7 +307,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { // create all replicas success. // add all shadow indexes to catalog - tbl.writeLock(); + tbl.writeLockOrAlterCancelException(); try { Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE); addShadowIndexToCatalog(tbl); @@ -465,7 +465,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { * all tasks are finished. check the integrity. * we just check whether all new replicas are healthy. */ - tbl.writeLock(); + tbl.writeLockOrAlterCancelException(); try { Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 8e92395..7614200 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -445,7 +445,9 @@ public class RestoreJob extends AbstractJob { } OlapTable olapTbl = (OlapTable) tbl; - olapTbl.writeLock(); + if (!olapTbl.writeLockIfExist()) { + continue; + } try { if (olapTbl.getState() != OlapTableState.NORMAL) { status = new Status(ErrCode.COMMON_ERROR, @@ -783,7 +785,11 @@ public class RestoreJob extends AbstractJob { // add restored tables for (Table tbl : restoredTbls) { - db.writeLock(); + if (!db.writeLockIfExist()) { + status = new Status(ErrCode.COMMON_ERROR, "Database " + db.getFullName() + + " has been dropped"); + return; + } try { if (!db.createTable(tbl)) { status = new Status(ErrCode.COMMON_ERROR, "Table " + tbl.getName() @@ -1387,7 +1393,9 @@ public class RestoreJob extends AbstractJob { continue; } OlapTable olapTbl = (OlapTable) tbl; - tbl.writeLock(); + if (!tbl.writeLockIfExist()) { + continue; + } try { Map<Long, Pair<Long, Long>> map = restoredVersionInfo.rowMap().get(tblId); for (Map.Entry<Long, Pair<Long, Long>> entry : map.entrySet()) { @@ -1576,7 +1584,9 @@ public class RestoreJob extends AbstractJob { } LOG.info("remove restored partition in table {} when cancelled: {}", restoreTbl.getName(), entry.second.getName()); - restoreTbl.writeLock(); + if (!restoreTbl.writeLockIfExist()) { + continue; + } try { restoreTbl.dropPartition(dbId, entry.second.getName(), true /* force drop */); } finally { @@ -1625,7 +1635,9 @@ public class RestoreJob extends AbstractJob { } OlapTable olapTbl = (OlapTable) tbl; - tbl.writeLock(); + if (!tbl.writeLockIfExist()) { + continue; + } try { if (olapTbl.getState() == OlapTableState.RESTORE || olapTbl.getState() == OlapTableState.RESTORE_WITH_LOAD) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index e12b8a2..ca8fbfd 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -2734,7 +2734,26 @@ public class Catalog { // save table names for recycling Set<String> tableNames = db.getTableNamesWithLock(); - unprotectDropDb(db, stmt.isForceDrop(), false); + List<Table> tableList = db.getTables(); + MetaLockUtils.writeLockTables(tableList); + try { + if (!stmt.isForceDrop()) { + for (Table table : tableList) { + if (table.getType() == TableType.OLAP) { + OlapTable olapTable = (OlapTable) table; + if (olapTable.getState() != OlapTableState.NORMAL) { + throw new DdlException("The table [" + olapTable.getState() + "]'s state is " + olapTable.getState() + ", cannot be dropped." + + " please cancel the operation on olap table firstly. If you want to forcibly drop(cannot be recovered)," + + " please use \"DROP table FORCE\"."); + } + } + } + } + unprotectDropDb(db, tableList, stmt.isForceDrop(), false); + } finally { + MetaLockUtils.writeUnlockTables(tableList); + } + if (!stmt.isForceDrop()) { Catalog.getCurrentRecycleBin().recycleDatabase(db, tableNames); } else { @@ -2758,15 +2777,11 @@ public class Catalog { LOG.info("finish drop database[{}], is force : {}", dbName, stmt.isForceDrop()); } - public void unprotectDropDb(Database db, boolean isForeDrop, boolean isReplay) { - for (Table table : db.getTables()) { - table.writeLock(); - try { - unprotectDropTable(db, table, isForeDrop, isReplay); - } finally { - table.writeUnlock(); - } + public void unprotectDropDb(Database db, List<Table> tableList, boolean isForeDrop, boolean isReplay) { + for (Table table : tableList) { + unprotectDropTable(db, table, isForeDrop, isReplay); } + db.markDropped(); } public void replayDropLinkDb(DropLinkDbAndUpdateDbInfo info) { @@ -2792,7 +2807,13 @@ public class Catalog { db.writeLock(); try { Set<String> tableNames = db.getTableNamesWithLock(); - unprotectDropDb(db, isForceDrop, true); + List<Table> tableList = db.getTables(); + MetaLockUtils.writeLockTables(tableList); + try { + unprotectDropDb(db, tableList, isForceDrop, true); + } finally { + MetaLockUtils.writeUnlockTables(tableList); + } if (!isForceDrop) { Catalog.getCurrentRecycleBin().recycleDatabase(db, tableNames); } else { @@ -2838,6 +2859,10 @@ public class Catalog { // log RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L); editLog.logRecoverDb(recoverInfo); + for (Table table : db.getTables()) { + table.unmarkDropped(); + } + db.unmarkDropped(); } finally { unlock(); } @@ -2850,7 +2875,7 @@ public class Catalog { String tableName = recoverStmt.getTableName(); Database db = this.getDbOrDdlException(dbName); - db.writeLock(); + db.writeLockOrDdlException(); try { if (db.getTable(tableName).isPresent()) { ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); @@ -2869,7 +2894,7 @@ public class Catalog { Database db = this.getDbOrDdlException(dbName); OlapTable olapTable = db.getOlapTableOrDdlException(tableName); - olapTable.writeLock(); + olapTable.writeLockOrDdlException(); try { String partitionName = recoverStmt.getPartitionName(); if (olapTable.getPartition(partitionName) != null) { @@ -2899,24 +2924,33 @@ public class Catalog { public void alterDatabaseQuota(AlterDatabaseQuotaStmt stmt) throws DdlException { String dbName = stmt.getDbName(); Database db = this.getDbOrDdlException(dbName); - QuotaType quotaType = stmt.getQuotaType(); - if (quotaType == QuotaType.DATA) { - db.setDataQuotaWithLock(stmt.getQuota()); - } else if (quotaType == QuotaType.REPLICA) { - db.setReplicaQuotaWithLock(stmt.getQuota()); + db.writeLockOrDdlException(); + try { + if (quotaType == QuotaType.DATA) { + db.setDataQuota(stmt.getQuota()); + } else if (quotaType == QuotaType.REPLICA) { + db.setReplicaQuota(stmt.getQuota()); + } + long quota = stmt.getQuota(); + DatabaseInfo dbInfo = new DatabaseInfo(dbName, "", quota, quotaType); + editLog.logAlterDb(dbInfo); + } finally { + db.writeUnlock(); } - long quota = stmt.getQuota(); - DatabaseInfo dbInfo = new DatabaseInfo(dbName, "", quota, quotaType); - editLog.logAlterDb(dbInfo); } public void replayAlterDatabaseQuota(String dbName, long quota, QuotaType quotaType) throws MetaNotFoundException { Database db = this.getDbOrMetaException(dbName); - if (quotaType == QuotaType.DATA) { - db.setDataQuotaWithLock(quota); - } else if (quotaType == QuotaType.REPLICA) { - db.setReplicaQuotaWithLock(quota); + db.writeLock(); + try { + if (quotaType == QuotaType.DATA) { + db.setDataQuota(quota); + } else if (quotaType == QuotaType.REPLICA) { + db.setReplicaQuota(quota); + } + } finally { + db.writeUnlock(); } } @@ -3268,8 +3302,7 @@ public class Catalog { // check again table = db.getOlapTableOrDdlException(tableName); - - table.writeLock(); + table.writeLockOrDdlException(); try { olapTable = (OlapTable) table; if (olapTable.getState() != OlapTableState.NORMAL) { @@ -4378,10 +4411,13 @@ public class Catalog { } } - public void replayCreateTable(String dbName, Table table) { + public void replayCreateTable(String dbName, Table table) throws MetaNotFoundException { Database db = this.fullNameToDb.get(dbName); - db.createTableWithLock(table, true, false); - + try { + db.createTableWithLock(table, true, false); + } catch (DdlException e) { + throw new MetaNotFoundException(e.getMessage()); + } if (!isCheckpointThread()) { // add to inverted index if (table.getType() == TableType.OLAP) { @@ -4508,7 +4544,7 @@ public class Catalog { // check database Database db = this.getDbOrDdlException(dbName); - db.writeLock(); + db.writeLockOrDdlException(); try { Table table = db.getTableNullable(tableName); if (table == null) { @@ -4569,6 +4605,7 @@ public class Catalog { } db.dropTable(table.getName()); + table.markDropped(); if (!isForceDrop) { Catalog.getCurrentRecycleBin().recycleTable(db.getId(), table); } else { @@ -4906,7 +4943,7 @@ public class Catalog { OlapTable olapTable = (OlapTable) table; // use try lock to avoid blocking a long time. // if block too long, backend report rpc will timeout. - if (!olapTable.tryWriteLock(Table.TRY_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + if (!olapTable.tryWriteLockIfExist(Table.TRY_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { LOG.warn("try get table {} writelock but failed when checking backend storage medium", table.getName()); continue; } @@ -5254,42 +5291,45 @@ public class Catalog { // entry of rename table operation public void renameTable(Database db, Table table, TableRenameClause tableRenameClause) throws DdlException { - db.writeLock(); - table.writeLock(); + db.writeLockOrDdlException(); try { - if (table instanceof OlapTable) { - OlapTable olapTable = (OlapTable) table; - if (olapTable.getState() != OlapTableState.NORMAL) { - throw new DdlException("Table[" + olapTable.getName() + "] is under " + olapTable.getState()); + table.writeLockOrDdlException(); + try { + if (table instanceof OlapTable) { + OlapTable olapTable = (OlapTable) table; + if (olapTable.getState() != OlapTableState.NORMAL) { + throw new DdlException("Table[" + olapTable.getName() + "] is under " + olapTable.getState()); + } } - } - String oldTableName = table.getName(); - String newTableName = tableRenameClause.getNewTableName(); - if (oldTableName.equals(newTableName)) { - throw new DdlException("Same table name"); - } + String oldTableName = table.getName(); + String newTableName = tableRenameClause.getNewTableName(); + if (oldTableName.equals(newTableName)) { + throw new DdlException("Same table name"); + } - // check if name is already used - if (db.getTable(newTableName).isPresent()) { - throw new DdlException("Table name[" + newTableName + "] is already used"); - } + // check if name is already used + if (db.getTable(newTableName).isPresent()) { + throw new DdlException("Table name[" + newTableName + "] is already used"); + } - if (table.getType() == TableType.OLAP) { - // olap table should also check if any rollup has same name as "newTableName" - ((OlapTable) table).checkAndSetName(newTableName, false); - } else { - table.setName(newTableName); - } + if (table.getType() == TableType.OLAP) { + // olap table should also check if any rollup has same name as "newTableName" + ((OlapTable) table).checkAndSetName(newTableName, false); + } else { + table.setName(newTableName); + } - db.dropTable(oldTableName); - db.createTable(table); + db.dropTable(oldTableName); + db.createTable(table); - TableInfo tableInfo = TableInfo.createForTableRename(db.getId(), table.getId(), newTableName); - editLog.logTableRename(tableInfo); - LOG.info("rename table[{}] to {}", oldTableName, newTableName); + TableInfo tableInfo = TableInfo.createForTableRename(db.getId(), table.getId(), newTableName); + editLog.logTableRename(tableInfo); + LOG.info("rename table[{}] to {}", oldTableName, newTableName); + } finally { + table.writeUnlock(); + } } finally { - table.writeUnlock(); db.writeUnlock(); } } @@ -5424,7 +5464,7 @@ public class Catalog { } public void renameRollup(Database db, OlapTable table, RollupRenameClause renameClause) throws DdlException { - table.writeLock(); + table.writeLockOrDdlException(); try { if (table.getState() != OlapTableState.NORMAL) { throw new DdlException("Table[" + table.getName() + "] is under " + table.getState()); @@ -5485,7 +5525,7 @@ public class Catalog { } public void renamePartition(Database db, OlapTable table, PartitionRenameClause renameClause) throws DdlException { - table.writeLock(); + table.writeLockOrDdlException(); try { if (table.getState() != OlapTableState.NORMAL) { throw new DdlException("Table[" + table.getName() + "] is under " + table.getState()); @@ -5696,8 +5736,7 @@ public class Catalog { } public void modifyDefaultDistributionBucketNum(Database db, OlapTable olapTable, ModifyDistributionClause modifyDistributionClause) throws DdlException { - olapTable.writeLock(); - + olapTable.writeLockOrDdlException(); try { if (olapTable.isColocateTable()) { throw new DdlException("Cannot change default bucket number of colocate table."); @@ -6723,7 +6762,7 @@ public class Catalog { // before replacing, we need to check again. // Things may be changed outside the table lock. olapTable = (OlapTable) db.getTableOrDdlException(copiedTbl.getId()); - olapTable.writeLock(); + olapTable.writeLockOrDdlException(); try { if (olapTable.getState() != OlapTableState.NORMAL) { throw new DdlException("Table' state is not NORMAL: " + olapTable.getState()); @@ -6925,7 +6964,7 @@ public class Catalog { // Convert table's distribution type from random to hash. // random distribution is no longer supported. public void convertDistributionType(Database db, OlapTable tbl) throws DdlException { - tbl.writeLock(); + tbl.writeLockOrDdlException(); try { if (!tbl.convertRandomDistributionToHashDistribution()) { throw new DdlException("Table " + tbl.getName() + " is not random distributed"); @@ -7071,7 +7110,7 @@ public class Catalog { } Database db = this.getDbOrMetaException(meta.getDbId()); Table table = db.getTableOrMetaException(meta.getTableId()); - table.writeLock(); + table.writeLockOrMetaException(); try { Replica replica = tabletInvertedIndex.getReplica(tabletId, backendId); if (replica == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java index 5d5e903..af414ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java @@ -376,6 +376,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), -1L); Catalog.getCurrentCatalog().getEditLog().logRecoverTable(recoverInfo); LOG.info("recover db[{}] with table[{}]: {}", dbId, table.getId(), table.getName()); + table.unmarkDropped(); return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index 621703f..494ca89 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -100,6 +100,8 @@ public class Database extends MetaObject implements Writable { private volatile long replicaQuotaSize; + private volatile boolean isDropped; + public enum DbState { NORMAL, LINK, MOVE } @@ -131,6 +133,14 @@ public class Database extends MetaObject implements Writable { this.dbEncryptKey = new DatabaseEncryptKey(); } + public void markDropped() { + isDropped = true; + } + + public void unmarkDropped() { + isDropped = false; + } + public void readLock() { this.rwLock.readLock().lock(); } @@ -156,10 +166,26 @@ public class Database extends MetaObject implements Writable { } } - public boolean isWriteLockHeldByCurrentThread() { - return this.rwLock.writeLock().isHeldByCurrentThread(); + public boolean writeLockIfExist() { + if (!isDropped) { + this.rwLock.writeLock().lock(); + return true; + } + return false; + } + + public <E extends Exception> void writeLockOrException(E e) throws E { + writeLock(); + if (isDropped) { + writeUnlock(); + throw e; + } } + public void writeLockOrDdlException() throws DdlException { + writeLockOrException(new DdlException("unknown db, dbName=" + fullQualifiedName)); + } + public long getId() { return id; } @@ -177,26 +203,16 @@ public class Database extends MetaObject implements Writable { } } - public void setDataQuotaWithLock(long newQuota) { + public void setDataQuota(long newQuota) { Preconditions.checkArgument(newQuota >= 0L); LOG.info("database[{}] set quota from {} to {}", fullQualifiedName, dataQuotaBytes, newQuota); - writeLock(); - try { - this.dataQuotaBytes = newQuota; - } finally { - writeUnlock(); - } + this.dataQuotaBytes = newQuota; } - public void setReplicaQuotaWithLock(long newQuota) { + public void setReplicaQuota(long newQuota) { Preconditions.checkArgument(newQuota >= 0L); LOG.info("database[{}] set replica quota from {} to {}", fullQualifiedName, replicaQuotaSize, newQuota); - writeLock(); - try { - this.replicaQuotaSize = newQuota; - } finally { - writeUnlock(); - } + this.replicaQuotaSize = newQuota; } public long getDataQuota() { @@ -303,12 +319,12 @@ public class Database extends MetaObject implements Writable { } // return pair <success?, table exist?> - public Pair<Boolean, Boolean> createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) { + public Pair<Boolean, Boolean> createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) throws DdlException { boolean result = true; // if a table is already exists, then edit log won't be executed // some caller of this method may need to know this message boolean isTableExist = false; - writeLock(); + writeLockOrDdlException(); try { String tableName = table.getName(); if (Catalog.isStoredTableNamesLowerCase()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index c2b349f..cf5d577 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -17,8 +17,8 @@ package org.apache.doris.catalog; +import org.apache.doris.alter.AlterCancelException; import org.apache.doris.analysis.CreateTableStmt; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.MetaNotFoundException; @@ -139,6 +139,14 @@ public class Table extends MetaObject implements Writable { this.createTime = Instant.now().getEpochSecond(); } + public void markDropped() { + isDropped = true; + } + + public void unmarkDropped() { + isDropped = false; + } + public void readLock() { this.rwLock.readLock().lock(); } @@ -201,22 +209,14 @@ public class Table extends MetaObject implements Writable { writeLockOrException(new MetaNotFoundException("unknown table, tableName=" + name)); } - public void writeLockOrAnalysisException() throws AnalysisException { - writeLockOrException(new AnalysisException("unknown table, tableName=" + name)); - } - - public boolean tryWriteLockOrDdlException(long timeout, TimeUnit unit) throws DdlException { - return tryWriteLockOrException(timeout, unit, new DdlException("unknown table, tableName=" + name)); + public void writeLockOrAlterCancelException() throws AlterCancelException { + writeLockOrException(new AlterCancelException("unknown table, tableName=" + name)); } public boolean tryWriteLockOrMetaException(long timeout, TimeUnit unit) throws MetaNotFoundException { return tryWriteLockOrException(timeout, unit, new MetaNotFoundException("unknown table, tableName=" + name)); } - public boolean tryWriteLockOrAnalysisException(long timeout, TimeUnit unit) throws AnalysisException { - return tryWriteLockOrException(timeout, unit, new AnalysisException("unknown table, tableName=" + name)); - } - public <E extends Exception> boolean tryWriteLockOrException(long timeout, TimeUnit unit, E e) throws E { if (tryWriteLock(timeout, unit)) { if (isDropped) { @@ -228,6 +228,17 @@ public class Table extends MetaObject implements Writable { return false; } + public boolean tryWriteLockIfExist(long timeout, TimeUnit unit) { + if (tryWriteLock(timeout, unit)) { + if (isDropped) { + writeUnlock(); + return false; + } + return true; + } + return false; + } + public boolean isTypeRead() { return isTypeRead; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index ed02c39..9755822 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -91,9 +91,10 @@ public class TabletStatMgr extends MasterDaemon { if (table.getType() != TableType.OLAP) { continue; } - OlapTable olapTable = (OlapTable) table; - table.writeLock(); + if (!table.writeLockIfExist()) { + continue; + } try { for (Partition partition : olapTable.getAllPartitions()) { long version = partition.getVisibleVersion(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index 62905a7..5b43573 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -363,7 +363,9 @@ public class DynamicPartitionScheduler extends MasterDaemon { } for (DropPartitionClause dropPartitionClause : dropPartitionClauses) { - olapTable.writeLock(); + if (!olapTable.writeLockIfExist()) { + continue; + } try { Catalog.getCurrentCatalog().dropPartition(db, olapTable, dropPartitionClause); clearDropPartitionFailedMsg(olapTable.getId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index 18e66b1..391b929 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -685,8 +685,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { Database db = Catalog.getCurrentCatalog().getDbNullable(dbId); if (db != null) { Table table = db.getTableNullable(tblId); - if (table != null) { - table.writeLock(); + if (table != null && table.writeLockIfExist()) { try { List<Replica> cloneReplicas = Lists.newArrayList(); tablet.getReplicas().stream().filter(r -> r.getState() == ReplicaState.CLONE).forEach(r -> { @@ -840,10 +839,9 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { } // 1. check the tablet status first - Database db = Catalog.getCurrentCatalog().getDbOrException(dbId, s -> new SchedException(Status.UNRECOVERABLE, "db does not exist")); - OlapTable olapTable = (OlapTable) db.getTableOrException(tblId, s -> new SchedException(Status.UNRECOVERABLE, "tbl does not exist")); - - olapTable.writeLock(); + Database db = Catalog.getCurrentCatalog().getDbOrException(dbId, s -> new SchedException(Status.UNRECOVERABLE, "db " + dbId + " does not exist")); + OlapTable olapTable = (OlapTable) db.getTableOrException(tblId, s -> new SchedException(Status.UNRECOVERABLE, "tbl " + tabletId + " does not exist")); + olapTable.writeLockOrException(new SchedException(Status.UNRECOVERABLE, "table " + olapTable.getName() + " does not exist")); try { Partition partition = olapTable.getPartition(partitionId); if (partition == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 907c917..f591112 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -460,10 +460,11 @@ public class TabletScheduler extends MasterDaemon { stat.counterTabletScheduled.incrementAndGet(); Pair<TabletStatus, TabletSchedCtx.Priority> statusPair; - // check this tablet again - Database db = catalog.getDbOrException(tabletCtx.getDbId(), s -> new SchedException(Status.UNRECOVERABLE, "db does not exist")); - OlapTable tbl = (OlapTable) db.getTableOrException(tabletCtx.getTblId(), s -> new SchedException(Status.UNRECOVERABLE, "tbl does not exist")); - tbl.writeLock(); + Database db = Catalog.getCurrentCatalog().getDbOrException(tabletCtx.getDbId(), + s -> new SchedException(Status.UNRECOVERABLE, "db " + tabletCtx.getDbId() + " does not exist")); + OlapTable tbl = (OlapTable) db.getTableOrException(tabletCtx.getTblId(), + s -> new SchedException(Status.UNRECOVERABLE, "tbl " + tabletCtx.getTblId() + " does not exist")); + tbl.writeLockOrException(new SchedException(Status.UNRECOVERABLE, "table " + tbl.getName() + " does not exist")); try { boolean isColocateTable = colocateTableIndex.isColocateTable(tbl.getId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java index ed0e06a..de5a3c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java @@ -19,6 +19,7 @@ package org.apache.doris.common.util; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Table; +import org.apache.doris.common.MetaNotFoundException; import java.util.List; import java.util.concurrent.TimeUnit; @@ -60,9 +61,22 @@ public class MetaLockUtils { } } - public static boolean tryWriteLockTables(List<Table> tableList, long timeout, TimeUnit unit) { + public static void writeLockTablesOrMetaException(List<Table> tableList) throws MetaNotFoundException { for (int i = 0; i < tableList.size(); i++) { - if (!tableList.get(i).tryWriteLock(timeout, unit)) { + try { + tableList.get(i).writeLockOrMetaException(); + } catch (MetaNotFoundException e) { + for (int j = i - 1; j >= 0; j--) { + tableList.get(j).writeUnlock(); + } + throw e; + } + } + } + + public static boolean tryWriteLockTables(List<Table> tableList, long timeout, TimeUnit unit) throws MetaNotFoundException { + for (int i = 0; i < tableList.size(); i++) { + if (!tableList.get(i).tryWriteLockOrMetaException(timeout, unit)) { for (int j = i - 1; j >= 0; j--) { tableList.get(j).writeUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java b/fe/fe-core/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java index d0cbb49..4d32786 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java @@ -215,7 +215,10 @@ public class CheckConsistencyJob { if (state != JobState.RUNNING) { // failed to send task. set tablet's checked version and version hash to avoid choosing it again - table.writeLock(); + if (!table.writeLockIfExist()) { + LOG.debug("table[{}] does not exist", tabletMeta.getTableId()); + return false; + } try { tablet.setCheckedVersion(checkedVersion, checkedVersionHash); } finally { @@ -261,11 +264,10 @@ public class CheckConsistencyJob { boolean isConsistent = true; Table table = db.getTableNullable(tabletMeta.getTableId()); - if (table == null) { + if (table == null || !table.writeLockIfExist()) { LOG.warn("table[{}] does not exist", tabletMeta.getTableId()); return -1; } - table.writeLock(); try { OlapTable olapTable = (OlapTable) table; diff --git a/fe/fe-core/src/main/java/org/apache/doris/http/rest/RowCountAction.java b/fe/fe-core/src/main/java/org/apache/doris/http/rest/RowCountAction.java index e134fc3..4648065 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/http/rest/RowCountAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/http/rest/RowCountAction.java @@ -74,7 +74,7 @@ public class RowCountAction extends RestBaseAction { Catalog catalog = Catalog.getCurrentCatalog(); Database db = catalog.getDbOrDdlException(dbName); OlapTable olapTable = db.getOlapTableOrDdlException(tableName); - olapTable.writeLock(); + olapTable.writeLockOrDdlException(); try { for (Partition partition : olapTable.getAllPartitions()) { long version = partition.getVisibleVersion(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/http/rest/TableRowCountAction.java b/fe/fe-core/src/main/java/org/apache/doris/http/rest/TableRowCountAction.java index 3b75547..1b4a8ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/http/rest/TableRowCountAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/http/rest/TableRowCountAction.java @@ -84,13 +84,13 @@ public class TableRowCountAction extends RestBaseAction { throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST, e.getMessage()); } - table.writeLock(); + table.readLock(); try { OlapTable olapTable = (OlapTable) table; resultMap.put("status", 200); resultMap.put("size", olapTable.proximateRowCount()); } finally { - table.writeUnlock(); + table.readUnlock(); } } catch (DorisHttpException e) { // status code should conforms to HTTP semantic diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java index 967caad..10e1b8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java @@ -155,8 +155,10 @@ public class BDBJEJournal implements Journal { // Parameter null means auto commit if (currentJournalDB.put(null, theKey, theData) == OperationStatus.SUCCESS) { writeSucceed = true; - LOG.debug("master write journal {} finished. db name {}, current time {}", - id, currentJournalDB.getDatabaseName(), System.currentTimeMillis()); + if (LOG.isDebugEnabled()) { + LOG.debug("master write journal {} finished. db name {}, current time {}", + id, currentJournalDB.getDatabaseName(), System.currentTimeMillis()); + } break; } } catch (DatabaseException e) { @@ -166,7 +168,6 @@ public class BDBJEJournal implements Journal { } catch (InterruptedException e1) { e1.printStackTrace(); } - continue; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java index 32f36f6..2e3588a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java @@ -330,7 +330,12 @@ public class LoadChecker extends MasterDaemon { // concurrent problems // table in tables are ordered. - MetaLockUtils.writeLockTables(tables); + try { + MetaLockUtils.writeLockTablesOrMetaException(tables); + } catch (UserException e) { + load.cancelLoadJob(job, CancelType.LOAD_RUN_FAIL, "table does not exist. dbId: " + job.getDbId() + ", err: " + e.getMessage()); + return; + } try { TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex(); for (Replica replica : job.getFinishedReplicas()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index afd153d..529293a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -275,6 +275,7 @@ public class BrokerLoadJob extends BulkLoadJob { try { db = getDb(); tableList = db.getTablesOnIdOrderOrThrowException(Lists.newArrayList(fileGroupAggInfo.getAllTableIds())); + MetaLockUtils.writeLockTablesOrMetaException(tableList); } catch (MetaNotFoundException e) { LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) .add("database_id", dbId) @@ -283,7 +284,6 @@ public class BrokerLoadJob extends BulkLoadJob { cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true); return; } - MetaLockUtils.writeLockTables(tableList); try { LOG.info(new LogBuilder(LogKey.LOAD_JOB, id) .add("txn_id", transactionId) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index cb38cbf..0667c1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -625,7 +625,7 @@ public class SparkLoadJob extends BulkLoadJob { .build()); Database db = getDb(); List<Table> tableList = db.getTablesOnIdOrderOrThrowException(Lists.newArrayList(tableToLoadPartitions.keySet())); - MetaLockUtils.writeLockTables(tableList); + MetaLockUtils.writeLockTablesOrMetaException(tableList); try { Catalog.getCurrentGlobalTransactionMgr().commitTransaction( dbId, tableList, transactionId, commitInfos, diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 55de348..fdf971e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -338,12 +338,11 @@ public class MasterImpl { LOG.debug("push report state: {}", pushState.name()); OlapTable olapTable = (OlapTable) db.getTableNullable(tableId); - if (olapTable == null) { + if (olapTable == null || !olapTable.writeLockIfExist()) { AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature); LOG.warn("finish push replica error, cannot find table[" + tableId + "] when push finished"); return; } - olapTable.writeLock(); try { Partition partition = olapTable.getPartition(partitionId); if (partition == null) { @@ -552,12 +551,11 @@ public class MasterImpl { LOG.debug("push report state: {}", pushState.name()); OlapTable olapTable = (OlapTable) db.getTableNullable(tableId); - if (olapTable == null) { + if (olapTable == null || !olapTable.writeLockIfExist()) { AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature); LOG.warn("finish push replica error, cannot find table[" + tableId + "] when push finished"); return; } - olapTable.writeLock(); try { Partition partition = olapTable.getPartition(partitionId); if (partition == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index df0e599..d676b88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -416,10 +416,9 @@ public class ReportHandler extends Daemon { long tabletId = tabletIds.get(i); long tableId = tabletMeta.getTableId(); OlapTable olapTable = (OlapTable) db.getTableNullable(tableId); - if (olapTable == null) { + if (olapTable == null || !olapTable.writeLockIfExist()) { continue; } - olapTable.writeLock(); try { long partitionId = tabletMeta.getPartitionId(); Partition partition = olapTable.getPartition(partitionId); @@ -545,10 +544,9 @@ public class ReportHandler extends Daemon { long tabletId = tabletIds.get(i); long tableId = tabletMeta.getTableId(); OlapTable olapTable = (OlapTable) db.getTableNullable(tableId); - if (olapTable == null) { + if (olapTable == null || !olapTable.writeLockIfExist()) { continue; } - olapTable.writeLock(); try { long partitionId = tabletMeta.getPartitionId(); Partition partition = olapTable.getPartition(partitionId); @@ -817,10 +815,9 @@ public class ReportHandler extends Daemon { long tabletId = tabletIds.get(i); long tableId = tabletMeta.getTableId(); OlapTable olapTable = (OlapTable) db.getTableNullable(tableId); - if (olapTable == null) { + if (olapTable == null || !olapTable.writeLockIfExist()) { continue; } - olapTable.writeLock(); try { long partitionId = tabletMeta.getPartitionId(); Partition partition = olapTable.getPartition(partitionId); @@ -939,7 +936,7 @@ public class ReportHandler extends Daemon { Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbId); OlapTable olapTable = db.getTableOrMetaException(tableId, Table.TableType.OLAP); - olapTable.writeLock(); + olapTable.writeLockOrMetaException(); try { Partition partition = olapTable.getPartition(partitionId); if (partition == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index cdb73f4..a3a8ca6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -718,7 +718,7 @@ public class DatabaseTransactionMgr { } List<Table> tableList = db.getTablesOnIdOrderOrThrowException(tableIdList); - MetaLockUtils.writeLockTables(tableList); + MetaLockUtils.writeLockTablesOrMetaException(tableList); try { boolean hasError = false; Iterator<TableCommitInfo> tableCommitInfoIterator = transactionState.getIdToTableCommitInfos().values().iterator(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java index 0a808c9..422a7ac 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java @@ -17,6 +17,7 @@ package org.apache.doris.catalog; +import org.apache.doris.common.DdlException; import org.junit.Assert; import org.junit.Test; @@ -24,7 +25,7 @@ import java.io.IOException; public class InfoSchemaDbTest { @Test - public void testNormal() throws IOException { + public void testNormal() throws IOException, DdlException { Database db = new InfoSchemaDb(); Assert.assertFalse(db.createTable(null)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/MetaLockUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/MetaLockUtilsTest.java index 47fb1ff..7ce2d0a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/MetaLockUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/MetaLockUtilsTest.java @@ -20,6 +20,7 @@ package org.apache.doris.common.util; import com.google.common.collect.Lists; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Table; +import org.apache.doris.common.MetaNotFoundException; import org.junit.Assert; import org.junit.Test; @@ -55,7 +56,7 @@ public class MetaLockUtilsTest { } @Test - public void testWriteLockTables() { + public void testWriteLockTables() throws MetaNotFoundException { List<Table> tableList = Lists.newArrayList(new Table(Table.TableType.OLAP), new Table(Table.TableType.OLAP)); MetaLockUtils.writeLockTables(tableList); Assert.assertTrue(tableList.get(0).isWriteLockHeldByCurrentThread()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
