Murtadha Hubail has submitted this change and it was merged.

Change subject: [NO ISSUE][ING] Refactor Active Suspend/Resume Logic
......................................................................


[NO ISSUE][ING] Refactor Active Suspend/Resume Logic

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Refactor the logic for checking DDLs on connected datasets.
- Refactor the suspend listener API to allow suspending a listener
  for a DDL operation on a dataset.
- Allow suspended active listeners to be unregistered. This supports
  removing the suspended listeners of active entities that have been
  dropped.

Change-Id: I38254582e08d97951a949f7327c8c3d7cf2ab51d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2999
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>
---
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
5 files changed, 74 insertions(+), 83 deletions(-)

Approvals:
  Anon. E. Moose #1000171: 
  Till Westmann: Looks good to me, approved
  Jenkins: Verified; ; Verified

Objections:
  Jenkins: Violations found



diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 2d36bb2..ca610aa 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -85,6 +85,11 @@
     boolean isActive();
 
     /**
+     * @return true, if this {@link IActiveEntityEventsListener} is suspended. 
Otherwise false.
+     */
+    boolean isSuspended();
+
+    /**
      * unregister the listener upon deletion of entity
      *
      * @throws HyracksDataException
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 62f2c02..783d823 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -681,6 +681,11 @@
     }
 
     @Override
+    public synchronized boolean isSuspended() {
+        return suspended;
+    }
+
+    @Override
     public String toString() {
         return "{\"class\":\"" + getClass().getSimpleName() + "\"" + 
"\"entityId\":\"" + entityId + "\""
                 + "\"state\":\"" + state + "\"" + "}";
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 6eba4ea..a572e28 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -36,7 +36,6 @@
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
@@ -219,7 +218,7 @@
         if (registeredListener == null) {
             throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED, 
listener.getEntityId());
         }
-        if (registeredListener.isActive()) {
+        if (registeredListener.isActive() && 
!registeredListener.isSuspended()) {
             entityEventListeners.put(registeredListener.getEntityId(), 
registeredListener);
             throw new 
RuntimeDataException(ErrorCode.CANNOT_DERIGESTER_ACTIVE_ENTITY_LISTENER, 
listener.getEntityId());
         }
@@ -251,8 +250,7 @@
         }
     }
 
-    public void suspend(MetadataProvider mdProvider)
-            throws AlgebricksException, HyracksDataException, 
InterruptedException {
+    public void suspend(MetadataProvider mdProvider) throws 
HyracksDataException {
         synchronized (this) {
             if (suspended) {
                 throw new 
RuntimeDataException(ErrorCode.ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED);
@@ -260,54 +258,67 @@
             LOGGER.log(level, "Suspending active events handler");
             suspended = true;
         }
+        Collection<IActiveEntityEventsListener> registeredListeners = 
entityEventListeners.values();
+        for (IActiveEntityEventsListener listener : registeredListeners) {
+            suspendForDdlOrHalt(listener, mdProvider, null);
+        }
+    }
+
+    public void resume(MetadataProvider mdProvider) {
+        LOGGER.log(level, "Resuming active events handler");
+        for (IActiveEntityEventsListener listener : 
entityEventListeners.values()) {
+            resumeOrHalt(listener, mdProvider);
+        }
+        synchronized (this) {
+            suspended = false;
+        }
+    }
+
+    public void suspendForDdlOrHalt(IActiveEntityEventsListener listener, 
MetadataProvider metadataProvider,
+            Dataset targetDataset) {
         try {
-            IMetadataLockManager lockManager = 
mdProvider.getApplicationContext().getMetadataLockManager();
-            Collection<IActiveEntityEventsListener> registeredListeners = 
entityEventListeners.values();
-            for (IActiveEntityEventsListener listener : registeredListeners) {
-                // write lock the listener
-                // exclusive lock all the datasets
-                String dataverseName = listener.getEntityId().getDataverse();
-                String entityName = listener.getEntityId().getEntityName();
-                if (LOGGER.isEnabled(level)) {
-                    LOGGER.log(level, "Suspending " + listener.getEntityId());
-                }
-                LOGGER.log(level, "Acquiring locks");
-                
lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName + 
'.' + entityName);
-                List<Dataset> datasets = ((ActiveEntityEventsListener) 
listener).getDatasets();
-                for (Dataset dataset : datasets) {
-                    
lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(),
-                            DatasetUtil.getFullyQualifiedName(dataset));
-                }
-                LOGGER.log(level, "locks acquired");
-                ((ActiveEntityEventsListener) listener).suspend(mdProvider);
-                if (LOGGER.isEnabled(level)) {
-                    LOGGER.log(level, listener.getEntityId() + " suspended");
-                }
+            // write lock the listener
+            // exclusive lock all the datasets (except the target dataset)
+            IMetadataLockManager lockManager = 
metadataProvider.getApplicationContext().getMetadataLockManager();
+            String dataverseName = listener.getEntityId().getDataverse();
+            String entityName = listener.getEntityId().getEntityName();
+            if (LOGGER.isEnabled(level)) {
+                LOGGER.log(level, "Suspending " + listener.getEntityId());
             }
-        } catch (Throwable th) {
+            LOGGER.log(level, "Acquiring locks");
+            
lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), 
dataverseName + '.' + entityName);
+            List<Dataset> datasets = ((ActiveEntityEventsListener) 
listener).getDatasets();
+            for (Dataset dataset : datasets) {
+                if (targetDataset != null && targetDataset.equals(dataset)) {
+                    // DDL operation already acquired the proper lock for the 
operation
+                    continue;
+                }
+                
lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
+                        DatasetUtil.getFullyQualifiedName(dataset));
+            }
+            LOGGER.log(level, "locks acquired");
+            ((ActiveEntityEventsListener) listener).suspend(metadataProvider);
+            if (LOGGER.isEnabled(level)) {
+                LOGGER.log(level, listener.getEntityId() + " suspended");
+            }
+        } catch (Throwable th) { // NOSONAR must halt in case of any failure
             LOGGER.error("Suspend active failed", th);
             ExitUtil.halt(ExitUtil.EC_ACTIVE_SUSPEND_FAILURE);
         }
     }
 
-    public void resume(MetadataProvider mdProvider) throws 
HyracksDataException {
-        LOGGER.log(level, "Resuming active events handler");
+    public void resumeOrHalt(IActiveEntityEventsListener listener, 
MetadataProvider metadataProvider) {
         try {
-            for (IActiveEntityEventsListener listener : 
entityEventListeners.values()) {
-                if (LOGGER.isEnabled(level)) {
-                    LOGGER.log(level, "Resuming " + listener.getEntityId());
-                }
-                ((ActiveEntityEventsListener) listener).resume(mdProvider);
-                if (LOGGER.isEnabled(level)) {
-                    LOGGER.log(level, listener.getEntityId() + " resumed");
-                }
+            if (LOGGER.isEnabled(level)) {
+                LOGGER.log(level, "Resuming " + listener.getEntityId());
             }
-        } catch (Throwable th) {
+            ((ActiveEntityEventsListener) listener).resume(metadataProvider);
+            if (LOGGER.isEnabled(level)) {
+                LOGGER.log(level, listener.getEntityId() + " resumed");
+            }
+        } catch (Throwable th) { // NOSONAR must halt in case of any failure
             LOGGER.error("Resume active failed", th);
             ExitUtil.halt(ExitUtil.EC_ACTIVE_RESUME_FAILURE);
-        }
-        synchronized (this) {
-            suspended = false;
         }
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 8e86b9c..cffa178 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -70,7 +70,6 @@
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
@@ -728,22 +727,14 @@
 
     protected static void 
validateIfResourceIsActiveInFeed(ICcApplicationContext appCtx, Dataset dataset,
             SourceLocation sourceLoc) throws CompilationException {
-        StringBuilder builder = null;
         ActiveNotificationHandler activeEventHandler =
                 (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
         IActiveEntityEventsListener[] listeners = 
activeEventHandler.getEventListeners();
         for (IActiveEntityEventsListener listener : listeners) {
             if (listener.isEntityUsingDataset(dataset) && listener.isActive()) 
{
-                if (builder == null) {
-                    builder = new StringBuilder();
-                }
-                builder.append(listener.getEntityId() + "\n");
+                throw new 
CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET, sourceLoc,
+                        dataset.getFullyQualifiedName(), 
listener.getEntityId().toString());
             }
-        }
-        if (builder != null) {
-            throw new CompilationException(ErrorCode.COMPILATION_ERROR, 
sourceLoc,
-                    "Dataset " + dataset.getDataverseName() + "." + 
dataset.getDatasetName() + " is currently being "
-                            + "fed into by the following active entities.\n" + 
builder.toString());
         }
     }
 
@@ -935,7 +926,7 @@
         }
     }
 
-    public static void doCreateIndex(IHyracksClientConnection hcc, 
MetadataProvider metadataProvider, Dataset ds,
+    protected void doCreateIndex(IHyracksClientConnection hcc, 
MetadataProvider metadataProvider, Dataset ds,
             Index index, EnumSet<JobFlag> jobFlags, SourceLocation sourceLoc) 
throws Exception {
         ProgressState progress = ProgressState.NO_PROGRESS;
         boolean bActiveTxn = true;
@@ -949,7 +940,7 @@
         try {
             index.setPendingOp(MetadataUtil.PENDING_ADD_OP);
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                
validateIfResourceIsActiveInFeed(metadataProvider.getApplicationContext(), ds, 
sourceLoc);
+                validateDatasetState(metadataProvider, ds, sourceLoc);
             } else {
                 // External dataset
                 // Check if the dataset is indexible
@@ -1414,7 +1405,7 @@
         }
     }
 
-    public static void doDropDataset(String dataverseName, String datasetName, 
MetadataProvider metadataProvider,
+    public void doDropDataset(String dataverseName, String datasetName, 
MetadataProvider metadataProvider,
             boolean ifExists, IHyracksClientConnection hcc, boolean 
dropCorrespondingNodeGroup,
             SourceLocation sourceLoc) throws Exception {
         MutableObject<ProgressState> progress = new 
MutableObject<>(ProgressState.NO_PROGRESS);
@@ -1434,6 +1425,7 @@
                             dataverseName);
                 }
             }
+            validateDatasetState(metadataProvider, ds, sourceLoc);
             ds.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, 
progress, hcc, dropCorrespondingNodeGroup,
                     sourceLoc);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
@@ -1497,23 +1489,6 @@
                 throw new 
CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, 
datasetName,
                         dataverseName);
             }
-            ActiveNotificationHandler activeEventHandler =
-                    (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
-            IActiveEntityEventsListener[] listeners = 
activeEventHandler.getEventListeners();
-            StringBuilder builder = null;
-            for (IActiveEntityEventsListener listener : listeners) {
-                if (listener.isEntityUsingDataset(ds)) {
-                    if (builder == null) {
-                        builder = new StringBuilder();
-                    }
-                    builder.append(new 
FeedConnectionId(listener.getEntityId(), datasetName) + "\n");
-                }
-            }
-            if (builder != null) {
-                throw new CompilationException(ErrorCode.COMPILATION_ERROR, 
sourceLoc, "Dataset" + datasetName
-                        + " is currently being fed into by the following 
active entities: " + builder.toString());
-            }
-
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataverseName, datasetName, indexName);
                 if (index == null) {
@@ -1525,6 +1500,7 @@
                     }
                 }
                 ensureNonPrimaryIndexDrop(index, sourceLoc);
+                validateDatasetState(metadataProvider, ds, sourceLoc);
                 // #. prepare a job to drop the index in NC.
                 jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(index, 
metadataProvider, ds, sourceLoc));
 
@@ -2986,4 +2962,9 @@
         }
         return m;
     }
+
+    protected void validateDatasetState(MetadataProvider metadataProvider, 
Dataset dataset, SourceLocation sourceLoc)
+            throws Exception {
+        
validateIfResourceIsActiveInFeed(metadataProvider.getApplicationContext(), 
dataset, sourceLoc);
+    }
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 8471d45..a25ed20 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -341,17 +341,6 @@
             throws Exception {
         Map<FeedConnectionId, Pair<JobSpecification, Boolean>> 
disconnectJobList = new HashMap<>();
         if (getDatasetType() == DatasetType.INTERNAL) {
-            // prepare job spec(s) that would disconnect any active feeds 
involving the dataset.
-            IActiveNotificationHandler activeListener = 
(IActiveNotificationHandler) metadataProvider
-                    .getApplicationContext().getActiveNotificationHandler();
-            IActiveEntityEventsListener[] activeListeners = 
activeListener.getEventListeners();
-            for (IActiveEntityEventsListener listener : activeListeners) {
-                if (listener.isEntityUsingDataset(this)) {
-                    throw new 
CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET,
-                            RecordUtil.toFullyQualifiedName(dataverseName, 
datasetName),
-                            listener.getEntityId().toString());
-                }
-            }
             // #. prepare jobs to drop the datatset and the indexes in NC
             List<Index> indexes =
                     
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, 
datasetName);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2999
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I38254582e08d97951a949f7327c8c3d7cf2ab51d
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>

Reply via email to