abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1596
Change subject: Add Active Partition Event Message ...................................................................... Add Active Partition Event Message Enable active runtimes to send messages to the listener. In addition, this change introduces extension locks to the metadata lock manager. Change-Id: I7b4629752e912614927b816d4ce3422ac89c5426 --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java 4 files changed, 87 insertions(+), 155 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/96/1596/1 diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java index e4a57e6..5f184b4 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java @@ -31,6 +31,7 @@ public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00; public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01; + public static final byte ACTIVE_RUNTIME_EVENT = 0x02; private static final long serialVersionUID = 1L; private final ActiveRuntimeId activeRuntimeId; private final JobId jobId; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index e64cf14..42fb67b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -208,9 +208,8 @@ protected final IStorageComponentProvider componentProvider; protected final ExecutorService executorService; - public QueryTranslator(List<Statement> statements, SessionConfig conf, - ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider, - ExecutorService executorService) { + public QueryTranslator(List<Statement> statements, SessionConfig conf, ILangCompilationProvider compliationProvider, + IStorageComponentProvider componentProvider, ExecutorService executorService) { this.statements = statements; this.sessionConfig = conf; this.componentProvider = componentProvider; @@ -457,8 +456,9 @@ } } - protected void validateCompactionPolicy(String compactionPolicy, Map<String, String> compactionPolicyProperties, - MetadataTransactionContext mdTxnCtx, boolean isExternalDataset) throws CompilationException, Exception { + protected static void validateCompactionPolicy(String compactionPolicy, + Map<String, String> compactionPolicyProperties, MetadataTransactionContext mdTxnCtx, + boolean isExternalDataset) throws CompilationException, Exception { CompactionPolicy compactionPolicyEntity = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, compactionPolicy); if (compactionPolicyEntity == null) { @@ -534,8 +534,8 @@ if (dt == null) { throw new AlgebricksException(": type " + itemTypeName + " could not be found."); } - String ngName = - ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd, dataverseName, mdTxnCtx); + String ngName = ngNameId != null ? 
ngNameId.getValue() + : configureNodegroupForDataset(dataverseName, datasetName, dd.getHints(), mdTxnCtx); if (compactionPolicy == null) { compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME; @@ -585,8 +585,7 @@ break; case EXTERNAL: String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter(); - Map<String, String> properties = - ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties(); + Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties(); datasetDetails = new ExternalDatasetDetails(adapter, properties, new Date(), TransactionState.COMMIT); @@ -711,22 +710,22 @@ } } - protected String configureNodegroupForDataset(DatasetDecl dd, String dataverse, - MetadataTransactionContext mdTxnCtx) throws CompilationException { + protected static String configureNodegroupForDataset(String dataverse, String datasetName, + Map<String, String> hints, MetadataTransactionContext mdTxnCtx) throws CompilationException { int nodegroupCardinality; String nodegroupName; - String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME); + String hintValue = hints.get(DatasetNodegroupCardinalityHint.NAME); if (hintValue == null) { nodegroupName = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME; return nodegroupName; } else { int numChosen = 0; boolean valid = DatasetHints.validate(DatasetNodegroupCardinalityHint.NAME, - dd.getHints().get(DatasetNodegroupCardinalityHint.NAME)).first; + hints.get(DatasetNodegroupCardinalityHint.NAME)).first; if (!valid) { throw new CompilationException("Incorrect use of hint:" + DatasetNodegroupCardinalityHint.NAME); } else { - nodegroupCardinality = Integer.parseInt(dd.getHints().get(DatasetNodegroupCardinalityHint.NAME)); + nodegroupCardinality = Integer.parseInt(hints.get(DatasetNodegroupCardinalityHint.NAME)); } List<String> nodeNames = AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames(); List<String> nodeNamesClone = new 
ArrayList<>(nodeNames); @@ -753,7 +752,7 @@ b[selected] = temp; } } - nodegroupName = dataverse + ":" + dd.getName().getValue(); + nodegroupName = dataverse + ":" + datasetName; MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodegroupName, selectedNodes)); return nodegroupName; } @@ -921,11 +920,10 @@ // Get snapshot from External File System externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds); // Add an entry for the files index - filesIndex = - new Index(dataverseName, datasetName, IndexingConstants.getFilesIndexName(datasetName), - IndexType.BTREE, ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null, - ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false, - MetadataUtil.PENDING_ADD_OP); + filesIndex = new Index(dataverseName, datasetName, IndexingConstants.getFilesIndexName(datasetName), + IndexType.BTREE, ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null, + ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false, + MetadataUtil.PENDING_ADD_OP); MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex); // Add files to the external files index for (ExternalFile file : externalFilesSnapshot) { @@ -960,8 +958,8 @@ // #. 
add a new index with PendingAddOp index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields, - keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), - stmtCreateIndex.isEnforced(), false, MetadataUtil.PENDING_ADD_OP); + keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(), + false, MetadataUtil.PENDING_ADD_OP); MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index); ARecordType enforcedType = null; @@ -1013,8 +1011,8 @@ // add another new files index with PendingNoOp after deleting the index with // PendingAddOp if (firstExternalDatasetIndex) { - MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, - datasetName, filesIndex.getIndexName()); + MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, + filesIndex.getIndexName()); filesIndex.setPendingOp(MetadataUtil.PENDING_NO_OP); MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex); // update transaction timestamp @@ -1186,8 +1184,8 @@ stopFeedBeforeDelete(new Pair<>(dvId, new Identifier(activeEntityId.getEntityName())), metadataProvider); // prepare job to remove feed log storage - jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE - .getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName()))); + jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob( + MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName()))); } } @@ -1201,8 +1199,8 @@ MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); for (int k = 0; k < indexes.size(); k++) { if (indexes.get(k).isSecondaryIndex()) { - jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k), - metadataProvider, datasets.get(j))); + 
jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k), metadataProvider, + datasets.get(j))); } } Index primaryIndex = @@ -1217,8 +1215,8 @@ jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, datasets.get(j))); } else { - jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k), - metadataProvider, datasets.get(j))); + jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k), metadataProvider, + datasets.get(j))); } } ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(datasets.get(j)); @@ -1311,6 +1309,11 @@ DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt; String dataverseName = getActiveDataverse(stmtDelete.getDataverseName()); String datasetName = stmtDelete.getDatasetName().getValue(); + doDropDataset(dataverseName, datasetName, metadataProvider, stmtDelete.getIfExists(), hcc); + } + + public static void doDropDataset(String dataverseName, String datasetName, MetadataProvider metadataProvider, + boolean ifExists, IHyracksClientConnection hcc) throws Exception { MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS); MutableObject<MetadataTransactionContext> mdTxnCtx = new MutableObject<>(MetadataManager.INSTANCE.beginTransaction()); @@ -1321,12 +1324,12 @@ try { Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx.getValue(), dataverseName, datasetName); if (ds == null) { - if (stmtDelete.getIfExists()) { + if (ifExists) { MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); return; } else { - throw new AlgebricksException("There is no dataset with this name " + datasetName - + " in dataverse " + dataverseName + "."); + throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse " + + dataverseName + "."); } } ds.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, progress, hcc); @@ -1362,109 +1365,10 @@ + "." 
+ datasetName + ") couldn't be removed from the metadata", e); } } - throw e; } finally { MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, dataverseName + "." + datasetName); ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); - } - } - - protected void doDropDataset(Dataset ds, String datasetName, MetadataProvider metadataProvider, - MutableObject<MetadataTransactionContext> mdTxnCtx, List<JobSpecification> jobsToExecute, - String dataverseName, MutableBoolean bActiveTxn, MutableObject<ProgressState> progress, - IHyracksClientConnection hcc) throws Exception { - Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>(); - if (ds.getDatasetType() == DatasetType.INTERNAL) { - // prepare job spec(s) that would disconnect any active feeds involving the dataset. - IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); - for (IActiveEntityEventsListener listener : activeListeners) { - if (listener.isEntityUsingDataset(ds)) { - throw new CompilationException( - "Can't drop dataset since it is connected to active entity: " + listener.getEntityId()); - } - } - - // #. prepare jobs to drop the datatset and the indexes in NC - List<Index> indexes = - MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, datasetName); - for (int j = 0; j < indexes.size(); j++) { - if (indexes.get(j).isSecondaryIndex()) { - jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(j), metadataProvider, ds)); - } - } - Index primaryIndex = - MetadataManager.INSTANCE.getIndex(mdTxnCtx.getValue(), dataverseName, datasetName, datasetName); - jobsToExecute.add(DatasetUtil.dropDatasetJobSpec(ds, primaryIndex, metadataProvider)); - // #. 
mark the existing dataset as PendingDropOp - MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName); - MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(), - new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(), - ds.getMetaItemTypeDataverseName(), ds.getMetaItemTypeName(), ds.getNodeGroupName(), - ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(), - ds.getHints(), ds.getDatasetType(), ds.getDatasetId(), MetadataUtil.PENDING_DROP_OP)); - - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); - bActiveTxn.setValue(false); - progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA); - - // # disconnect the feeds - for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) { - JobUtils.runJob(hcc, p.first, true); - } - - // #. run the jobs - for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); - } - - mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction()); - bActiveTxn.setValue(true); - metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue()); - } else { - // External dataset - ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds); - // #. prepare jobs to drop the datatset and the indexes in NC - List<Index> indexes = - MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, datasetName); - for (int j = 0; j < indexes.size(); j++) { - if (ExternalIndexingOperations.isFileIndex(indexes.get(j))) { - jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(j), metadataProvider, ds)); - } else { - jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds)); - } - } - - // #. 
mark the existing dataset as PendingDropOp - MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName); - MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(), - new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(), - ds.getNodeGroupName(), ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(), - ds.getDatasetDetails(), ds.getHints(), ds.getDatasetType(), ds.getDatasetId(), - MetadataUtil.PENDING_DROP_OP)); - - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); - bActiveTxn.setValue(false); - progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA); - - // #. run the jobs - for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); - } - if (!indexes.isEmpty()) { - ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds); - } - mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction()); - bActiveTxn.setValue(true); - metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue()); - } - - // #. finally, delete the dataset. - MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName); - // Drop the associated nodegroup - String nodegroup = ds.getNodeGroupName(); - if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) { - MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx.getValue(), dataverseName + ":" + datasetName); } } @@ -1524,10 +1428,9 @@ // #. 
mark PendingDropOp on the existing index MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); MetadataManager.INSTANCE.addIndex(mdTxnCtx, - new Index(dataverseName, datasetName, indexName, index.getIndexType(), - index.getKeyFieldNames(), index.getKeyFieldSourceIndicators(), - index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(), - MetadataUtil.PENDING_DROP_OP)); + new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), + index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(), + index.isEnforcingKeyFileds(), index.isPrimaryIndex(), MetadataUtil.PENDING_DROP_OP)); // #. commit the existing transaction before calling runJob. MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -1586,10 +1489,9 @@ // #. mark PendingDropOp on the existing index MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); MetadataManager.INSTANCE.addIndex(mdTxnCtx, - new Index(dataverseName, datasetName, indexName, index.getIndexType(), - index.getKeyFieldNames(), index.getKeyFieldSourceIndicators(), - index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(), - MetadataUtil.PENDING_DROP_OP)); + new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), + index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(), + index.isEnforcingKeyFileds(), index.isPrimaryIndex(), MetadataUtil.PENDING_DROP_OP)); // #. commit the existing transaction before calling runJob. MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -1648,8 +1550,8 @@ } catch (Exception e2) { e.addSuppressed(e2); abort(e, e2, mdTxnCtx); - throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName - + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e); + throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName + "." 
+ + datasetName + "." + indexName + ") couldn't be removed from the metadata", e); } } @@ -1783,8 +1685,7 @@ CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted()); - JobSpecification spec = - apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionConfig, cls); + JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionConfig, cls); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; if (spec != null) { @@ -1989,8 +1890,8 @@ try { mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE - .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, policy); + FeedPolicyEntity feedPolicy = + MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, policy); if (feedPolicy != null) { if (cfps.getIfNotExists()) { MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -2002,8 +1903,8 @@ boolean extendingExisting = cfps.getSourcePolicyName() != null; String description = cfps.getDescription() == null ? "" : cfps.getDescription(); if (extendingExisting) { - FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy( - metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName()); + FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE + .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName()); if (sourceFeedPolicy == null) { sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(), MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName()); @@ -2135,8 +2036,7 @@ } // Start try { - MetadataLockManager.INSTANCE.startFeedBegin(dataverseName, dataverseName + "." 
+ feedName, - feedConnections); + MetadataLockManager.INSTANCE.startFeedBegin(dataverseName, dataverseName + "." + feedName, feedConnections); // Prepare policy List<IDataset> datasets = new ArrayList<>(); for (FeedConnection connection : feedConnections) { @@ -2761,8 +2661,8 @@ handlePregelixStatement(metadataProvider, runStmt, hcc); break; default: - throw new AlgebricksException("The system \"" + runStmt.getSystem() - + "\" specified in your run statement is not supported."); + throw new AlgebricksException( + "The system \"" + runStmt.getSystem() + "\" specified in your run statement is not supported."); } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index ad75b6b..cd8cf3b 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -609,4 +609,8 @@ public IFrameOperationCallbackFactory getFrameOpCallbackFactory() { return NoOpFrameOperationCallbackFactory.INSTANCE; } + + public boolean isTemp() { + return getDatasetDetails().isTemp(); + } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java index 81eaa9b..30ec9a7 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java @@ -24,7 +24,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.asterix.metadata.entities.Dataverse; -import org.apache.asterix.metadata.entities.Feed; import org.apache.asterix.metadata.entities.FeedConnection; public class MetadataLockManager { @@ -38,6 
+37,7 @@ private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedPolicyLocks; private final ConcurrentHashMap<String, ReentrantReadWriteLock> compactionPolicyLocks; private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks; + private final ConcurrentHashMap<String, ReentrantReadWriteLock> extensionLocks; private MetadataLockManager() { dataversesLocks = new ConcurrentHashMap<>(); @@ -48,6 +48,7 @@ feedPolicyLocks = new ConcurrentHashMap<>(); compactionPolicyLocks = new ConcurrentHashMap<>(); dataTypeLocks = new ConcurrentHashMap<>(); + extensionLocks = new ConcurrentHashMap<>(); } public void acquireDataverseReadLock(String dataverseName) { @@ -410,8 +411,8 @@ releaseDataverseReadLock(dataverseName); } - public void insertDeleteUpsertBegin(String dataverseName, String datasetFullyQualifiedName, - List<String> dataverses, List<String> datasets) { + public void insertDeleteUpsertBegin(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses, + List<String> datasets) { dataverses.add(dataverseName); datasets.add(datasetFullyQualifiedName); Collections.sort(dataverses); @@ -632,4 +633,30 @@ releaseExternalDatasetRefreshLock(datasetFullyQualifiedName); releaseDataverseReadLock(dataverseName); } + + public void acquireExtensionReadLock(String entityName) { + ReentrantReadWriteLock entityLock = extensionLocks.get(entityName); + if (entityLock == null) { + extensionLocks.putIfAbsent(entityName, new ReentrantReadWriteLock()); + entityLock = extensionLocks.get(entityName); + } + entityLock.readLock().lock(); + } + + public void releaseExtensionReadLock(String entityName) { + extensionLocks.get(entityName).readLock().unlock(); + } + + public void acquireExtensionWriteLock(String entityName) { + ReentrantReadWriteLock entityLock = extensionLocks.get(entityName); + if (entityLock == null) { + extensionLocks.putIfAbsent(entityName, new ReentrantReadWriteLock()); + entityLock = extensionLocks.get(entityName); + } + 
entityLock.writeLock().lock(); + } + + public void releaseExtensionWriteLock(String entityName) { + extensionLocks.get(entityName).writeLock().unlock(); + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1596 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I7b4629752e912614927b816d4ce3422ac89c5426 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>