Repository: incubator-reef Updated Branches: refs/heads/master f8d8cd427 -> 2c00d8910
[REEF-607] Race condition in TaskDone and EvaluatorDone observed in .NET This addresses the issue by: * Fixing EventHandlerIdlenessSource so that it correctly reports "not idle" while events are still in flight. * Using callback EventHandlers to prevent the driver from hanging when the final EvaluatorCompleted call is completed. * Adding an E2E test to capture the race condition. JIRA: [REEF-607](https://issues.apache.org/jira/browse/REEF-607) Pull Request: This closes #386 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2c00d891 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2c00d891 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2c00d891 Branch: refs/heads/master Commit: 2c00d89108319f6545a57e6e5e5d441bb7cc1830 Parents: f8d8cd4 Author: Andrew Chung <[email protected]> Authored: Wed Aug 19 16:18:27 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Wed Aug 19 18:02:22 2015 -0700 ---------------------------------------------------------------------- .../driver/evaluator/EvaluatorManager.java | 20 +++-- .../evaluator/EvaluatorMessageDispatcher.java | 21 ++++- .../evaluator/IdlenessCallbackEventHandler.java | 49 +++++++++++ .../IdlenessCallbackEventHandlerFactory.java | 49 +++++++++++ .../driver/idle/EventHandlerIdlenessSource.java | 2 +- .../evaluatorexit/EvaluatorCompleteTest.java | 64 ++++++++++++++ .../EvaluatorCompleteTestDriver.java | 91 ++++++++++++++++++++ .../EvaluatorCompleteTestTask.java | 38 ++++++++ 8 files changed, 321 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c00d891/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java index 12060dd..35dcd35 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java @@ -222,17 +222,19 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { @Override public void close() { synchronized (this.evaluatorDescriptor) { - if (this.stateManager.isRunning()) { + if (this.stateManager.isAllocatedOrSubmittedOrRunning()) { LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", getId()); try { - // Killing the evaluator means that it doesn't need to send a confirmation; it just dies. - final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto = - EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder() - .setTimestamp(System.currentTimeMillis()) - .setIdentifier(getId()) - .setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build()) - .build(); - sendEvaluatorControlMessage(evaluatorControlProto); + if (this.stateManager.isRunning()){ + // Killing the evaluator means that it doesn't need to send a confirmation; it just dies. 
+ final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto = + EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder() + .setTimestamp(System.currentTimeMillis()) + .setIdentifier(getId()) + .setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build()) + .build(); + sendEvaluatorControlMessage(evaluatorControlProto); + } } finally { this.stateManager.setKilled(); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c00d891/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java index 2fda8c6..bc80b9e 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java @@ -34,6 +34,7 @@ import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.wake.EventHandler; import javax.inject.Inject; +import java.util.HashSet; import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; @@ -128,7 +129,9 @@ public final class EvaluatorMessageDispatcher { @Parameter(EvaluatorDispatcherThreads.class) final int numberOfThreads, @Parameter(EvaluatorManager.EvaluatorIdentifier.class) final String evaluatorIdentifier, - final DriverExceptionHandler driverExceptionHandler) { + final DriverExceptionHandler driverExceptionHandler, + final IdlenessCallbackEventHandlerFactory idlenessCallbackEventHandlerFactory + ) { this.serviceDispatcher = new DispatchingEStage(driverExceptionHandler, numberOfThreads, evaluatorIdentifier); 
this.applicationDispatcher = new DispatchingEStage(this.serviceDispatcher); @@ -162,8 +165,6 @@ public final class EvaluatorMessageDispatcher { this.serviceDispatcher.register(FailedTask.class, serviceTaskExceptionEventHandlers); // Application Evaluator event handlers - this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedHandlers); - this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedHandlers); this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers); // Service Evaluator event handlers @@ -181,6 +182,20 @@ public final class EvaluatorMessageDispatcher { this.driverRestartServiceDispatcher.register(ActiveContext.class, serviceDriverRestartActiveContextHandlers); this.driverRestartServiceDispatcher.register(DriverRestartCompleted.class, serviceDriverRestartCompletedHandlers); + final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedCallbackHandlers = new HashSet<>(); + for (final EventHandler<CompletedEvaluator> evaluatorCompletedHandler : evaluatorCompletedHandlers) { + evaluatorCompletedCallbackHandlers.add( + idlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler(evaluatorCompletedHandler)); + } + this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedCallbackHandlers); + + final Set<EventHandler<FailedEvaluator>> evaluatorFailedCallbackHandlers = new HashSet<>(); + for (final EventHandler<FailedEvaluator> evaluatorFailedHandler : evaluatorFailedHandlers) { + evaluatorFailedCallbackHandlers.add( + idlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler(evaluatorFailedHandler)); + } + this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedCallbackHandlers); + LOG.log(Level.FINE, "Instantiated 'EvaluatorMessageDispatcher'"); } 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c00d891/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/IdlenessCallbackEventHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/IdlenessCallbackEventHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/IdlenessCallbackEventHandler.java new file mode 100644 index 0000000..7286b32 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/IdlenessCallbackEventHandler.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.evaluator; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource; +import org.apache.reef.wake.EventHandler; + +/** + * Checks for idleness after an {@link EventHandler#onNext(Object)} invocation. 
+ */ +@Private +@DriverSide +final class IdlenessCallbackEventHandler<T> implements EventHandler<T> { + private final EventHandler<T> innerHandler; + private final EventHandlerIdlenessSource idlenessSource; + + IdlenessCallbackEventHandler(final EventHandler<T> innerHandler, + final EventHandlerIdlenessSource idlenessSource) { + this.innerHandler = innerHandler; + this.idlenessSource = idlenessSource; + } + + /** + * Checks idleness as a callback to the innerHandler. + */ + @Override + public void onNext(final T value) { + innerHandler.onNext(value); + idlenessSource.check(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c00d891/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/IdlenessCallbackEventHandlerFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/IdlenessCallbackEventHandlerFactory.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/IdlenessCallbackEventHandlerFactory.java new file mode 100644 index 0000000..21a0d65 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/IdlenessCallbackEventHandlerFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.evaluator; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource; +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; + +/** + * Creates {@link IdlenessCallbackEventHandler}, which runs an idleness check after the innerHandler's + * {@link EventHandler#onNext(Object)} call is handled. + */ +@Private +@DriverSide +final class IdlenessCallbackEventHandlerFactory { + private final EventHandlerIdlenessSource idlenessSource; + + @Inject + private IdlenessCallbackEventHandlerFactory(final EventHandlerIdlenessSource idlenessSource) { + this.idlenessSource = idlenessSource; + } + + /** + * @return a new instance of {@link IdlenessCallbackEventHandler} wrapped with the specified innerHandler + */ + <T> IdlenessCallbackEventHandler<T> createIdlenessCallbackWrapperHandler( + final EventHandler<T> innerHandler) { + return new IdlenessCallbackEventHandler<>(innerHandler, idlenessSource); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c00d891/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/EventHandlerIdlenessSource.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/EventHandlerIdlenessSource.java 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/EventHandlerIdlenessSource.java index 9e8a389..b137ed3 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/EventHandlerIdlenessSource.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/EventHandlerIdlenessSource.java @@ -31,7 +31,7 @@ public final class EventHandlerIdlenessSource implements DriverIdlenessSource { private static final IdleMessage IDLE_MESSAGE = new IdleMessage("EventHandlers", "All events have been processed.", true); private static final IdleMessage NOT_IDLE_MESSAGE = - new IdleMessage("EventHandlers", "Some events are still in flight.", true); + new IdleMessage("EventHandlers", "Some events are still in flight.", false); private final InjectionFuture<Evaluators> evaluators; private final InjectionFuture<DriverIdleManager> driverIdleManager; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c00d891/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorexit/EvaluatorCompleteTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorexit/EvaluatorCompleteTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorexit/EvaluatorCompleteTest.java new file mode 100644 index 0000000..91d6ef6 --- /dev/null +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorexit/EvaluatorCompleteTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.tests.evaluatorexit; + +import org.apache.reef.client.DriverConfiguration; +import org.apache.reef.client.LauncherStatus; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tests.TestEnvironment; +import org.apache.reef.tests.TestEnvironmentFactory; +import org.apache.reef.tests.library.driver.OnDriverStartedAllocateOne; +import org.apache.reef.util.EnvironmentUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests whether we receive both evaluator complete and task complete. 
+ */ +public final class EvaluatorCompleteTest { + private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment(); + + @Before + public void setUp() throws Exception { + testEnvironment.setUp(); + } + + @After + public void tearDown() throws Exception { + this.testEnvironment.tearDown(); + } + + @Test + public void testEvaluatorCompleted() { + final Configuration driverConfiguration = DriverConfiguration.CONF + .set(DriverConfiguration.DRIVER_IDENTIFIER, "TEST_EvaluatorComplete") + .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(EvaluatorCompleteTestDriver.class)) + .set(DriverConfiguration.ON_DRIVER_STARTED, OnDriverStartedAllocateOne.class) + .set(DriverConfiguration.ON_TASK_COMPLETED, EvaluatorCompleteTestDriver.TaskCompletedHandler.class) + .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, EvaluatorCompleteTestDriver.EvaluatorAllocatedHandler.class) + .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, EvaluatorCompleteTestDriver.EvaluatorCompletedHandler.class) + .set(DriverConfiguration.ON_DRIVER_STOP, EvaluatorCompleteTestDriver.StopHandler.class) + .build(); + final LauncherStatus status = this.testEnvironment.run(driverConfiguration); + Assert.assertTrue("Job state after execution: " + status, status.isSuccess()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c00d891/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorexit/EvaluatorCompleteTestDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorexit/EvaluatorCompleteTestDriver.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorexit/EvaluatorCompleteTestDriver.java new file mode 100644 index 0000000..4c87049 --- /dev/null +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorexit/EvaluatorCompleteTestDriver.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.tests.evaluatorexit; + +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.CompletedEvaluator; +import org.apache.reef.driver.task.CompletedTask; +import org.apache.reef.driver.task.TaskConfiguration; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.annotations.Unit; +import org.apache.reef.tests.library.exceptions.DriverSideFailure; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.event.StopTime; + +import javax.inject.Inject; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +@Unit +final class EvaluatorCompleteTestDriver { + private static final Logger LOG = Logger.getLogger(EvaluatorCompleteTestDriver.class.getName()); + private final AtomicBoolean completedEvaluatorReceived = new AtomicBoolean(false); + private final AtomicBoolean completedTaskReceived = new AtomicBoolean(false); + + @Inject + private EvaluatorCompleteTestDriver() { + } + + final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { + + @Override + public void onNext(final AllocatedEvaluator allocatedEvaluator) 
{ + final Configuration taskConfiguration = TaskConfiguration.CONF + .set(TaskConfiguration.IDENTIFIER, "EvaluatorCompleteTestTask") + .set(TaskConfiguration.TASK, EvaluatorCompleteTestTask.class) + .build(); + allocatedEvaluator.submitTask(taskConfiguration); + } + } + + final class EvaluatorCompletedHandler implements EventHandler<CompletedEvaluator> { + + @Override + public void onNext(final CompletedEvaluator completedEvaluator) { + LOG.log(Level.FINE, "Received a CompletedEvaluator for Evaluator {0}", completedEvaluator.getId()); + completedEvaluatorReceived.set(true); + } + } + + final class TaskCompletedHandler implements EventHandler<CompletedTask> { + + @Override + public void onNext(final CompletedTask completedTask) { + LOG.log(Level.FINE, "Received a CompletedTask for Evaluator {0}", completedTask.getId()); + completedTaskReceived.set(true); + completedTask.getActiveContext().close(); + } + } + + final class StopHandler implements EventHandler<StopTime> { + + @Override + public void onNext(final StopTime stopTime) { + synchronized (completedEvaluatorReceived) { + if (completedEvaluatorReceived.get() && completedTaskReceived.get()) { + LOG.log(Level.FINE, "Received an expected CompletedEvaluator and CompletedTask before exit. 
All good."); + } else { + throw new DriverSideFailure("Did not receive expected completion events."); + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c00d891/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorexit/EvaluatorCompleteTestTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorexit/EvaluatorCompleteTestTask.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorexit/EvaluatorCompleteTestTask.java new file mode 100644 index 0000000..c5a0c3d --- /dev/null +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorexit/EvaluatorCompleteTestTask.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.tests.evaluatorexit; + +import org.apache.reef.task.Task; + +import javax.inject.Inject; + +/** + * Merely returns null. + */ +final class EvaluatorCompleteTestTask implements Task { + + @Inject + private EvaluatorCompleteTestTask() { + } + + @Override + public byte[] call(final byte[] memento) throws Exception { + return null; + } +}
