Murtadha Hubail has submitted this change and it was merged. Change subject: [NO ISSUE][ING] Refactor Active Suspend/Resume Logic ......................................................................
[NO ISSUE][ING] Refactor Active Suspend/Resume Logic - user model changes: no - storage format changes: no - interface changes: yes Details: - Refactor the logic for checking DDLs on connected datasets. - Refactor the suspend listener API to allow suspending a listener for a DDL on a dataset. - Allow suspended active listeners to be unregistered. This is done to support removing suspended listeners on active entities that were dropped. Change-Id: I38254582e08d97951a949f7327c8c3d7cf2ab51d Reviewed-on: https://asterix-gerrit.ics.uci.edu/2999 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Till Westmann <ti...@apache.org> --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java 5 files changed, 74 insertions(+), 83 deletions(-) Approvals: Anon. E. 
Moose #1000171: Till Westmann: Looks good to me, approved Jenkins: Verified; ; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java index 2d36bb2..ca610aa 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java @@ -85,6 +85,11 @@ boolean isActive(); /** + * @return true, if this {@link IActiveEntityEventsListener} is suspended. Otherwise false. + */ + boolean isSuspended(); + + /** * unregister the listener upon deletion of entity * * @throws HyracksDataException diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index 62f2c02..783d823 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -681,6 +681,11 @@ } @Override + public synchronized boolean isSuspended() { + return suspended; + } + + @Override public String toString() { return "{\"class\":\"" + getClass().getSimpleName() + "\"" + "\"entityId\":\"" + entityId + "\"" + "\"state\":\"" + state + "\"" + "}"; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java index 6eba4ea..a572e28 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java @@ -36,7 +36,6 @@ import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.commons.lang3.tuple.Pair; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.IJobLifecycleListener; @@ -219,7 +218,7 @@ if (registeredListener == null) { throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED, listener.getEntityId()); } - if (registeredListener.isActive()) { + if (registeredListener.isActive() && !registeredListener.isSuspended()) { entityEventListeners.put(registeredListener.getEntityId(), registeredListener); throw new RuntimeDataException(ErrorCode.CANNOT_DERIGESTER_ACTIVE_ENTITY_LISTENER, listener.getEntityId()); } @@ -251,8 +250,7 @@ } } - public void suspend(MetadataProvider mdProvider) - throws AlgebricksException, HyracksDataException, InterruptedException { + public void suspend(MetadataProvider mdProvider) throws HyracksDataException { synchronized (this) { if (suspended) { throw new RuntimeDataException(ErrorCode.ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED); @@ -260,54 +258,67 @@ LOGGER.log(level, "Suspending active events handler"); suspended = true; } + Collection<IActiveEntityEventsListener> registeredListeners = entityEventListeners.values(); + for (IActiveEntityEventsListener listener : registeredListeners) { + suspendForDdlOrHalt(listener, mdProvider, null); + } + } + + public void resume(MetadataProvider mdProvider) { + LOGGER.log(level, "Resuming active events handler"); + for (IActiveEntityEventsListener listener : entityEventListeners.values()) { + resumeOrHalt(listener, mdProvider); + } + synchronized (this) { + suspended = false; + } + } + + public void 
suspendForDdlOrHalt(IActiveEntityEventsListener listener, MetadataProvider metadataProvider, + Dataset targetDataset) { try { - IMetadataLockManager lockManager = mdProvider.getApplicationContext().getMetadataLockManager(); - Collection<IActiveEntityEventsListener> registeredListeners = entityEventListeners.values(); - for (IActiveEntityEventsListener listener : registeredListeners) { - // write lock the listener - // exclusive lock all the datasets - String dataverseName = listener.getEntityId().getDataverse(); - String entityName = listener.getEntityId().getEntityName(); - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Suspending " + listener.getEntityId()); - } - LOGGER.log(level, "Acquiring locks"); - lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName + '.' + entityName); - List<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets(); - for (Dataset dataset : datasets) { - lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(), - DatasetUtil.getFullyQualifiedName(dataset)); - } - LOGGER.log(level, "locks acquired"); - ((ActiveEntityEventsListener) listener).suspend(mdProvider); - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, listener.getEntityId() + " suspended"); - } + // write lock the listener + // exclusive lock all the datasets (except the target dataset) + IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager(); + String dataverseName = listener.getEntityId().getDataverse(); + String entityName = listener.getEntityId().getEntityName(); + if (LOGGER.isEnabled(level)) { + LOGGER.log(level, "Suspending " + listener.getEntityId()); } - } catch (Throwable th) { + LOGGER.log(level, "Acquiring locks"); + lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), dataverseName + '.' 
+ entityName); + List<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets(); + for (Dataset dataset : datasets) { + if (targetDataset != null && targetDataset.equals(dataset)) { + // DDL operation already acquired the proper lock for the operation + continue; + } + lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(), + DatasetUtil.getFullyQualifiedName(dataset)); + } + LOGGER.log(level, "locks acquired"); + ((ActiveEntityEventsListener) listener).suspend(metadataProvider); + if (LOGGER.isEnabled(level)) { + LOGGER.log(level, listener.getEntityId() + " suspended"); + } + } catch (Throwable th) { // NOSONAR must halt in case of any failure LOGGER.error("Suspend active failed", th); ExitUtil.halt(ExitUtil.EC_ACTIVE_SUSPEND_FAILURE); } } - public void resume(MetadataProvider mdProvider) throws HyracksDataException { - LOGGER.log(level, "Resuming active events handler"); + public void resumeOrHalt(IActiveEntityEventsListener listener, MetadataProvider metadataProvider) { try { - for (IActiveEntityEventsListener listener : entityEventListeners.values()) { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Resuming " + listener.getEntityId()); - } - ((ActiveEntityEventsListener) listener).resume(mdProvider); - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, listener.getEntityId() + " resumed"); - } + if (LOGGER.isEnabled(level)) { + LOGGER.log(level, "Resuming " + listener.getEntityId()); } - } catch (Throwable th) { + ((ActiveEntityEventsListener) listener).resume(metadataProvider); + if (LOGGER.isEnabled(level)) { + LOGGER.log(level, listener.getEntityId() + " resumed"); + } + } catch (Throwable th) { // NOSONAR must halt in case of any failure LOGGER.error("Resume active failed", th); ExitUtil.halt(ExitUtil.EC_ACTIVE_RESUME_FAILURE); - } - synchronized (this) { - suspended = false; } } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 8e86b9c..cffa178 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -70,7 +70,6 @@ import org.apache.asterix.common.utils.JobUtils; import org.apache.asterix.common.utils.JobUtils.ProgressState; import org.apache.asterix.compiler.provider.ILangCompilationProvider; -import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; @@ -728,22 +727,14 @@ protected static void validateIfResourceIsActiveInFeed(ICcApplicationContext appCtx, Dataset dataset, SourceLocation sourceLoc) throws CompilationException { - StringBuilder builder = null; ActiveNotificationHandler activeEventHandler = (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); IActiveEntityEventsListener[] listeners = activeEventHandler.getEventListeners(); for (IActiveEntityEventsListener listener : listeners) { if (listener.isEntityUsingDataset(dataset) && listener.isActive()) { - if (builder == null) { - builder = new StringBuilder(); - } - builder.append(listener.getEntityId() + "\n"); + throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET, sourceLoc, + dataset.getFullyQualifiedName(), listener.getEntityId().toString()); } - } - if (builder != null) { - throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, - "Dataset " + dataset.getDataverseName() + "." 
+ dataset.getDatasetName() + " is currently being " - + "fed into by the following active entities.\n" + builder.toString()); } } @@ -935,7 +926,7 @@ } } - public static void doCreateIndex(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Dataset ds, + protected void doCreateIndex(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Dataset ds, Index index, EnumSet<JobFlag> jobFlags, SourceLocation sourceLoc) throws Exception { ProgressState progress = ProgressState.NO_PROGRESS; boolean bActiveTxn = true; @@ -949,7 +940,7 @@ try { index.setPendingOp(MetadataUtil.PENDING_ADD_OP); if (ds.getDatasetType() == DatasetType.INTERNAL) { - validateIfResourceIsActiveInFeed(metadataProvider.getApplicationContext(), ds, sourceLoc); + validateDatasetState(metadataProvider, ds, sourceLoc); } else { // External dataset // Check if the dataset is indexible @@ -1414,7 +1405,7 @@ } } - public static void doDropDataset(String dataverseName, String datasetName, MetadataProvider metadataProvider, + public void doDropDataset(String dataverseName, String datasetName, MetadataProvider metadataProvider, boolean ifExists, IHyracksClientConnection hcc, boolean dropCorrespondingNodeGroup, SourceLocation sourceLoc) throws Exception { MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS); @@ -1434,6 +1425,7 @@ dataverseName); } } + validateDatasetState(metadataProvider, ds, sourceLoc); ds.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, progress, hcc, dropCorrespondingNodeGroup, sourceLoc); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); @@ -1497,23 +1489,6 @@ throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName, dataverseName); } - ActiveNotificationHandler activeEventHandler = - (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); - IActiveEntityEventsListener[] listeners = activeEventHandler.getEventListeners(); - StringBuilder builder = null; - for 
(IActiveEntityEventsListener listener : listeners) { - if (listener.isEntityUsingDataset(ds)) { - if (builder == null) { - builder = new StringBuilder(); - } - builder.append(new FeedConnectionId(listener.getEntityId(), datasetName) + "\n"); - } - } - if (builder != null) { - throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Dataset" + datasetName - + " is currently being fed into by the following active entities: " + builder.toString()); - } - if (ds.getDatasetType() == DatasetType.INTERNAL) { Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName); if (index == null) { @@ -1525,6 +1500,7 @@ } } ensureNonPrimaryIndexDrop(index, sourceLoc); + validateDatasetState(metadataProvider, ds, sourceLoc); // #. prepare a job to drop the index in NC. jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds, sourceLoc)); @@ -2986,4 +2962,9 @@ } return m; } + + protected void validateDatasetState(MetadataProvider metadataProvider, Dataset dataset, SourceLocation sourceLoc) + throws Exception { + validateIfResourceIsActiveInFeed(metadataProvider.getApplicationContext(), dataset, sourceLoc); + } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index 8471d45..a25ed20 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -341,17 +341,6 @@ throws Exception { Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>(); if (getDatasetType() == DatasetType.INTERNAL) { - // prepare job spec(s) that would disconnect any active feeds involving the dataset. 
- IActiveNotificationHandler activeListener = (IActiveNotificationHandler) metadataProvider - .getApplicationContext().getActiveNotificationHandler(); - IActiveEntityEventsListener[] activeListeners = activeListener.getEventListeners(); - for (IActiveEntityEventsListener listener : activeListeners) { - if (listener.isEntityUsingDataset(this)) { - throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET, - RecordUtil.toFullyQualifiedName(dataverseName, datasetName), - listener.getEntityId().toString()); - } - } // #. prepare jobs to drop the datatset and the indexes in NC List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, datasetName); -- To view, visit https://asterix-gerrit.ics.uci.edu/2999 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I38254582e08d97951a949f7327c8c3d7cf2ab51d Gerrit-PatchSet: 3 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <mhub...@apache.org> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org> Gerrit-Reviewer: Till Westmann <ti...@apache.org>