abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1859

Change subject: Refactor index creation
......................................................................

Refactor index creation

Change-Id: I0ccc216bb8bdb2e44ec7d8583faaeec452978f9e
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
1 file changed, 95 insertions(+), 79 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/59/1859/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 8a7d757..3c92868 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -688,7 +688,8 @@
         }
     }
 
-    protected void validateIfResourceIsActiveInFeed(Dataset dataset) throws 
CompilationException {
+    protected static void 
validateIfResourceIsActiveInFeed(ICcApplicationContext appCtx, Dataset dataset)
+            throws CompilationException {
         StringBuilder builder = null;
         ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
         ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
@@ -720,8 +721,8 @@
                     hints.get(DatasetNodegroupCardinalityHint.NAME));
             boolean valid = validation.first;
             if (!valid) {
-                throw new CompilationException("Incorrect use of hint '" + 
DatasetNodegroupCardinalityHint.NAME +
-                        "': " + validation.second);
+                throw new CompilationException(
+                        "Incorrect use of hint '" + 
DatasetNodegroupCardinalityHint.NAME + "': " + validation.second);
             } else {
                 nodegroupCardinality = 
Integer.parseInt(hints.get(DatasetNodegroupCardinalityHint.NAME));
             }
@@ -735,26 +736,17 @@
 
     protected void handleCreateIndexStatement(MetadataProvider 
metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
-        ProgressState progress = ProgressState.NO_PROGRESS;
         CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
         String dataverseName = 
getActiveDataverse(stmtCreateIndex.getDataverseName());
         String datasetName = stmtCreateIndex.getDatasetName().getValue();
         List<Integer> keySourceIndicators = 
stmtCreateIndex.getFieldSourceIndicators();
-
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         
MetadataLockManager.INSTANCE.createIndexBegin(metadataProvider.getLocks(), 
dataverseName,
                 dataverseName + "." + datasetName);
         String indexName = null;
-        JobSpecification spec = null;
         Dataset ds = null;
         // For external datasets
-        List<ExternalFile> externalFilesSnapshot = null;
-        boolean firstExternalDatasetIndex = false;
-        boolean filesIndexReplicated = false;
-        Index filesIndex = null;
-        boolean datasetLocked = false;
         Index index = null;
         try {
             ds = metadataProvider.findDataset(dataverseName, datasetName);
@@ -766,6 +758,14 @@
             indexName = stmtCreateIndex.getIndexName().getValue();
             index = 
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName,
                     datasetName, indexName);
+            if (index != null) {
+                if (stmtCreateIndex.getIfNotExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("An index with this name " + 
indexName + " already exists.");
+                }
+            }
             Datatype dt = 
MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
                     ds.getItemTypeDataverseName(), ds.getItemTypeName());
             ARecordType aRecordType = (ARecordType) dt.getDatatype();
@@ -829,16 +829,6 @@
 
             validateIndexKeyFields(stmtCreateIndex, keySourceIndicators, 
aRecordType, metaRecordType, indexFields,
                     indexFieldTypes);
-
-            if (index != null) {
-                if (stmtCreateIndex.getIfNotExists()) {
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    return;
-                } else {
-                    throw new AlgebricksException("An index with this name " + 
indexName + " already exists.");
-                }
-            }
-
             // Checks whether a user is trying to create an inverted secondary 
index on a dataset
             // with a variable-length primary key.
             // Currently, we do not support this. Therefore, as a temporary 
solution, we print an
@@ -862,8 +852,30 @@
                 }
             }
 
+            Index newIndex = new Index(dataverseName, datasetName, indexName, 
stmtCreateIndex.getIndexType(),
+                    indexFields, keySourceIndicators, indexFieldTypes, 
stmtCreateIndex.getGramLength(),
+                    overridesFieldTypes, stmtCreateIndex.isEnforced(), false, 
MetadataUtil.PENDING_ADD_OP);
+            doCreateIndex(hcc, metadataProvider, ds, newIndex, jobFlags);
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+    }
+
+    public static void doCreateIndex(IHyracksClientConnection hcc, 
MetadataProvider metadataProvider, Dataset ds,
+            Index index, EnumSet<JobFlag> jobFlags) throws Exception {
+        ProgressState progress = ProgressState.NO_PROGRESS;
+        boolean bActiveTxn = true;
+        Index filesIndex = null;
+        boolean firstExternalDatasetIndex = false;
+        boolean datasetLocked = false;
+        List<ExternalFile> externalFilesSnapshot;
+        MetadataTransactionContext mdTxnCtx = 
metadataProvider.getMetadataTxnContext();
+        JobSpecification spec;
+        boolean filesIndexReplicated = false;
+        try {
+            index.setPendingOp(MetadataUtil.PENDING_ADD_OP);
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                validateIfResourceIsActiveInFeed(ds);
+                
validateIfResourceIsActiveInFeed(metadataProvider.getApplicationContext(), ds);
             } else {
                 // External dataset
                 // Check if the dataset is indexible
@@ -873,13 +885,14 @@
                                     + " Adapter can't be indexed");
                 }
                 // Check if the name of the index is valid
-                if (!ExternalIndexingOperations.isValidIndexName(datasetName, 
indexName)) {
+                if 
(!ExternalIndexingOperations.isValidIndexName(index.getDatasetName(), 
index.getIndexName())) {
                     throw new AlgebricksException("external dataset index name 
is invalid");
                 }
 
                 // Check if the files index exist
-                filesIndex = 
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName,
-                        datasetName, 
IndexingConstants.getFilesIndexName(datasetName));
+                filesIndex = 
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                        index.getDataverseName(), index.getDatasetName(),
+                        
IndexingConstants.getFilesIndexName(index.getDatasetName()));
                 firstExternalDatasetIndex = filesIndex == null;
                 // Lock external dataset
                 ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, 
firstExternalDatasetIndex);
@@ -887,7 +900,8 @@
                 if (firstExternalDatasetIndex) {
                     // Verify that no one has created an index before we 
acquire the lock
                     filesIndex = 
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
-                            dataverseName, datasetName, 
IndexingConstants.getFilesIndexName(datasetName));
+                            index.getDataverseName(), index.getDatasetName(),
+                            
IndexingConstants.getFilesIndexName(index.getDatasetName()));
                     if (filesIndex != null) {
                         ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, 
firstExternalDatasetIndex);
                         firstExternalDatasetIndex = false;
@@ -898,9 +912,10 @@
                     // Get snapshot from External File System
                     externalFilesSnapshot = 
ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
                     // Add an entry for the files index
-                    filesIndex = new Index(dataverseName, datasetName, 
IndexingConstants.getFilesIndexName(datasetName),
-                            IndexType.BTREE, 
ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null,
-                            ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, 
overridesFieldTypes, false, false,
+                    filesIndex = new Index(index.getDataverseName(), 
index.getDatasetName(),
+                            
IndexingConstants.getFilesIndexName(index.getDatasetName()), IndexType.BTREE,
+                            ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, 
null,
+                            ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, 
false, false, false,
                             MetadataUtil.PENDING_ADD_OP);
                     
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), 
filesIndex);
                     // Add files to the external files index
@@ -915,48 +930,43 @@
                                 "Failed to create job spec for replicating 
Files Index For external dataset");
                     }
                     filesIndexReplicated = true;
-                    runJob(hcc, spec);
+                    runJob(hcc, spec, jobFlags);
                 }
             }
 
             // check whether there exists another enforced index on the same 
field
-            if (stmtCreateIndex.isEnforced()) {
-                List<Index> indexes = MetadataManager.INSTANCE
-                        
.getDatasetIndexes(metadataProvider.getMetadataTxnContext(), dataverseName, 
datasetName);
+            if (index.isEnforced()) {
+                List<Index> indexes = 
MetadataManager.INSTANCE.getDatasetIndexes(
+                        metadataProvider.getMetadataTxnContext(), 
index.getDataverseName(), index.getDatasetName());
                 for (Index existingIndex : indexes) {
-                    if (existingIndex.getKeyFieldNames().equals(indexFields)
-                            && 
!existingIndex.getKeyFieldTypes().equals(indexFieldTypes)
+                    if 
(existingIndex.getKeyFieldNames().equals(index.getKeyFieldNames())
+                            && 
!existingIndex.getKeyFieldTypes().equals(index.getKeyFieldTypes())
                             && existingIndex.isEnforced()) {
-                        throw new CompilationException("Cannot create index " 
+ indexName + " , enforced index "
-                                + existingIndex.getIndexName() + " on field 
\"" + StringUtils.join(indexFields, ',')
-                                + "\" is already defined with type \"" + 
existingIndex.getKeyFieldTypes() + "\"");
+                        throw new CompilationException("Cannot create index " 
+ index.getIndexName()
+                                + " , enforced index " + 
existingIndex.getIndexName() + " on field \""
+                                + StringUtils.join(index.getKeyFieldNames(), 
',') + "\" is already defined with type \""
+                                + existingIndex.getKeyFieldTypes() + "\"");
                     }
                 }
             }
-
             // #. add a new index with PendingAddOp
-            index = new Index(dataverseName, datasetName, indexName, 
stmtCreateIndex.getIndexType(), indexFields,
-                    keySourceIndicators, indexFieldTypes, 
stmtCreateIndex.getGramLength(), overridesFieldTypes,
-                    stmtCreateIndex.isEnforced(),false, 
MetadataUtil.PENDING_ADD_OP);
             
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), 
index);
-
             // #. prepare to create the index artifact in NC.
             spec = IndexUtil.buildSecondaryIndexCreationJobSpec(ds, index, 
metadataProvider);
             if (spec == null) {
-                throw new CompilationException("Failed to create job spec for 
creating index '"
-                        + stmtCreateIndex.getDatasetName() + "." + 
stmtCreateIndex.getIndexName() + "'");
+                throw new CompilationException("Failed to create job spec for 
creating index '" + ds.getDatasetName()
+                        + "." + index.getIndexName() + "'");
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-
             progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
             // #. create the index artifact in NC.
-            runJob(hcc, spec);
+            runJob(hcc, spec, jobFlags);
 
             // #. flush the internal dataset for correlated policy
             if (ds.isCorrelated() && ds.getDatasetType() == 
DatasetType.INTERNAL) {
-                FlushDatasetUtil.flushDataset(hcc, metadataProvider, 
dataverseName, datasetName, datasetName);
+                FlushDatasetUtil.flushDataset(hcc, metadataProvider, 
index.getDataverseName(), index.getDatasetName(),
+                        index.getDatasetName());
             }
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -968,7 +978,7 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
-            runJob(hcc, spec);
+            runJob(hcc, spec, jobFlags);
 
             // #. begin new metadataTxn
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -976,15 +986,15 @@
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
             // #. add another new index with PendingNoOp after deleting the 
index with PendingAddOp
-            
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName, datasetName,
-                    indexName);
+            
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), 
index.getDataverseName(),
+                    index.getDatasetName(), index.getIndexName());
             index.setPendingOp(MetadataUtil.PENDING_NO_OP);
             
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), 
index);
             // add another new files index with PendingNoOp after deleting the 
index with
             // PendingAddOp
             if (firstExternalDatasetIndex) {
-                
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName, datasetName,
-                        filesIndex.getIndexName());
+                
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), 
index.getDataverseName(),
+                        index.getDatasetName(), filesIndex.getIndexName());
                 filesIndex.setPendingOp(MetadataUtil.PENDING_NO_OP);
                 
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), 
filesIndex);
                 // update transaction timestamp
@@ -992,7 +1002,6 @@
                 MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
         } catch (Exception e) {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
@@ -1006,7 +1015,7 @@
                             
ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    runJob(hcc, jobSpec);
+                    runJob(hcc, jobSpec, jobFlags);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -1025,7 +1034,7 @@
                     JobSpecification jobSpec = 
IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    runJob(hcc, jobSpec);
+                    runJob(hcc, jobSpec, jobFlags);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -1043,21 +1052,25 @@
                     } catch (Exception e2) {
                         e.addSuppressed(e2);
                         abort(e, e2, mdTxnCtx);
-                        throw new IllegalStateException("System is 
inconsistent state: pending files for("
-                                + dataverseName + "." + datasetName + ") 
couldn't be removed from the metadata", e);
+                        throw new IllegalStateException(
+                                "System is inconsistent state: pending files 
for(" + index.getDataverseName() + "."
+                                        + index.getDatasetName() + ") couldn't 
be removed from the metadata",
+                                e);
                     }
                     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                     metadataProvider.setMetadataTxnContext(mdTxnCtx);
                     try {
                         // Drop the files index from metadata
-                        
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName,
-                                datasetName, 
IndexingConstants.getFilesIndexName(datasetName));
+                        
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
+                                index.getDataverseName(), 
index.getDatasetName(),
+                                
IndexingConstants.getFilesIndexName(index.getDatasetName()));
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     } catch (Exception e2) {
                         e.addSuppressed(e2);
                         abort(e, e2, mdTxnCtx);
-                        throw new IllegalStateException("System is 
inconsistent state: pending index(" + dataverseName
-                                + "." + datasetName + "." + 
IndexingConstants.getFilesIndexName(datasetName)
+                        throw new IllegalStateException("System is 
inconsistent state: pending index("
+                                + index.getDataverseName() + "." + 
index.getDatasetName() + "."
+                                + 
IndexingConstants.getFilesIndexName(index.getDatasetName())
                                 + ") couldn't be removed from the metadata", 
e);
                     }
                 }
@@ -1065,19 +1078,19 @@
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
-                    
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName,
-                            datasetName, indexName);
+                    
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
+                            index.getDataverseName(), index.getDatasetName(), 
index.getIndexName());
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is in inconsistent 
state: pending index(" + dataverseName
-                            + "." + datasetName + "." + indexName + ") 
couldn't be removed from the metadata", e);
+                    throw new IllegalStateException("System is in inconsistent 
state: pending index("
+                            + index.getDataverseName() + "." + 
index.getDatasetName() + "." + index.getIndexName()
+                            + ") couldn't be removed from the metadata", e);
                 }
             }
             throw e;
         } finally {
-            metadataProvider.getLocks().unlock();
             if (datasetLocked) {
                 ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, 
firstExternalDatasetIndex);
             }
@@ -1087,8 +1100,8 @@
     protected void validateIndexKeyFields(CreateIndexStatement 
stmtCreateIndex, List<Integer> keySourceIndicators,
             ARecordType aRecordType, ARecordType metaRecordType, 
List<List<String>> indexFields,
             List<IAType> indexFieldTypes) throws AlgebricksException {
-        ValidateUtil.validateKeyFields(aRecordType, metaRecordType, 
indexFields, keySourceIndicators,
-                indexFieldTypes, stmtCreateIndex.getIndexType());
+        ValidateUtil.validateKeyFields(aRecordType, metaRecordType, 
indexFields, keySourceIndicators, indexFieldTypes,
+                stmtCreateIndex.getIndexType());
     }
 
     protected void handleCreateTypeStatement(MetadataProvider 
metadataProvider, Statement stmt) throws Exception {
@@ -1192,11 +1205,11 @@
                             
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, 
datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
                         if 
(ExternalIndexingOperations.isFileIndex(indexes.get(k))) {
-                            
jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider,
-                                    dataset));
-                        } else {
                             jobsToExecute.add(
-                                    
IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider, dataset));
+                                    
ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, 
dataset));
+                        } else {
+                            jobsToExecute
+                                    
.add(IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider, 
dataset));
                         }
                     }
                     
ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(dataset);
@@ -2231,8 +2244,7 @@
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 for (Index index : indexes) {
                     if (index.isSecondaryIndex()) {
-                        jobsToExecute
-                                
.add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, index, metadataProvider));
+                        
jobsToExecute.add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, index, 
metadataProvider));
                     }
                 }
             } else {
@@ -2257,8 +2269,7 @@
     }
 
     protected void prepareCompactJobsForExternalDataset(List<Index> indexes, 
Dataset ds,
-            List<JobSpecification> jobsToExecute, MetadataProvider 
metadataProvider)
-            throws AlgebricksException {
+            List<JobSpecification> jobsToExecute, MetadataProvider 
metadataProvider) throws AlgebricksException {
         for (int j = 0; j < indexes.size(); j++) {
             jobsToExecute.add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, 
indexes.get(j), metadataProvider));
 
@@ -2386,6 +2397,11 @@
     }
 
     private void runJob(IHyracksClientConnection hcc, JobSpecification 
jobSpec) throws Exception {
+        runJob(hcc, jobSpec, jobFlags);
+    }
+
+    private static void runJob(IHyracksClientConnection hcc, JobSpecification 
jobSpec, EnumSet<JobFlag> jobFlags)
+            throws Exception {
         JobUtils.runJob(hcc, jobSpec, jobFlags, true);
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1859
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I0ccc216bb8bdb2e44ec7d8583faaeec452978f9e
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>

Reply via email to