abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1596

Change subject: Add Active Partition Event Message
......................................................................

Add Active Partition Event Message

Enable active runtimes to send messages to the listener. In addition,
this change introduces extension locks in the metadata lock manager.
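
As a rough usage sketch (illustrative only, not part of this patch; the
entity-name key below is a made-up example), an extension would bracket
its metadata work with the new extension lock methods:

    // hypothetical caller; the entity-name convention is up to the extension
    MetadataLockManager.INSTANCE.acquireExtensionWriteLock("myExtension.myEntity");
    try {
        // ... modify extension metadata ...
    } finally {
        MetadataLockManager.INSTANCE.releaseExtensionWriteLock("myExtension.myEntity");
    }

Readers take the shared lock via acquireExtensionReadLock and release it
with releaseExtensionReadLock. Active runtimes can now also tag messages
to the listener with ActivePartitionMessage.ACTIVE_RUNTIME_EVENT.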

Change-Id: I7b4629752e912614927b816d4ce3422ac89c5426
---
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
4 files changed, 87 insertions(+), 155 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/96/1596/1

diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index e4a57e6..5f184b4 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -31,6 +31,7 @@
 
     public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00;
     public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01;
+    public static final byte ACTIVE_RUNTIME_EVENT = 0x02;
     private static final long serialVersionUID = 1L;
     private final ActiveRuntimeId activeRuntimeId;
     private final JobId jobId;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index e64cf14..42fb67b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -208,9 +208,8 @@
     protected final IStorageComponentProvider componentProvider;
     protected final ExecutorService executorService;
 
-    public QueryTranslator(List<Statement> statements, SessionConfig conf,
-            ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider,
-            ExecutorService executorService) {
+    public QueryTranslator(List<Statement> statements, SessionConfig conf, ILangCompilationProvider compliationProvider,
+            IStorageComponentProvider componentProvider, ExecutorService executorService) {
         this.statements = statements;
         this.sessionConfig = conf;
         this.componentProvider = componentProvider;
@@ -457,8 +456,9 @@
         }
     }
 
-    protected void validateCompactionPolicy(String compactionPolicy, Map<String, String> compactionPolicyProperties,
-            MetadataTransactionContext mdTxnCtx, boolean isExternalDataset) throws CompilationException, Exception {
+    protected static void validateCompactionPolicy(String compactionPolicy,
+            Map<String, String> compactionPolicyProperties, MetadataTransactionContext mdTxnCtx,
+            boolean isExternalDataset) throws CompilationException, Exception {
         CompactionPolicy compactionPolicyEntity = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
                 MetadataConstants.METADATA_DATAVERSE_NAME, compactionPolicy);
         if (compactionPolicyEntity == null) {
@@ -534,8 +534,8 @@
             if (dt == null) {
                 throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
             }
-            String ngName =
-                    ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd, dataverseName, mdTxnCtx);
+            String ngName = ngNameId != null ? ngNameId.getValue()
+                    : configureNodegroupForDataset(dataverseName, datasetName, dd.getHints(), mdTxnCtx);
 
             if (compactionPolicy == null) {
                 compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
@@ -585,8 +585,7 @@
                     break;
                 case EXTERNAL:
                     String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
-                    Map<String, String> properties =
-                            ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
+                    Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
 
                     datasetDetails =
                             new ExternalDatasetDetails(adapter, properties, new Date(), TransactionState.COMMIT);
@@ -711,22 +710,22 @@
         }
     }
 
-    protected String configureNodegroupForDataset(DatasetDecl dd, String dataverse,
-            MetadataTransactionContext mdTxnCtx) throws CompilationException {
+    protected static String configureNodegroupForDataset(String dataverse, String datasetName,
+            Map<String, String> hints, MetadataTransactionContext mdTxnCtx) throws CompilationException {
         int nodegroupCardinality;
         String nodegroupName;
-        String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
+        String hintValue = hints.get(DatasetNodegroupCardinalityHint.NAME);
         if (hintValue == null) {
             nodegroupName = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
             return nodegroupName;
         } else {
             int numChosen = 0;
             boolean valid = DatasetHints.validate(DatasetNodegroupCardinalityHint.NAME,
-                    dd.getHints().get(DatasetNodegroupCardinalityHint.NAME)).first;
+                    hints.get(DatasetNodegroupCardinalityHint.NAME)).first;
             if (!valid) {
                 throw new CompilationException("Incorrect use of hint:" + DatasetNodegroupCardinalityHint.NAME);
             } else {
-                nodegroupCardinality = Integer.parseInt(dd.getHints().get(DatasetNodegroupCardinalityHint.NAME));
+                nodegroupCardinality = Integer.parseInt(hints.get(DatasetNodegroupCardinalityHint.NAME));
             }
             List<String> nodeNames = AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames();
             List<String> nodeNamesClone = new ArrayList<>(nodeNames);
@@ -753,7 +752,7 @@
                     b[selected] = temp;
                 }
             }
-            nodegroupName = dataverse + ":" + dd.getName().getValue();
+            nodegroupName = dataverse + ":" + datasetName;
             MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodegroupName, selectedNodes));
             return nodegroupName;
         }
@@ -921,11 +920,10 @@
                     // Get snapshot from External File System
                     externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
                     // Add an entry for the files index
-                    filesIndex =
-                            new Index(dataverseName, datasetName, IndexingConstants.getFilesIndexName(datasetName),
-                                    IndexType.BTREE, ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null,
-                                    ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false,
-                                    MetadataUtil.PENDING_ADD_OP);
+                    filesIndex = new Index(dataverseName, datasetName, IndexingConstants.getFilesIndexName(datasetName),
+                            IndexType.BTREE, ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null,
+                            ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false,
+                            MetadataUtil.PENDING_ADD_OP);
                     MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
                     // Add files to the external files index
                     for (ExternalFile file : externalFilesSnapshot) {
@@ -960,8 +958,8 @@
 
             // #. add a new index with PendingAddOp
             index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields,
-                    keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(),
-                    stmtCreateIndex.isEnforced(), false, MetadataUtil.PENDING_ADD_OP);
+                    keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(),
+                    false, MetadataUtil.PENDING_ADD_OP);
             MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
 
             ARecordType enforcedType = null;
@@ -1013,8 +1011,8 @@
             // add another new files index with PendingNoOp after deleting the index with
             // PendingAddOp
             if (firstExternalDatasetIndex) {
-                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                        datasetName, filesIndex.getIndexName());
+                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+                        filesIndex.getIndexName());
                 filesIndex.setPendingOp(MetadataUtil.PENDING_NO_OP);
                 MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
                 // update transaction timestamp
@@ -1186,8 +1184,8 @@
                     stopFeedBeforeDelete(new Pair<>(dvId, new Identifier(activeEntityId.getEntityName())),
                             metadataProvider);
                     // prepare job to remove feed log storage
-                    jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE
-                            .getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName())));
+                    jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
+                            MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName())));
                 }
             }
 
@@ -1201,8 +1199,8 @@
                             MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
                         if (indexes.get(k).isSecondaryIndex()) {
-                            jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k),
-                                    metadataProvider, datasets.get(j)));
+                            jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k), metadataProvider,
+                                    datasets.get(j)));
                         }
                     }
                     Index primaryIndex =
@@ -1217,8 +1215,8 @@
                             jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider,
                                     datasets.get(j)));
                         } else {
-                            jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k),
-                                    metadataProvider, datasets.get(j)));
+                            jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k), metadataProvider,
+                                    datasets.get(j)));
                         }
                     }
                     ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(datasets.get(j));
@@ -1311,6 +1309,11 @@
         DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
         String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
         String datasetName = stmtDelete.getDatasetName().getValue();
+        doDropDataset(dataverseName, datasetName, metadataProvider, stmtDelete.getIfExists(), hcc);
+    }
+
+    public static void doDropDataset(String dataverseName, String datasetName, MetadataProvider metadataProvider,
+            boolean ifExists, IHyracksClientConnection hcc) throws Exception {
         MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
         MutableObject<MetadataTransactionContext> mdTxnCtx =
                 new MutableObject<>(MetadataManager.INSTANCE.beginTransaction());
@@ -1321,12 +1324,12 @@
         try {
             Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
             if (ds == null) {
-                if (stmtDelete.getIfExists()) {
+                if (ifExists) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
                     return;
                 } else {
-                    throw new AlgebricksException("There is no dataset with this name " + datasetName
-                            + " in dataverse " + dataverseName + ".");
+                    throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+                            + dataverseName + ".");
                 }
             }
             ds.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, progress, hcc);
@@ -1362,109 +1365,10 @@
                             + "." + datasetName + ") couldn't be removed from the metadata", e);
                 }
             }
-
             throw e;
         } finally {
             MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, dataverseName + "." + datasetName);
             ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
-        }
-    }
-
-    protected void doDropDataset(Dataset ds, String datasetName, MetadataProvider metadataProvider,
-            MutableObject<MetadataTransactionContext> mdTxnCtx, List<JobSpecification> jobsToExecute,
-            String dataverseName, MutableBoolean bActiveTxn, MutableObject<ProgressState> progress,
-            IHyracksClientConnection hcc) throws Exception {
-        Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>();
-        if (ds.getDatasetType() == DatasetType.INTERNAL) {
-            // prepare job spec(s) that would disconnect any active feeds involving the dataset.
-            IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
-            for (IActiveEntityEventsListener listener : activeListeners) {
-                if (listener.isEntityUsingDataset(ds)) {
-                    throw new CompilationException(
-                            "Can't drop dataset since it is connected to active entity: " + listener.getEntityId());
-                }
-            }
-
-            // #. prepare jobs to drop the datatset and the indexes in NC
-            List<Index> indexes =
-                    MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, datasetName);
-            for (int j = 0; j < indexes.size(); j++) {
-                if (indexes.get(j).isSecondaryIndex()) {
-                    jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(j), metadataProvider, ds));
-                }
-            }
-            Index primaryIndex =
-                    MetadataManager.INSTANCE.getIndex(mdTxnCtx.getValue(), dataverseName, datasetName, datasetName);
-            jobsToExecute.add(DatasetUtil.dropDatasetJobSpec(ds, primaryIndex, metadataProvider));
-            // #. mark the existing dataset as PendingDropOp
-            MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
-            MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(),
-                    new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
-                            ds.getMetaItemTypeDataverseName(), ds.getMetaItemTypeName(), ds.getNodeGroupName(),
-                            ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(),
-                            ds.getHints(), ds.getDatasetType(), ds.getDatasetId(), MetadataUtil.PENDING_DROP_OP));
-
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
-            bActiveTxn.setValue(false);
-            progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
-
-            // # disconnect the feeds
-            for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
-                JobUtils.runJob(hcc, p.first, true);
-            }
-
-            // #. run the jobs
-            for (JobSpecification jobSpec : jobsToExecute) {
-                JobUtils.runJob(hcc, jobSpec, true);
-            }
-
-            mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
-            bActiveTxn.setValue(true);
-            metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
-        } else {
-            // External dataset
-            ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
-            // #. prepare jobs to drop the datatset and the indexes in NC
-            List<Index> indexes =
-                    MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, datasetName);
-            for (int j = 0; j < indexes.size(); j++) {
-                if (ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
-                    jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(j), metadataProvider, ds));
-                } else {
-                    jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds));
-                }
-            }
-
-            // #. mark the existing dataset as PendingDropOp
-            MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
-            MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(),
-                    new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
-                            ds.getNodeGroupName(), ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(),
-                            ds.getDatasetDetails(), ds.getHints(), ds.getDatasetType(), ds.getDatasetId(),
-                            MetadataUtil.PENDING_DROP_OP));
-
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
-            bActiveTxn.setValue(false);
-            progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
-
-            // #. run the jobs
-            for (JobSpecification jobSpec : jobsToExecute) {
-                JobUtils.runJob(hcc, jobSpec, true);
-            }
-            if (!indexes.isEmpty()) {
-                ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
-            }
-            mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
-            bActiveTxn.setValue(true);
-            metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
-        }
-
-        // #. finally, delete the dataset.
-        MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
-        // Drop the associated nodegroup
-        String nodegroup = ds.getNodeGroupName();
-        if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) {
-            MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx.getValue(), dataverseName + ":" + datasetName);
         }
     }
 
@@ -1524,10 +1428,9 @@
                 // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
-                                index.getKeyFieldNames(), index.getKeyFieldSourceIndicators(),
-                                index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
-                                MetadataUtil.PENDING_DROP_OP));
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
+                                index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
+                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(), MetadataUtil.PENDING_DROP_OP));
 
                 // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1586,10 +1489,9 @@
                 // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
-                                index.getKeyFieldNames(), index.getKeyFieldSourceIndicators(),
-                                index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
-                                MetadataUtil.PENDING_DROP_OP));
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
+                                index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
+                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(), MetadataUtil.PENDING_DROP_OP));
 
                 // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1648,8 +1550,8 @@
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName
-                            + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
+                    throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName + "."
+                            + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
                 }
             }
 
@@ -1783,8 +1685,7 @@
             CompiledLoadFromFileStatement cls =
                     new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(),
                             loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
-            JobSpecification spec =
-                    apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionConfig, cls);
+            JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionConfig, cls);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             if (spec != null) {
@@ -1989,8 +1890,8 @@
         try {
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE
-                    .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, policy);
+            FeedPolicyEntity feedPolicy =
+                    MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, policy);
             if (feedPolicy != null) {
                 if (cfps.getIfNotExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2002,8 +1903,8 @@
             boolean extendingExisting = cfps.getSourcePolicyName() != null;
             String description = cfps.getDescription() == null ? "" : cfps.getDescription();
             if (extendingExisting) {
-                FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(
-                        metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName());
+                FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE
+                        .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName());
                 if (sourceFeedPolicy == null) {
                     sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
                             MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
@@ -2135,8 +2036,7 @@
         }
         // Start
         try {
-            MetadataLockManager.INSTANCE.startFeedBegin(dataverseName, dataverseName + "." + feedName,
-                    feedConnections);
+            MetadataLockManager.INSTANCE.startFeedBegin(dataverseName, dataverseName + "." + feedName, feedConnections);
             // Prepare policy
             List<IDataset> datasets = new ArrayList<>();
             for (FeedConnection connection : feedConnections) {
@@ -2761,8 +2661,8 @@
                 handlePregelixStatement(metadataProvider, runStmt, hcc);
                 break;
             default:
-                throw new AlgebricksException("The system \"" + runStmt.getSystem()
-                        + "\" specified in your run statement is not supported.");
+                throw new AlgebricksException(
+                        "The system \"" + runStmt.getSystem() + "\" specified in your run statement is not supported.");
         }
 
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index ad75b6b..cd8cf3b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -609,4 +609,8 @@
     public IFrameOperationCallbackFactory getFrameOpCallbackFactory() {
         return NoOpFrameOperationCallbackFactory.INSTANCE;
     }
+
+    public boolean isTemp() {
+        return getDatasetDetails().isTemp();
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
index 81eaa9b..30ec9a7 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
@@ -24,7 +24,6 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.FeedConnection;
 
 public class MetadataLockManager {
@@ -38,6 +37,7 @@
     private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedPolicyLocks;
     private final ConcurrentHashMap<String, ReentrantReadWriteLock> compactionPolicyLocks;
     private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks;
+    private final ConcurrentHashMap<String, ReentrantReadWriteLock> extensionLocks;
 
     private MetadataLockManager() {
         dataversesLocks = new ConcurrentHashMap<>();
@@ -48,6 +48,7 @@
         feedPolicyLocks = new ConcurrentHashMap<>();
         compactionPolicyLocks = new ConcurrentHashMap<>();
         dataTypeLocks = new ConcurrentHashMap<>();
+        extensionLocks = new ConcurrentHashMap<>();
     }
 
     public void acquireDataverseReadLock(String dataverseName) {
@@ -410,8 +411,8 @@
         releaseDataverseReadLock(dataverseName);
     }
 
-    public void insertDeleteUpsertBegin(String dataverseName, String datasetFullyQualifiedName,
-            List<String> dataverses, List<String> datasets) {
+    public void insertDeleteUpsertBegin(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
+            List<String> datasets) {
         dataverses.add(dataverseName);
         datasets.add(datasetFullyQualifiedName);
         Collections.sort(dataverses);
@@ -632,4 +633,30 @@
         releaseExternalDatasetRefreshLock(datasetFullyQualifiedName);
         releaseDataverseReadLock(dataverseName);
     }
+
+    public void acquireExtensionReadLock(String entityName) {
+        ReentrantReadWriteLock entityLock = extensionLocks.get(entityName);
+        if (entityLock == null) {
+            extensionLocks.putIfAbsent(entityName, new ReentrantReadWriteLock());
+            entityLock = extensionLocks.get(entityName);
+        }
+        entityLock.readLock().lock();
+    }
+
+    public void releaseExtensionReadLock(String entityName) {
+        extensionLocks.get(entityName).readLock().unlock();
+    }
+
+    public void acquireExtensionWriteLock(String entityName) {
+        ReentrantReadWriteLock entityLock = extensionLocks.get(entityName);
+        if (entityLock == null) {
+            extensionLocks.putIfAbsent(entityName, new ReentrantReadWriteLock());
+            entityLock = extensionLocks.get(entityName);
+        }
+        entityLock.writeLock().lock();
+    }
+
+    public void releaseExtensionWriteLock(String entityName) {
+        extensionLocks.get(entityName).writeLock().unlock();
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1596
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7b4629752e912614927b816d4ce3422ac89c5426
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>
