http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockUtils.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockUtils.java deleted file mode 100644 index 0b073c8..0000000 --- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockUtils.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.mock.runtime; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.annotations.Name; -import org.apache.reef.tang.exceptions.InjectionException; - -/** - * mock utilities. - */ -@Unstable -@Private -final class MockUtils { - - private MockUtils() { - } - - public static <U, T extends Name<U>> U getValue(final Configuration configuration, final Class<T> name) { - try { - final Injector injector = Tang.Factory.getTang().newInjector(configuration); - return injector.getNamedInstance(name); - } catch (InjectionException e) { - throw new IllegalStateException(e); - } - } -}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/package-info.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/package-info.java deleted file mode 100644 index b5cf639..0000000 --- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -/** - * mock runtime implementation. - */ -package org.apache.reef.mock.runtime; http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/BasicMockTests.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/BasicMockTests.java b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/BasicMockTests.java deleted file mode 100644 index 984a9f4..0000000 --- a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/BasicMockTests.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.mock; - -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.evaluator.AllocatedEvaluator; -import org.apache.reef.driver.task.RunningTask; -import org.apache.reef.mock.request.ProcessRequestInternal; -import org.apache.reef.mock.runtime.MockAllocatedEvalautor; -import org.apache.reef.mock.runtime.MockClock; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.Tang; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * basic mock tests. - */ -final class BasicMockTests { - - private MockApplication mockApplication; - - private MockRuntime mockRuntime; - - private MockClock mockClock; - - @Before - public void initialize() throws Exception { - final Configuration conf = MockConfiguration.CONF - .set(MockConfiguration.ON_DRIVER_STARTED, MockApplication.StartHandler.class) - .set(MockConfiguration.ON_DRIVER_STOP, MockApplication.StopHandler.class) - .set(MockConfiguration.ON_CONTEXT_ACTIVE, MockApplication.ActiveContextHandler.class) - .set(MockConfiguration.ON_CONTEXT_CLOSED, MockApplication.ContextClosedHandler.class) - .set(MockConfiguration.ON_CONTEXT_FAILED, MockApplication.FailedContextHandler.class) - .set(MockConfiguration.ON_EVALUATOR_ALLOCATED, MockApplication.AllocatedEvaluatorHandler.class) - .set(MockConfiguration.ON_EVALUATOR_COMPLETED, MockApplication.CompletedEvaluatorHandler.class) - .set(MockConfiguration.ON_EVALUATOR_FAILED, MockApplication.FailedEvaluatorHandler.class) - .set(MockConfiguration.ON_TASK_COMPLETED, MockApplication.CompletedTaskHandler.class) - .set(MockConfiguration.ON_TASK_FAILED, MockApplication.FailedTaskHandler.class) - .set(MockConfiguration.ON_TASK_RUNNING, MockApplication.RunningTaskHandler.class) - .set(MockConfiguration.ON_TASK_SUSPENDED, MockApplication.SuspendedTaskHandler.class) - .build(); - - final Injector injector = Tang.Factory.getTang().newInjector(conf); - this.mockApplication = injector.getInstance(MockApplication.class); - this.mockRuntime = injector.getInstance(MockRuntime.class); - this.mockClock = injector.getInstance(MockClock.class); - - this.mockClock.run(); - } - - @Test - public void testSuccessRequests() throws Exception { - assertTrue("mock application received start event", this.mockApplication.isRunning()); - - this.mockApplication.requestEvaluators(1); - assertTrue("check for process event", this.mockRuntime.hasProcessRequest()); - final ProcessRequest allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest(); - assertEquals("allocate evalautor request", ProcessRequest.Type.ALLOCATE_EVALUATOR, - allocateEvaluatorRequest.getType()); - final AllocatedEvaluator evaluator = - ((ProcessRequestInternal<AllocatedEvaluator, Object>)allocateEvaluatorRequest) - .getSuccessEvent(); - this.mockRuntime.succeed(allocateEvaluatorRequest); - assertTrue("evaluator allocation succeeded", - this.mockApplication.getAllocatedEvaluators().contains(evaluator)); - final ActiveContext rootContext = this.mockApplication.getContext(evaluator, - MockAllocatedEvalautor.ROOT_CONTEXT_IDENTIFIER_PREFIX + evaluator.getId()); - assertTrue("root context", rootContext != null); - - - // submit a task - this.mockApplication.submitTask(rootContext, "test-task"); - assertTrue("create task queued", this.mockRuntime.hasProcessRequest()); - final ProcessRequest createTaskRequest = this.mockRuntime.getNextProcessRequest(); - assertEquals("create task request", ProcessRequest.Type.CREATE_TASK, - createTaskRequest.getType()); - final RunningTask task = (RunningTask) ((ProcessRequestInternal)createTaskRequest).getSuccessEvent(); - this.mockRuntime.succeed(createTaskRequest); - assertTrue("task running", this.mockApplication.getRunningTasks().contains(task)); - - // check task auto complete - assertTrue("check for request", this.mockRuntime.hasProcessRequest()); - final ProcessRequestInternal completedTask = - (ProcessRequestInternal) this.mockRuntime.getNextProcessRequest(); - assertEquals("complete task request", ProcessRequest.Type.COMPLETE_TASK, - completedTask.getType()); - this.mockRuntime.succeed(completedTask); - assertEquals("no running tasks", 0, this.mockApplication.getRunningTasks().size()); - - // create a sub-context - this.mockApplication.submitContext(rootContext, "child"); - assertTrue("check for request", this.mockRuntime.hasProcessRequest()); - final ProcessRequestInternal createContextRequest = - (ProcessRequestInternal) this.mockRuntime.getNextProcessRequest(); - assertEquals("create context request", ProcessRequest.Type.CREATE_CONTEXT, - createContextRequest.getType()); - this.mockRuntime.succeed(createContextRequest); - final ActiveContext context = this.mockApplication.getContext(evaluator, "child"); - assertTrue("child context", context.getParentId().get().equals(rootContext.getId())); - } - - @Test - public void testFailureRequests() throws Exception { - assertTrue("mock application received start event", this.mockApplication.isRunning()); - - this.mockApplication.requestEvaluators(1); - assertTrue("check for process event", this.mockRuntime.hasProcessRequest()); - ProcessRequest allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest(); - this.mockRuntime.fail(allocateEvaluatorRequest); - assertEquals("evaluator allocation failed", 1, - this.mockApplication.getFailedEvaluators().size()); - - this.mockApplication.requestEvaluators(1); - allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest(); - final AllocatedEvaluator evaluator = - (AllocatedEvaluator)((ProcessRequestInternal)allocateEvaluatorRequest).getSuccessEvent(); - this.mockRuntime.succeed(allocateEvaluatorRequest); - final ActiveContext rootContext = this.mockApplication - .getContext(evaluator, MockAllocatedEvalautor.ROOT_CONTEXT_IDENTIFIER_PREFIX + evaluator.getId()); - - - // submit a task - this.mockApplication.submitTask(rootContext, "test-task"); - assertTrue("create task queued", this.mockRuntime.hasProcessRequest()); - final ProcessRequest createTaskRequest = this.mockRuntime.getNextProcessRequest(); - assertEquals("create task request", ProcessRequest.Type.CREATE_TASK, - createTaskRequest.getType()); - this.mockRuntime.fail(createTaskRequest); - assertEquals("task running", 1, this.mockApplication.getFailedTasks().size()); - - // create a sub-context - this.mockApplication.submitContext(rootContext, "child"); - assertTrue("check for request", this.mockRuntime.hasProcessRequest()); - final ProcessRequestInternal createContextRequest = - (ProcessRequestInternal) this.mockRuntime.getNextProcessRequest(); - this.mockRuntime.fail(createContextRequest); - assertEquals("child context", 1, this.mockApplication.getFailedContext().size()); - } - - @Test - public void testMockFailures() { - // make sure we're running - assertTrue("mock application received start event", this.mockApplication.isRunning()); - - // allocate an evaluator and get root context - this.mockApplication.requestEvaluators(1); - this.mockRuntime.succeed(this.mockRuntime.getNextProcessRequest()); - final AllocatedEvaluator evaluator = this.mockRuntime.getCurrentAllocatedEvaluators().iterator().next(); - final ActiveContext rootContext = this.mockApplication.getContext(evaluator, - MockAllocatedEvalautor.ROOT_CONTEXT_IDENTIFIER_PREFIX + evaluator.getId()); - - // create a child context off of root context - this.mockApplication.submitContext(rootContext, "child"); - this.mockRuntime.succeed(this.mockRuntime.getNextProcessRequest()); - final ActiveContext childContext = this.mockApplication.getContext(evaluator, "child"); - - // submit a task from child context - this.mockApplication.submitTask(childContext, "test-task"); - final ProcessRequest createTaskRequest = this.mockRuntime.getNextProcessRequest(); - createTaskRequest.setAutoComplete(false); // keep it running - this.mockRuntime.succeed(createTaskRequest); - final RunningTask task = this.mockRuntime.getCurrentRunningTasks().iterator().next(); - - // fail task - this.mockRuntime.fail(task); - assertEquals("task failed", 1, this.mockApplication.getFailedTasks().size()); - - // fail child context - this.mockRuntime.fail(childContext); - assertTrue("child context failed", - this.mockApplication.getFailedContext().iterator().next().getId().equals(childContext.getId())); - // evaluator should still be up - assertEquals("check evaluator", 0, this.mockApplication.getFailedEvaluators().size()); - - // fail evaluator - this.mockRuntime.fail(evaluator); - assertEquals("evaluator failed", 1, this.mockApplication.getFailedEvaluators().size()); - - // both contexts should be failed - assertEquals("root and child contexts failed", 2, - this.mockApplication.getFailedContext().size()); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/MockApplication.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/MockApplication.java b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/MockApplication.java deleted file mode 100644 index 86a105e..0000000 --- a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/MockApplication.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.mock; - -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.context.ClosedContext; -import org.apache.reef.driver.context.ContextConfiguration; -import org.apache.reef.driver.context.FailedContext; -import org.apache.reef.driver.evaluator.AllocatedEvaluator; -import org.apache.reef.driver.evaluator.CompletedEvaluator; -import org.apache.reef.driver.evaluator.EvaluatorRequestor; -import org.apache.reef.driver.evaluator.FailedEvaluator; -import org.apache.reef.driver.task.*; -import org.apache.reef.tang.annotations.Unit; -import org.apache.reef.task.Task; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.time.Clock; -import org.apache.reef.wake.time.event.Alarm; -import org.apache.reef.wake.time.event.StartTime; -import org.apache.reef.wake.time.event.StopTime; - -import javax.inject.Inject; -import java.util.*; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * mock application. - */ -@Unit -final class MockApplication { - - private static final Logger LOG = Logger.getLogger(MockApplication.class.getName()); - - private final Clock clock; - - private final EvaluatorRequestor evaluatorRequestor; - - private final Map<String, Map<String, ActiveContext>> evaluatorId2ContextId2ContextMap = new HashMap<>(); - - private final Map<String, AllocatedEvaluator> evaluatorMap = new HashMap<>(); - - private final Map<String, FailedEvaluator> failedEvaluatorMap = new HashMap<>(); - - private final Map<String, RunningTask> evaluatorIdRunningTaskMap = new HashMap<>(); - - private final Set<FailedContext> failedContextSet = new HashSet<>(); - - private final Set<FailedTask> failedTaskSet = new HashSet<>(); - - private final Set<SuspendedTask> suspendedTaskSet = new HashSet<>(); - - private boolean running = false; - - @Inject - MockApplication(final Clock clock, final EvaluatorRequestor evaluatorRequestor) { - this.clock = clock; - this.evaluatorRequestor = evaluatorRequestor; - } - - ActiveContext getContext(final AllocatedEvaluator evaluator, final String identifier) { - return this.evaluatorId2ContextId2ContextMap.get(evaluator.getId()).get(identifier); - } - - Collection<RunningTask> getRunningTasks() { - return Collections.unmodifiableCollection(this.evaluatorIdRunningTaskMap.values()); - } - - Collection<AllocatedEvaluator> getAllocatedEvaluators() { - return Collections.unmodifiableCollection(this.evaluatorMap.values()); - } - - Collection<FailedEvaluator> getFailedEvaluators() { - return Collections.unmodifiableCollection(this.failedEvaluatorMap.values()); - } - - Collection<FailedTask> getFailedTasks() { - return Collections.unmodifiableCollection(this.failedTaskSet); - } - - Collection<FailedContext> getFailedContext() { - return Collections.unmodifiableCollection(this.failedContextSet); - } - - void requestEvaluators(final int numEvaluators) { - LOG.log(Level.INFO, "request {0} Evaluators", numEvaluators); - evaluatorRequestor.newRequest() - .setMemory(128) - .setNumberOfCores(1) - .setNumber(numEvaluators) - .submit(); - } - - void submitTask(final ActiveContext context, final String identifier) { - context.submitTask(TaskConfiguration.CONF - .set(TaskConfiguration.IDENTIFIER, identifier) - .set(TaskConfiguration.TASK, DummyTestTask.class) - .build()); - } - - void submitContext(final ActiveContext context, final String identifier) { - context.submitContext(ContextConfiguration.CONF - .set(ContextConfiguration.IDENTIFIER, identifier) - .build()); - } - - boolean isRunning() { - return this.running; - } - - boolean exists(final AllocatedEvaluator evaluator) { - return this.evaluatorMap.containsKey(evaluator.getId()); - } - - /** - * Job Driver is ready and the clock is set up: request the evaluatorMap. - */ - final class StartHandler implements EventHandler<StartTime> { - @Override - public void onNext(final StartTime startTime) { - clock.scheduleAlarm(Integer.MAX_VALUE, new EventHandler<Alarm>() { - @Override - public void onNext(final Alarm value) { - throw new RuntimeException("should not happen"); - } - }); - running = true; - } - } - - /** - * Job Driver is is shutting down: write to the log. - */ - final class StopHandler implements EventHandler<StopTime> { - @Override - public void onNext(final StopTime stopTime) { - running = false; - } - } - - /** - * Receive notification that an Evaluator had been allocated, - * and submitTask a new Task in that Evaluator. - */ - final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { - @Override - public void onNext(final AllocatedEvaluator eval) { - evaluatorMap.put(eval.getId(), eval); - } - } - - /** - * Receive notification that the Evaluator has been shut down. - */ - final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> { - @Override - public void onNext(final CompletedEvaluator eval) { - evaluatorMap.remove(eval.getId()); - evaluatorId2ContextId2ContextMap.remove(eval.getId()); - evaluatorIdRunningTaskMap.remove(eval.getId()); - } - } - - final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> { - - @Override - public void onNext(final FailedEvaluator eval) { - evaluatorMap.remove(eval.getId()); - evaluatorId2ContextId2ContextMap.remove(eval.getId()); - evaluatorIdRunningTaskMap.remove(eval.getId()); - failedEvaluatorMap.put(eval.getId(), eval); - failedContextSet.addAll(eval.getFailedContextList()); - } - } - - /** - * Receive notification that the Context is active. - */ - final class ActiveContextHandler implements EventHandler<ActiveContext> { - @Override - public void onNext(final ActiveContext context) { - if (!evaluatorId2ContextId2ContextMap.containsKey(context.getEvaluatorId())) { - evaluatorId2ContextId2ContextMap.put(context.getEvaluatorId(), new HashMap<String, ActiveContext>()); - } - if (evaluatorId2ContextId2ContextMap.get(context.getEvaluatorId()).containsKey(context.getId())) { - throw new IllegalStateException( - String.format("Context %s on evaluator %s already exists on evaluator with " + - "same identifier", context.getId(), context.getEvaluatorId())); - } - evaluatorId2ContextId2ContextMap.get(context.getEvaluatorId()).put(context.getId(), context); - } - } - - final class ContextClosedHandler implements EventHandler<ClosedContext> { - @Override - public void onNext(final ClosedContext value) { - assert evaluatorId2ContextId2ContextMap.containsKey(value.getEvaluatorId()); - assert evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).containsKey(value.getId()); - evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).remove(value.getId()); - } - } - - final class FailedContextHandler implements EventHandler<FailedContext> { - @Override - public void onNext(final FailedContext value) { - if (evaluatorId2ContextId2ContextMap.containsKey(value.getEvaluatorId()) && - evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).containsKey(value.getId())) { - evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).remove(value.getEvaluatorId()); - } else { - // must have failed before it succeeded - } - failedContextSet.add(value); - } - } - - /** - * Receive notification that the Task is running. - */ - final class RunningTaskHandler implements EventHandler<RunningTask> { - @Override - public void onNext(final RunningTask task) { - evaluatorIdRunningTaskMap.put(task.getActiveContext().getEvaluatorId(), task); - } - } - - /** - * Receive notification that the Task has completed successfully. - */ - final class CompletedTaskHandler implements EventHandler<CompletedTask> { - @Override - public void onNext(final CompletedTask task) { - evaluatorIdRunningTaskMap.remove(task.getActiveContext().getEvaluatorId()); - } - } - - final class FailedTaskHandler implements EventHandler<FailedTask> { - @Override - public void onNext(final FailedTask value) { - evaluatorIdRunningTaskMap.remove(value.getActiveContext().get().getEvaluatorId()); - failedTaskSet.add(value); - } - } - - final class SuspendedTaskHandler implements EventHandler<SuspendedTask> { - @Override - public void onNext(final SuspendedTask value) { - evaluatorIdRunningTaskMap.remove(value.getActiveContext().getEvaluatorId()); - suspendedTaskSet.add(value); - } - } - - private static final class DummyTestTask implements Task { - @Override - public byte[] call(final byte[] memento) throws Exception { - return new byte[0]; - } - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/BasicMockTests.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/BasicMockTests.java b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/BasicMockTests.java new file mode 100644 index 0000000..8b52295 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/BasicMockTests.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver; + +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.task.RunningTask; +import org.apache.reef.mock.driver.request.ProcessRequestInternal; +import org.apache.reef.mock.driver.runtime.MockAllocatedEvaluator; +import org.apache.reef.mock.driver.runtime.MockClock; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * basic mock tests. + */ +final class BasicMockTests { + + private MockApplication mockApplication; + + private MockRuntime mockRuntime; + + private MockClock mockClock; + + @Before + public void initialize() throws Exception { + final Configuration conf = MockConfiguration.CONF + .set(MockConfiguration.ON_DRIVER_STARTED, MockApplication.StartHandler.class) + .set(MockConfiguration.ON_DRIVER_STOP, MockApplication.StopHandler.class) + .set(MockConfiguration.ON_CONTEXT_ACTIVE, MockApplication.ActiveContextHandler.class) + .set(MockConfiguration.ON_CONTEXT_CLOSED, MockApplication.ContextClosedHandler.class) + .set(MockConfiguration.ON_CONTEXT_FAILED, MockApplication.FailedContextHandler.class) + .set(MockConfiguration.ON_EVALUATOR_ALLOCATED, MockApplication.AllocatedEvaluatorHandler.class) + .set(MockConfiguration.ON_EVALUATOR_COMPLETED, MockApplication.CompletedEvaluatorHandler.class) + .set(MockConfiguration.ON_EVALUATOR_FAILED, MockApplication.FailedEvaluatorHandler.class) + .set(MockConfiguration.ON_TASK_COMPLETED, MockApplication.CompletedTaskHandler.class) + .set(MockConfiguration.ON_TASK_FAILED, MockApplication.FailedTaskHandler.class) + .set(MockConfiguration.ON_TASK_RUNNING, MockApplication.RunningTaskHandler.class) + .set(MockConfiguration.ON_TASK_SUSPENDED, MockApplication.SuspendedTaskHandler.class) + .build(); + + final Injector injector = Tang.Factory.getTang().newInjector(conf); + this.mockApplication = injector.getInstance(MockApplication.class); + this.mockRuntime = injector.getInstance(MockRuntime.class); + this.mockClock = injector.getInstance(MockClock.class); + + this.mockClock.run(); + } + + @Test + public void testSuccessRequests() throws Exception { + assertTrue("mock application received start event", this.mockApplication.isRunning()); + + this.mockApplication.requestEvaluators(1); + assertTrue("check for process event", this.mockRuntime.hasProcessRequest()); + final ProcessRequest allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest(); + assertEquals("allocate evaluator request", ProcessRequest.Type.ALLOCATE_EVALUATOR, + allocateEvaluatorRequest.getType()); + final AllocatedEvaluator evaluator = + ((ProcessRequestInternal<AllocatedEvaluator, Object>)allocateEvaluatorRequest) + .getSuccessEvent(); + this.mockRuntime.succeed(allocateEvaluatorRequest); + assertTrue("evaluator allocation succeeded", + this.mockApplication.getAllocatedEvaluators().contains(evaluator)); + final String contextId = "foo"; + this.mockApplication.submitContext(evaluator, contextId); + final ActiveContext rootContext = ((MockAllocatedEvaluator) evaluator).getRootContext(); + assertTrue("root context", rootContext != null); + + + // submit a task + this.mockApplication.submitTask(rootContext, "test-task"); + assertTrue("create task queued", this.mockRuntime.hasProcessRequest()); + final ProcessRequest createTaskRequest = this.mockRuntime.getNextProcessRequest(); + assertEquals("create task request", ProcessRequest.Type.CREATE_TASK, + createTaskRequest.getType()); + final RunningTask task = (RunningTask) ((ProcessRequestInternal)createTaskRequest).getSuccessEvent(); + this.mockRuntime.succeed(createTaskRequest); + assertTrue("task running", this.mockApplication.getRunningTasks().contains(task)); + + // check task auto complete + assertTrue("check for request", this.mockRuntime.hasProcessRequest()); + final ProcessRequestInternal completedTask = + (ProcessRequestInternal) this.mockRuntime.getNextProcessRequest(); + assertEquals("complete task request", ProcessRequest.Type.COMPLETE_TASK, + completedTask.getType()); + this.mockRuntime.succeed(completedTask); + assertEquals("no running tasks", 0, this.mockApplication.getRunningTasks().size()); + } + + @Test + public void testFailureRequests() throws Exception { + assertTrue("mock application received start event", this.mockApplication.isRunning()); + + this.mockApplication.requestEvaluators(1); + assertTrue("check for process event", this.mockRuntime.hasProcessRequest()); + ProcessRequest allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest(); + this.mockRuntime.fail(allocateEvaluatorRequest); + assertEquals("evaluator allocation failed", 1, + this.mockApplication.getFailedEvaluators().size()); + + this.mockApplication.requestEvaluators(1); + allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest(); + final AllocatedEvaluator evaluator = + (AllocatedEvaluator)((ProcessRequestInternal)allocateEvaluatorRequest).getSuccessEvent(); + this.mockRuntime.succeed(allocateEvaluatorRequest); + this.mockApplication.submitContext(evaluator, "FOO"); + final ActiveContext rootContext = this.mockApplication + .getContext(evaluator, "FOO"); + + + // submit a task + this.mockApplication.submitTask(rootContext, "test-task"); + assertTrue("create task queued", this.mockRuntime.hasProcessRequest()); + final ProcessRequest createTaskRequest = this.mockRuntime.getNextProcessRequest(); + assertEquals("create task request", ProcessRequest.Type.CREATE_TASK, + createTaskRequest.getType()); + this.mockRuntime.fail(createTaskRequest); + assertEquals("task running", 1, this.mockApplication.getFailedTasks().size()); + } + + @Test + public void testMockFailures() { + // make sure we're running + assertTrue("mock application received start event", this.mockApplication.isRunning()); + + // allocate an evaluator and get root context + this.mockApplication.requestEvaluators(1); + this.mockRuntime.succeed(this.mockRuntime.getNextProcessRequest()); + final AllocatedEvaluator evaluator = this.mockRuntime.getCurrentAllocatedEvaluators().iterator().next(); + this.mockApplication.submitContext(evaluator, "FOO"); + // fail evaluator + this.mockRuntime.fail(evaluator); + assertEquals("evaluator failed", 1, this.mockApplication.getFailedEvaluators().size()); + + // both contexts should be failed + assertEquals("root and child contexts failed", 2, + this.mockApplication.getFailedContext().size()); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/MockApplication.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/MockApplication.java b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/MockApplication.java new file mode 100644 index 0000000..3ce66b2 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/MockApplication.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver; + +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ClosedContext; +import org.apache.reef.driver.context.ContextConfiguration; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.CompletedEvaluator; +import org.apache.reef.driver.evaluator.EvaluatorRequestor; +import org.apache.reef.driver.evaluator.FailedEvaluator; +import org.apache.reef.driver.task.*; +import org.apache.reef.tang.annotations.Unit; +import org.apache.reef.task.Task; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.Clock; +import org.apache.reef.wake.time.event.Alarm; +import org.apache.reef.wake.time.event.StartTime; +import org.apache.reef.wake.time.event.StopTime; + +import javax.inject.Inject; +import java.util.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * mock application. + */ +@Unit +final class MockApplication { + + private static final Logger LOG = Logger.getLogger(MockApplication.class.getName()); + + private final Clock clock; + + private final EvaluatorRequestor evaluatorRequestor; + + private final Map<String, Map<String, ActiveContext>> evaluatorId2ContextId2ContextMap = new HashMap<>(); + + private final Map<String, AllocatedEvaluator> evaluatorMap = new HashMap<>(); + + private final Map<String, FailedEvaluator> failedEvaluatorMap = new HashMap<>(); + + private final Map<String, RunningTask> evaluatorIdRunningTaskMap = new HashMap<>(); + + private final Set<FailedContext> failedContextSet = new HashSet<>(); + + private final Set<FailedTask> failedTaskSet = new HashSet<>(); + + private final Set<SuspendedTask> suspendedTaskSet = new HashSet<>(); + + private boolean running = false; + + @Inject + MockApplication(final Clock clock, final EvaluatorRequestor evaluatorRequestor) { + this.clock = clock; + this.evaluatorRequestor = evaluatorRequestor; + } + + ActiveContext getContext(final AllocatedEvaluator evaluator, final String identifier) { + return this.evaluatorId2ContextId2ContextMap.get(evaluator.getId()).get(identifier); + } + + Collection<RunningTask> getRunningTasks() { + return Collections.unmodifiableCollection(this.evaluatorIdRunningTaskMap.values()); + } + + Collection<AllocatedEvaluator> getAllocatedEvaluators() { + return Collections.unmodifiableCollection(this.evaluatorMap.values()); + } + + Collection<FailedEvaluator> getFailedEvaluators() { + return Collections.unmodifiableCollection(this.failedEvaluatorMap.values()); + } + + Collection<FailedTask> getFailedTasks() { + return Collections.unmodifiableCollection(this.failedTaskSet); + } + + Collection<FailedContext> getFailedContext() { + return Collections.unmodifiableCollection(this.failedContextSet); + } + + void requestEvaluators(final int numEvaluators) { + LOG.log(Level.INFO, "request {0} Evaluators", numEvaluators); + evaluatorRequestor.newRequest() + .setMemory(128) + .setNumberOfCores(1) + .setNumber(numEvaluators) + .submit(); + } + + void submitTask(final ActiveContext context, final String identifier) { + context.submitTask(TaskConfiguration.CONF + .set(TaskConfiguration.IDENTIFIER, identifier) + .set(TaskConfiguration.TASK, DummyTestTask.class) + .build()); + } + + void submitContext(final AllocatedEvaluator evaluator, final String identifier) { + evaluator.submitContext(ContextConfiguration.CONF + .set(ContextConfiguration.IDENTIFIER, identifier) + .build()); + } + + boolean isRunning() { + return this.running; + } + + boolean exists(final AllocatedEvaluator evaluator) { + return this.evaluatorMap.containsKey(evaluator.getId()); + } + + /** + * Job Driver is ready and the clock is set up: request the evaluatorMap. + */ + final class StartHandler implements EventHandler<StartTime> { + @Override + public void onNext(final StartTime startTime) { + clock.scheduleAlarm(Integer.MAX_VALUE, new EventHandler<Alarm>() { + @Override + public void onNext(final Alarm value) { + throw new RuntimeException("should not happen"); + } + }); + running = true; + } + } + + /** + * Job Driver is is shutting down: write to the log. + */ + final class StopHandler implements EventHandler<StopTime> { + @Override + public void onNext(final StopTime stopTime) { + running = false; + } + } + + /** + * Receive notification that an Evaluator had been allocated, + * and submitTask a new Task in that Evaluator. + */ + final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { + @Override + public void onNext(final AllocatedEvaluator eval) { + evaluatorMap.put(eval.getId(), eval); + } + } + + /** + * Receive notification that the Evaluator has been shut down. + */ + final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> { + @Override + public void onNext(final CompletedEvaluator eval) { + evaluatorMap.remove(eval.getId()); + evaluatorId2ContextId2ContextMap.remove(eval.getId()); + evaluatorIdRunningTaskMap.remove(eval.getId()); + } + } + + final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> { + + @Override + public void onNext(final FailedEvaluator eval) { + evaluatorMap.remove(eval.getId()); + evaluatorId2ContextId2ContextMap.remove(eval.getId()); + evaluatorIdRunningTaskMap.remove(eval.getId()); + failedEvaluatorMap.put(eval.getId(), eval); + failedContextSet.addAll(eval.getFailedContextList()); + } + } + + /** + * Receive notification that the Context is active. + */ + final class ActiveContextHandler implements EventHandler<ActiveContext> { + @Override + public void onNext(final ActiveContext context) { + if (!evaluatorId2ContextId2ContextMap.containsKey(context.getEvaluatorId())) { + evaluatorId2ContextId2ContextMap.put(context.getEvaluatorId(), new HashMap<String, ActiveContext>()); + } + if (evaluatorId2ContextId2ContextMap.get(context.getEvaluatorId()).containsKey(context.getId())) { + throw new IllegalStateException( + String.format("Context %s on evaluator %s already exists on evaluator with " + + "same identifier", context.getId(), context.getEvaluatorId())); + } + evaluatorId2ContextId2ContextMap.get(context.getEvaluatorId()).put(context.getId(), context); + } + } + + final class ContextClosedHandler implements EventHandler<ClosedContext> { + @Override + public void onNext(final ClosedContext value) { + assert evaluatorId2ContextId2ContextMap.containsKey(value.getEvaluatorId()); + assert evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).containsKey(value.getId()); + evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).remove(value.getId()); + } + } + + final class FailedContextHandler implements EventHandler<FailedContext> { + @Override + public void onNext(final FailedContext value) { + if (evaluatorId2ContextId2ContextMap.containsKey(value.getEvaluatorId()) && + evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).containsKey(value.getId())) { + evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).remove(value.getEvaluatorId()); + } else { + // must have failed before it succeeded + } + failedContextSet.add(value); + } + } + + /** + * Receive notification that the Task is running. + */ + final class RunningTaskHandler implements EventHandler<RunningTask> { + @Override + public void onNext(final RunningTask task) { + evaluatorIdRunningTaskMap.put(task.getActiveContext().getEvaluatorId(), task); + } + } + + /** + * Receive notification that the Task has completed successfully. + */ + final class CompletedTaskHandler implements EventHandler<CompletedTask> { + @Override + public void onNext(final CompletedTask task) { + evaluatorIdRunningTaskMap.remove(task.getActiveContext().getEvaluatorId()); + } + } + + final class FailedTaskHandler implements EventHandler<FailedTask> { + @Override + public void onNext(final FailedTask value) { + evaluatorIdRunningTaskMap.remove(value.getActiveContext().get().getEvaluatorId()); + failedTaskSet.add(value); + } + } + + final class SuspendedTaskHandler implements EventHandler<SuspendedTask> { + @Override + public void onNext(final SuspendedTask value) { + evaluatorIdRunningTaskMap.remove(value.getActiveContext().getEvaluatorId()); + suspendedTaskSet.add(value); + } + } + + private static final class DummyTestTask implements Task { + @Override + public byte[] call(final byte[] memento) throws Exception { + return new byte[0]; + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/package-info.java b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/package-info.java new file mode 100644 index 0000000..75f80b6 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +/** + * mock runtime tests. + */ +package org.apache.reef.mock.driver; http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/package-info.java b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/package-info.java deleted file mode 100644 index e93688c..0000000 --- a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -/** - * mock runtime tests. - */ -package org.apache.reef.mock; http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java index dc175e2..0f88c46 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java @@ -40,7 +40,7 @@ import org.apache.reef.tang.formats.OptionalImpl; @Unstable public final class YarnDriverRestartConfiguration extends ConfigurationModuleBuilder { /** - * The Evaluator Preserver implementation used for YARN. Defaults to DFSEvalutorPreserver. + * The Evaluator Preserver implementation used for YARN. Defaults to DFSEvaluatorPreserver. */ public static final OptionalImpl<EvaluatorPreserver> EVALUATOR_PRESERVER = new OptionalImpl<>(); http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java index bdf1779..9ba8d28 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java @@ -156,7 +156,7 @@ public final class YarnDriverRuntimeRestartManager implements DriverRuntimeResta /** * Used by {@link org.apache.reef.driver.restart.DriverRestartManager}. * Gets the list of previous containers from the resource manager, - * compares that list to the YarnDriverRuntimeRestartManager's own list based on the evalutor preserver, + * compares that list to the YarnDriverRuntimeRestartManager's own list based on the evaluator preserver, * and determine which evaluators are alive and which have failed during restart. * @return a map of Evaluator ID to {@link EvaluatorRestartInfo} for evaluators that have either failed or survived * driver restart.
