abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2618
Change subject: [NO ISSUE][ING] Merge STOPPED and PERMANENT_FAILURE Active States ...................................................................... [NO ISSUE][ING] Merge STOPPED and PERMANENT_FAILURE Active States Change-Id: I7f3b14aec46728fbe8b256b915d0e30992b2fe47 --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java 8 files changed, 40 insertions(+), 66 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/18/2618/1 diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java index eb43d10..9765620 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java @@ -24,11 +24,6 @@ */ STOPPED, /** - * Failure to recover from a temporary faliure caused the activity to fail permanantly. - * No further recovery attempts will be made. - */ - PERMANENTLY_FAILED, - /** * An unexpected failure caused the activity to fail but recovery attempts will start taking place */ TEMPORARILY_FAILED, diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index 2a214e3..18f80dc 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -129,7 +129,10 @@ LOGGER.log(level, "State of " + getEntityId() + "is being set to " + newState + " from " + state); this.prevState = state; this.state = newState; - if (newState == ActivityState.SUSPENDED) { + if (newState == ActivityState.STARTING || newState == ActivityState.RECOVERING + || newState == ActivityState.RESUMING) { + jobFailure = null; + } else if (newState == ActivityState.SUSPENDED) { suspended = true; } notifySubscribers(STATE_CHANGED); @@ -342,22 +345,12 @@ } } - /** - * this method is called before an action call is returned. It ensures that the request didn't fail - * - */ - protected synchronized void checkNoFailure() throws HyracksDataException { - if (state == ActivityState.PERMANENTLY_FAILED) { - throw HyracksDataException.create(jobFailure); - } - } - @Override public synchronized void recover() { LOGGER.log(level, "Recover is called on " + entityId); if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) { LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure"); - setState(ActivityState.PERMANENTLY_FAILED); + setState(ActivityState.STOPPED); } else { ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor(); setState(ActivityState.TEMPORARILY_FAILED); @@ -371,7 +364,7 @@ public synchronized void start(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException { waitForNonTransitionState(); - if (state != ActivityState.PERMANENTLY_FAILED && state != ActivityState.STOPPED) { + if (state != ActivityState.STOPPED) { throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_ALREADY_STARTED, entityId, state); } try { @@ -379,7 +372,7 @@ doStart(metadataProvider); setRunning(metadataProvider, true); } catch (Exception e) { - setState(ActivityState.PERMANENTLY_FAILED); + setState(ActivityState.STOPPED); LOGGER.log(Level.ERROR, "Failed to start the entity " + entityId, e); throw HyracksDataException.create(e); } @@ -398,11 +391,10 @@ @Override public synchronized void stop(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException { waitForNonTransitionState(); - if (state != ActivityState.RUNNING && state != ActivityState.PERMANENTLY_FAILED - && state != ActivityState.TEMPORARILY_FAILED) { + if (state != ActivityState.RUNNING && state != ActivityState.TEMPORARILY_FAILED) { throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state); } - if (state == ActivityState.TEMPORARILY_FAILED || state == ActivityState.PERMANENTLY_FAILED) { + if (state == ActivityState.TEMPORARILY_FAILED) { if (rt != null) { setState(ActivityState.STOPPING); rt.cancel(); @@ -421,7 +413,7 @@ doStop(metadataProvider); setRunning(metadataProvider, false); } catch (Exception e) { - setState(ActivityState.PERMANENTLY_FAILED); + setState(ActivityState.STOPPED); LOGGER.log(Level.ERROR, "Failed to stop the entity " + entityId, e); throw HyracksDataException.create(e); } @@ -445,7 +437,7 @@ LOGGER.log(level, "Waiting for ongoing activities"); waitForNonTransitionState(); LOGGER.log(level, "Proceeding with suspension. Current state is " + state); - if (state == ActivityState.STOPPED || state == ActivityState.PERMANENTLY_FAILED) { + if (state == ActivityState.STOPPED) { suspended = true; return; } @@ -479,7 +471,7 @@ // restore state setState(prevState); } else { - setState(ActivityState.PERMANENTLY_FAILED); + setState(ActivityState.STOPPED); } } throw HyracksDataException.create(e); @@ -489,7 +481,7 @@ @Override public synchronized void resume(MetadataProvider metadataProvider) throws HyracksDataException { - if (state == ActivityState.STOPPED || state == ActivityState.PERMANENTLY_FAILED) { + if (state == ActivityState.STOPPED) { suspended = false; notifyAll(); return; @@ -517,7 +509,7 @@ @Override public boolean isActive() { - return state != ActivityState.STOPPED && state != ActivityState.PERMANENTLY_FAILED; + return state != ActivityState.STOPPED; } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java index fbf644f..4785c2f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java @@ -25,17 +25,11 @@ import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveEntityEventSubscriber; import org.apache.asterix.active.IRetryPolicyFactory; -import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; -import org.apache.asterix.app.translator.QueryTranslator; -import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.utils.JobUtils; -import org.apache.asterix.compiler.provider.AqlCompilationProvider; -import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; -import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.lang.common.statement.StartFeedStatement; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; @@ -45,7 +39,6 @@ import org.apache.asterix.utils.FeedOperations; import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobSpecification; @@ -85,8 +78,8 @@ Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo = FeedOperations.buildStartFeedJob(mdProvider, feed, feedConnections, statementExecutor, hcc); JobSpecification feedJob = jobInfo.getLeft(); - WaitForStateSubscriber eventSubscriber = new WaitForStateSubscriber(this, EnumSet.of(ActivityState.RUNNING, - ActivityState.TEMPORARILY_FAILED, ActivityState.PERMANENTLY_FAILED)); + WaitForStateSubscriber eventSubscriber = new WaitForStateSubscriber(this, + EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED, ActivityState.STOPPED)); feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs. // We will need to design general exception handling mechanism for feeds. @@ -98,8 +91,8 @@ throw eventSubscriber.getFailure(); } if (wait) { - IActiveEntityEventSubscriber stoppedSubscriber = new WaitForStateSubscriber(this, - EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED)); + IActiveEntityEventSubscriber stoppedSubscriber = + new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED)); stoppedSubscriber.sync(); } } catch (Exception e) { @@ -110,7 +103,7 @@ @Override protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException { IActiveEntityEventSubscriber eventSubscriber = - new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED)); + new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED)); try { // Construct ActiveMessage for (int i = 0; i < getLocations().getLocations().length; i++) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java index 2de8319..b574b26 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java @@ -92,7 +92,7 @@ if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) { synchronized (listener) { if (!cancelRecovery) { - listener.setState(ActivityState.PERMANENTLY_FAILED); + listener.setState(ActivityState.STOPPED); } } } else { @@ -170,7 +170,7 @@ return null; } if (listener.getState() == ActivityState.TEMPORARILY_FAILED) { - listener.setState(ActivityState.PERMANENTLY_FAILED); + listener.setState(ActivityState.STOPPED); } listener.notifyAll(); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java index 9612ead..15e8482 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java @@ -180,7 +180,7 @@ Action action = users[0].startActivity(listener); action.sync(); assertFailure(action, 0); - Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); } @Test @@ -190,7 +190,7 @@ Action action = users[0].startActivity(listener); action.sync(); assertFailure(action, 0); - Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); } @Test @@ -492,19 +492,19 @@ WaitForStateSubscriber tempFailSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); WaitForStateSubscriber permFailSubscriber = - new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.PERMANENTLY_FAILED)); + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED)); listener.onStart(Behavior.FAIL_COMPILE); clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Compilation Failure"))); tempFailSubscriber.sync(); permFailSubscriber.sync(); - Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); } @Test public void testStartAfterPermenantFailure() throws Exception { testRecoveryFailureAfterOneAttemptCompilationFailure(); - Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); listener.onStart(Behavior.SUCCEED); WaitForStateSubscriber subscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)); users[1].startActivity(listener); @@ -536,13 +536,13 @@ WaitForStateSubscriber tempFailSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); WaitForStateSubscriber permFailSubscriber = - new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.PERMANENTLY_FAILED)); + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED)); listener.onStart(Behavior.FAIL_RUNTIME); clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure"))); tempFailSubscriber.sync(); permFailSubscriber.sync(); - Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); } @SuppressWarnings("deprecation") @@ -555,12 +555,12 @@ WaitForStateSubscriber tempFailSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); WaitForStateSubscriber permFailSubscriber = - new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.PERMANENTLY_FAILED)); + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED)); clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure"))); tempFailSubscriber.sync(); permFailSubscriber.sync(); - Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); } @SuppressWarnings("deprecation") @@ -1067,13 +1067,13 @@ @Test public void testCreateNewShadowWhilePermanentFailure() throws Exception { testRecoveryFailureAfterOneAttemptCompilationFailure(); - Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); Dataset newDataset = new Dataset(dataverseName, "newDataset", null, null, null, null, null, null, null, null, 0, 0); Action createDatasetAction = users[0].addDataset(newDataset, listener); createDatasetAction.sync(); assertSuccess(createDatasetAction); - Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); Assert.assertEquals(3, listener.getDatasets().size()); Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); } @@ -1187,11 +1187,11 @@ @Test public void testDeleteShadowWhilePermanentFailure() throws Exception { testRecoveryFailureAfterOneAttemptCompilationFailure(); - Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); Action dropDatasetAction = users[0].dropDataset(secondDataset, listener); dropDatasetAction.sync(); assertSuccess(dropDatasetAction); - Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); Assert.assertEquals(1, listener.getDatasets().size()); Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); } @@ -1317,7 +1317,7 @@ @Test public void testCreateNewIndexWhilePermanentFailure() throws Exception { testRecoveryFailureAfterOneAttemptCompilationFailure(); - Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); Action add = users[1].addIndex(firstDataset, listener); add.sync(); assertSuccess(add); @@ -1442,7 +1442,7 @@ @Test public void testDeleteIndexWhilePermanentFailure() throws Exception { testRecoveryFailureAfterOneAttemptCompilationFailure(); - Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); Action drop = users[1].dropIndex(firstDataset, listener); drop.sync(); assertSuccess(drop); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java index c269803..ff8a5c1 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java @@ -35,7 +35,6 @@ import org.apache.asterix.metadata.entities.FeedConnection; import org.apache.asterix.translator.IStatementExecutor; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -66,7 +65,7 @@ @Override protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException { IActiveEntityEventSubscriber eventSubscriber = - new WaitForStateSubscriber(this, EnumSet.of(ActivityState.RUNNING, ActivityState.PERMANENTLY_FAILED)); + new WaitForStateSubscriber(this, EnumSet.of(ActivityState.RUNNING, ActivityState.STOPPED)); try { eventSubscriber.sync(); } catch (Exception e) { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java index 7e1bc37..f270e73 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java @@ -122,7 +122,7 @@ throw HyracksDataException.create(e); } WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this, - EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED, ActivityState.PERMANENTLY_FAILED)); + EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED, ActivityState.STOPPED)); if (onStart == Behavior.FAIL_RUNTIME || onStart == Behavior.STEP_FAIL_RUNTIME) { clusterController.jobFinish(jobId, JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("RuntimeFailure"))); @@ -151,7 +151,7 @@ try { Set<ActivityState> waitFor; if (intention == ActivityState.STOPPING) { - waitFor = EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED); + waitFor = EnumSet.of(ActivityState.STOPPED); } else if (intention == ActivityState.SUSPENDING) { waitFor = EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED); } else { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java index a1cdfb0..d8cac44 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java @@ -39,12 +39,7 @@ @Override public void notify(ActiveEvent event) throws HyracksDataException { if (targetStates.contains(listener.getState())) { - if (listener.getState() == ActivityState.PERMANENTLY_FAILED - || listener.getState() == ActivityState.TEMPORARILY_FAILED) { - complete(listener.getJobFailure()); - } else { - complete(null); - } + complete(listener.getJobFailure()); } else if (event != null && event.getEventKind() == ActiveEvent.Kind.FAILURE) { try { complete((Exception) event.getEventObject()); -- To view, visit https://asterix-gerrit.ics.uci.edu/2618 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I7f3b14aec46728fbe8b256b915d0e30992b2fe47 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>