http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToContext.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToContext.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToContext.java
new file mode 100644
index 0000000..d90486a
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToContext.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.request;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.mock.driver.ProcessRequest;
+
+/**
+ * send message from driver to context process request.
+ */
+@Unstable
+@Private
+public final class SendMessageDriverToContext implements
+    ProcessRequestInternal<Object, Object> {
+
+  private final ActiveContext context;
+
+  private final byte[] message;
+
+  public SendMessageDriverToContext(final ActiveContext context, final byte[] 
message) {
+    this.context = context;
+    this.message = message;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.SEND_MESSAGE_DRIVER_TO_CONTEXT;
+  }
+
+  public ActiveContext getContext() {
+    return this.context;
+  }
+
+  public byte[] getMessage() {
+    return this.message;
+  }
+
+  @Override
+  public Object getSuccessEvent() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Object getFailureEvent() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean doAutoComplete() {
+    return false;
+  }
+
+  @Override
+  public void setAutoComplete(final boolean value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ProcessRequest getCompletionProcessRequest() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToTask.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToTask.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToTask.java
new file mode 100644
index 0000000..bf6c3dd
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.request;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.mock.driver.ProcessRequest;
+
+/**
+ * send message from driver to task process request.
+ */
+@Unstable
+@Private
+public final class SendMessageDriverToTask implements
+    ProcessRequestInternal<Object, Object> {
+
+  private RunningTask task;
+
+  private final byte[] message;
+
+  public SendMessageDriverToTask(final RunningTask task, final byte[] message) 
{
+    this.task = task;
+    this.message = message;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.SEND_MESSAGE_DRIVER_TO_TASK;
+  }
+
+  public RunningTask getTask() {
+    return task;
+  }
+
+  public byte[] getMessage() {
+    return message;
+  }
+
+  @Override
+  public Object getSuccessEvent() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Object getFailureEvent() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean doAutoComplete() {
+    return false;
+  }
+
+  @Override
+  public void setAutoComplete(final boolean value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ProcessRequest getCompletionProcessRequest() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SuspendTask.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SuspendTask.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SuspendTask.java
new file mode 100644
index 0000000..a4dbcee
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SuspendTask.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.request;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.driver.task.SuspendedTask;
+import org.apache.reef.mock.driver.ProcessRequest;
+import org.apache.reef.mock.driver.runtime.MockRunningTask;
+import org.apache.reef.mock.driver.runtime.MockSuspendedTask;
+import org.apache.reef.util.Optional;
+
+/**
+ * suspend task process request.
+ */
+@Unstable
+@Private
+public final class SuspendTask implements 
ProcessRequestInternal<SuspendedTask, FailedTask> {
+
+  private final MockRunningTask task;
+
+  private final Optional<byte[]> message;
+
+  public SuspendTask(final MockRunningTask task, final Optional<byte[]> 
message) {
+    this.task = task;
+    this.message = message;
+  }
+
+  public MockRunningTask getTask() {
+    return task;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.SUSPEND_TASK;
+  }
+
+  public Optional<byte[]> getMessage() {
+    return message;
+  }
+
+  @Override
+  public MockSuspendedTask getSuccessEvent() {
+    return new MockSuspendedTask(this.task);
+  }
+
+  @Override
+  public FailedTask getFailureEvent() {
+    return new FailedTask(
+        this.task.getId(),
+        "mock",
+        Optional.<String>empty(),
+        Optional.<Throwable>empty(),
+        Optional.<byte[]>empty(),
+        Optional.of(this.task.getActiveContext()));
+  }
+
+  @Override
+  public boolean doAutoComplete() {
+    return false;
+  }
+
+  @Override
+  public void setAutoComplete(final boolean value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ProcessRequest getCompletionProcessRequest() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/package-info.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/package-info.java
new file mode 100644
index 0000000..22dbfbc
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+/**
+ * process request implementations.
+ */
+package org.apache.reef.mock.driver.request;

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockActiveContext.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockActiveContext.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockActiveContext.java
new file mode 100644
index 0000000..292dd9d
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockActiveContext.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.evaluator.context.parameters.ContextIdentifier;
+import org.apache.reef.mock.driver.request.CloseContext;
+import org.apache.reef.mock.driver.request.CreateContext;
+import org.apache.reef.mock.driver.request.CreateTask;
+import org.apache.reef.mock.driver.request.SendMessageDriverToContext;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.util.Optional;
+
+/**
+ * mock active context.
+ */
+@Unstable
+@Private
+public final class MockActiveContext implements ActiveContext {
+
+  private final MockRuntimeDriver mockRuntimeDriver;
+
+  private final MockAllocatedEvaluator evaluator;
+
+  private final Optional<MockActiveContext> parentContext;
+
+  private final String contextID;
+
+  MockActiveContext(
+      final MockRuntimeDriver mockRuntimeDriver,
+      final MockAllocatedEvaluator evaluator,
+      final Optional<MockActiveContext> parentContext,
+      final String contextID) {
+    this.mockRuntimeDriver = mockRuntimeDriver;
+    this.evaluator = evaluator;
+    this.parentContext = parentContext;
+    this.contextID = contextID;
+  }
+
+  @Override
+  public int hashCode() {
+    final String id = this.getEvaluatorId() + ":" + contextID;
+    return id.hashCode();
+  }
+
+  public boolean equals(final Object that) {
+    if (that instanceof MockActiveContext) {
+      return 
this.getEvaluatorId().equals(((MockActiveContext)that).getEvaluatorId()) &&
+          this.contextID.equals(((MockActiveContext)that).contextID);
+    }
+    return false;
+  }
+
+  public MockAllocatedEvaluator getEvaluator() {
+    return this.evaluator;
+  }
+
+  public Optional<MockActiveContext> getParentContext() {
+    return this.parentContext;
+  }
+
+  @Override
+  public void close() {
+    this.mockRuntimeDriver.add(new CloseContext(this));
+  }
+
+  @Override
+  public void submitTask(final Configuration taskConf) {
+    final String taskID = MockUtils.getValue(taskConf, 
TaskConfigurationOptions.Identifier.class);
+    final MockRunningTask task = new MockRunningTask(this.mockRuntimeDriver, 
taskID, this);
+    this.mockRuntimeDriver.add(new CreateTask(task, 
this.mockRuntimeDriver.getTaskReturnValueProvider()));
+  }
+
+  @Override
+  public void submitContext(final Configuration contextConfiguration) {
+    final String childContextID = MockUtils.getValue(contextConfiguration, 
ContextIdentifier.class);
+    final MockActiveContext context = new MockActiveContext(
+        this.mockRuntimeDriver,
+        this.evaluator,
+        Optional.of(this),
+        childContextID);
+    this.mockRuntimeDriver.add(new CreateContext(context));
+  }
+
+  @Override
+  public void submitContextAndService(
+      final Configuration contextConfiguration,
+      final Configuration serviceConfiguration) {
+    submitContext(contextConfiguration);
+  }
+
+  @Override
+  public void sendMessage(final byte[] message) {
+    this.mockRuntimeDriver.add(new SendMessageDriverToContext(this, message));
+  }
+
+  @Override
+  public String getEvaluatorId() {
+    return this.evaluator.getId();
+  }
+
+  @Override
+  public Optional<String> getParentId() {
+    return this.parentContext.isPresent() ?
+        Optional.of(this.parentContext.get().getId()) :
+        Optional.<String>empty();
+  }
+
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return this.evaluator.getEvaluatorDescriptor();
+  }
+
+  @Override
+  public String getId() {
+    return this.contextID;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockAllocatedEvaluator.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockAllocatedEvaluator.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockAllocatedEvaluator.java
new file mode 100644
index 0000000..d3c0581
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockAllocatedEvaluator.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorProcess;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.evaluator.context.parameters.ContextIdentifier;
+import org.apache.reef.mock.driver.request.CloseEvaluator;
+import org.apache.reef.mock.driver.request.CreateContext;
+import org.apache.reef.mock.driver.request.CreateContextAndTask;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.util.Optional;
+
+import java.io.File;
+
+/**
+ * mock allocated evaluator.
+ */
+@Unstable
+@Private
+public final class MockAllocatedEvaluator implements AllocatedEvaluator {
+  public static final String ROOT_CONTEXT_IDENTIFIER_PREFIX = "ROOT.CONTEXT.";
+
+  private final MockRuntimeDriver mockRuntimeDriver;
+
+  private final String identifier;
+
+  private final EvaluatorDescriptor evaluatorDescriptor;
+
+  private MockActiveContext rootContext = null;
+
+  private boolean closed = false;
+
+  MockAllocatedEvaluator(
+      final MockRuntimeDriver mockRuntimeDriver,
+      final String identifier,
+      final EvaluatorDescriptor evaluatorDescriptor) {
+    this.mockRuntimeDriver = mockRuntimeDriver;
+    this.identifier = identifier;
+    this.evaluatorDescriptor = evaluatorDescriptor;
+  }
+
+  public MockActiveContext getRootContext() {
+    return this.rootContext;
+  }
+
+  @Override
+  public void addFile(final File file) {
+    // ignore
+  }
+
+  @Override
+  public void addLibrary(final File file) {
+    // ignore
+  }
+
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return this.evaluatorDescriptor;
+  }
+
+  @Override
+  public void setProcess(final EvaluatorProcess process) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() {
+    if (!this.closed) {
+      this.mockRuntimeDriver.add(new CloseEvaluator(this));
+    } else {
+      throw new IllegalStateException("evaluator already closed");
+    }
+  }
+
+  @Override
+  public void submitTask(final Configuration taskConfiguration) {
+    if (this.rootContext != null) {
+      throw new IllegalStateException("Root context already created");
+    }
+    this.rootContext = new MockActiveContext(
+        mockRuntimeDriver,
+        this,
+        Optional.<MockActiveContext>empty(),
+        ROOT_CONTEXT_IDENTIFIER_PREFIX + identifier);
+    final String taskID = MockUtils.getValue(taskConfiguration, 
TaskConfigurationOptions.Identifier.class);
+    final MockRunningTask mockTask = new 
MockRunningTask(this.mockRuntimeDriver, taskID, this.rootContext);
+    this.mockRuntimeDriver.add(
+        new CreateContextAndTask(
+            this.rootContext,
+            mockTask,
+            this.mockRuntimeDriver.getTaskReturnValueProvider()));
+  }
+
+  @Override
+  public void submitContext(final Configuration contextConfiguration) {
+    if (this.rootContext != null) {
+      throw new IllegalStateException("Root context already created");
+    }
+    final String rootContextID = MockUtils.getValue(contextConfiguration, 
ContextIdentifier.class);
+    this.rootContext = new MockActiveContext(
+        this.mockRuntimeDriver,
+        this,
+        Optional.<MockActiveContext>empty(),
+        rootContextID);
+    this.mockRuntimeDriver.add(new CreateContext(this.rootContext));
+  }
+
+  @Override
+  public void submitContextAndService(
+      final Configuration contextConfiguration,
+      final Configuration serviceConfiguration) {
+    submitContext(contextConfiguration);
+    // ignore services
+  }
+
+  @Override
+  public void submitContextAndTask(
+      final Configuration contextConfiguration,
+      final Configuration taskConfiguration) {
+    if (this.rootContext != null) {
+      throw new IllegalStateException("Root context already created");
+    }
+    final String contextID = MockUtils.getValue(contextConfiguration, 
ContextIdentifier.class);
+    final String taskID = MockUtils.getValue(taskConfiguration, 
TaskConfigurationOptions.Identifier.class);
+    this.rootContext = new MockActiveContext(
+        this.mockRuntimeDriver,
+        this,
+        Optional.<MockActiveContext>empty(),
+        contextID);
+    final MockRunningTask mockTask = new 
MockRunningTask(this.mockRuntimeDriver, taskID, this.rootContext);
+    this.mockRuntimeDriver.add(
+        new CreateContextAndTask(
+            this.rootContext,
+            mockTask,
+            this.mockRuntimeDriver.getTaskReturnValueProvider()));
+  }
+
+  @Override
+  public void submitContextAndServiceAndTask(
+      final Configuration contextConfiguration,
+      final Configuration serviceConfiguration,
+      final Configuration taskConfiguration) {
+    submitContextAndTask(contextConfiguration, taskConfiguration);
+  }
+
+  @Override
+  public String getId() {
+    return this.identifier;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClock.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClock.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClock.java
new file mode 100644
index 0000000..297647a
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClock.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.mock.driver.MockRuntime;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.Time;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.runtime.event.ClientAlarm;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * The MockClock can be used to drive alarms set by the client application.
+ */
+@Unstable
+@Private
+public final class MockClock implements Clock {
+
+  private final InjectionFuture<MockRuntime> runtime;
+
+  private final List<Alarm> alarmList = new ArrayList<>();
+
+  private long currentTime = 0;
+
+  private boolean closed = false;
+
+  @Inject
+  MockClock(final InjectionFuture<MockRuntime> runtime) {
+    this.runtime = runtime;
+  }
+
+  /**
+   * Advances the clock by the offset amount.
+   * @param offset amount to advance clock
+   */
+  public void advanceClock(final int offset) {
+    this.currentTime += offset;
+    final Iterator<Alarm> iter = this.alarmList.iterator();
+    while (iter.hasNext()) {
+      final Alarm alarm = iter.next();
+      if (alarm.getTimestamp() <= this.currentTime) {
+        alarm.run();
+        iter.remove();
+      }
+    }
+  }
+
+  /**
+   * @return the current mock clock time
+   */
+  public long getCurrentTime() {
+    return this.currentTime;
+  }
+
+  @Override
+  public Time scheduleAlarm(final int offset, final EventHandler<Alarm> 
handler) {
+    final Alarm alarm = new ClientAlarm(this.currentTime + offset, handler);
+    alarmList.add(alarm);
+    return alarm;
+  }
+
+  @Override
+  public void close() {
+    if (!closed) {
+      this.runtime.get().stop();
+      this.closed = true;
+    }
+  }
+
+  @Override
+  public void stop() {
+    close();
+  }
+
+  @Override
+  public void stop(final Throwable exception) {
+    close();
+  }
+
+  @Override
+  public boolean isIdle() {
+    return this.alarmList.size() == 0;
+  }
+
+  @Override
+  public boolean isClosed() {
+    return this.closed;
+  }
+
+  @Override
+  public void run() {
+    this.runtime.get().start();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClosedContext.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClosedContext.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClosedContext.java
new file mode 100644
index 0000000..d008373
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClosedContext.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.util.Optional;
+
+/**
+ * mock closed context.
+ */
+@Unstable
+@Private
+public final class MockClosedContext implements ClosedContext {
+
+  private final MockActiveContext mockActiveContext;
+
+  public MockClosedContext(final MockActiveContext activeContext) {
+    this.mockActiveContext = activeContext;
+  }
+
+  public MockActiveContext getMockActiveContext() {
+    return this.mockActiveContext;
+  }
+
+  @Override
+  public ActiveContext getParentContext() {
+    return this.mockActiveContext.getParentContext().isPresent() ?
+      this.mockActiveContext.getParentContext().get() : null;
+  }
+
+  @Override
+  public String getId() {
+    return this.mockActiveContext.getId();
+  }
+
+  @Override
+  public String getEvaluatorId() {
+    return this.mockActiveContext.getEvaluatorId();
+  }
+
+  @Override
+  public Optional<String> getParentId() {
+    return this.mockActiveContext.getParentId();
+  }
+
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return this.mockActiveContext.getEvaluatorDescriptor();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockCompletedTask.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockCompletedTask.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockCompletedTask.java
new file mode 100644
index 0000000..e141787
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockCompletedTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.CompletedTask;
+
+/**
+ * mock completed task.
+ */
+@Unstable
+@Private
+public final class MockCompletedTask implements CompletedTask {
+
+  private final MockRunningTask task;
+
+  private final byte[] returnValue;
+
+  public MockCompletedTask(final MockRunningTask task, final byte[] 
returnValue) {
+    this.task = task;
+    this.returnValue = returnValue;
+  }
+
+  @Override
+  public ActiveContext getActiveContext() {
+    return this.task.getActiveContext();
+  }
+
+  @Override
+  public String getId() {
+    return this.task.getId();
+  }
+
+  @Override
+  public byte[] get() {
+    return this.returnValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorDescriptor.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorDescriptor.java
new file mode 100644
index 0000000..4341766
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorDescriptor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorProcess;
+
+/**
+ * mock evaluator descriptor.
+ */
+@Unstable
+@Private
+public final class MockEvaluatorDescriptor implements EvaluatorDescriptor {
+  private final NodeDescriptor nodeDescriptor;
+
+  MockEvaluatorDescriptor(final NodeDescriptor nodeDescriptor) {
+    this.nodeDescriptor = nodeDescriptor;
+  }
+
+  @Override
+  public NodeDescriptor getNodeDescriptor() {
+    return this.nodeDescriptor;
+  }
+
+  @Override
+  public EvaluatorProcess getProcess() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getMemory() {
+    return 0;
+  }
+
+  @Override
+  public int getNumberOfCores() {
+    return 1;
+  }
+
+  @Override
+  public String getRuntimeName() {
+    return "mock";
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorRequestor.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorRequestor.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorRequestor.java
new file mode 100644
index 0000000..ec80422
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorRequestor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.mock.driver.request.AllocateEvaluator;
+import org.apache.reef.tang.InjectionFuture;
+
+import javax.inject.Inject;
+import java.util.UUID;
+
+/**
+ * mock evaluator requestor.
+ */
+@Unstable
+@Private
+public final class MockEvaluatorRequestor implements EvaluatorRequestor {
+
+  private final InjectionFuture<MockRuntimeDriver> mockRuntimeDriver;
+
+  private final InjectionFuture<MockClock> clock;
+
+  @Inject
+  MockEvaluatorRequestor(
+      final InjectionFuture<MockClock> clock,
+      final InjectionFuture<MockRuntimeDriver> mockRuntimeDriver) {
+    this.clock = clock;
+    this.mockRuntimeDriver = mockRuntimeDriver;
+  }
+
+  @Override
+  public void submit(final EvaluatorRequest req) {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock closed");
+    }
+    final NodeDescriptor nodeDescriptor = new MockNodeDescriptor();
+    final MockEvaluatorDescriptor evaluatorDescriptor = new 
MockEvaluatorDescriptor(nodeDescriptor);
+    for (int i = 0; i < req.getNumber(); i++) {
+      final MockAllocatedEvaluator mockEvaluator = new MockAllocatedEvaluator(
+          this.mockRuntimeDriver.get(), UUID.randomUUID().toString(), 
evaluatorDescriptor);
+      this.mockRuntimeDriver.get().add(new AllocateEvaluator(mockEvaluator));
+    }
+  }
+
+  @Override
+  public Builder newRequest() {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock closed");
+    }
+    return new Builder();
+  }
+
+
+  /**
+   * {@link EvaluatorRequest.Builder} extended with a new submit method.
+   * {@link EvaluatorRequest}s are built using this builder.
+   */
+  private final class Builder extends EvaluatorRequest.Builder<Builder> {
+    @Override
+    public void submit() {
+      MockEvaluatorRequestor.this.submit(this.build());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedContext.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedContext.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedContext.java
new file mode 100644
index 0000000..d57cc2c
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedContext.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.util.Optional;
+
+/**
+ * mock failed context.
+ */
+@Unstable
+@Private
+public final class MockFailedContext implements FailedContext {
+
+  private final MockActiveContext context;
+
+  public MockFailedContext(final MockActiveContext context) {
+    this.context = context;
+  }
+
+  @Override
+  public Optional<ActiveContext> getParentContext() {
+    return this.context.getParentContext().isPresent() ?
+        Optional.of((ActiveContext)this.context.getParentContext().get()) :
+        Optional.<ActiveContext>empty();
+  }
+
+  @Override
+  public String getMessage() {
+    return "mock";
+  }
+
+  @Override
+  public Optional<String> getDescription() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<Throwable> getReason() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<byte[]> getData() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Throwable asError() {
+    return new Exception("mock");
+  }
+
+  @Override
+  public String getEvaluatorId() {
+    return this.context.getEvaluatorId();
+  }
+
+  @Override
+  public Optional<String> getParentId() {
+    return this.context.getParentId();
+  }
+
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return this.context.getEvaluatorDescriptor();
+  }
+
+  @Override
+  public String getId() {
+    return this.context.getId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedEvaluator.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedEvaluator.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedEvaluator.java
new file mode 100644
index 0000000..aed9c24
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedEvaluator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.exception.EvaluatorException;
+import org.apache.reef.util.Optional;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * mock failed evaluator.
+ */
+@Unstable
+@Private
+public final class MockFailedEvaluator implements FailedEvaluator {
+
+  private final String evaluatorID;
+
+  private final List<FailedContext> failedContextList;
+
+  private final Optional<FailedTask> failedTask;
+
+  public MockFailedEvaluator(
+      final String evaluatorID,
+      final List<FailedContext> failedContextList,
+      final Optional<FailedTask> failedTask) {
+    this.evaluatorID = evaluatorID;
+    this.failedContextList = failedContextList;
+    this.failedTask = failedTask;
+  }
+
+  public MockFailedEvaluator(final String evaluatorID) {
+    this.evaluatorID = evaluatorID;
+    this.failedContextList = new ArrayList<>();
+    this.failedTask = Optional.empty();
+  }
+
+  @Override
+  public EvaluatorException getEvaluatorException() {
+    return null;
+  }
+
+  @Override
+  public List<FailedContext> getFailedContextList() {
+    return this.failedContextList;
+  }
+
+  @Override
+  public Optional<FailedTask> getFailedTask() {
+    return this.failedTask;
+  }
+
+  @Override
+  public String getId() {
+    return this.evaluatorID;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockNodeDescriptor.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockNodeDescriptor.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockNodeDescriptor.java
new file mode 100644
index 0000000..8278a29
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockNodeDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.RackDescriptor;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * mock node descriptor.
+ */
+@Unstable
+@Private
+public final class MockNodeDescriptor implements NodeDescriptor {
+  @Override
+  public InetSocketAddress getInetSocketAddress() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public RackDescriptor getRackDescriptor() {
+    return new RackDescriptor() {
+      @Override
+      public List<NodeDescriptor> getNodes() {
+        final List<NodeDescriptor> nodes = new ArrayList<>();
+        nodes.add(MockNodeDescriptor.this);
+        return nodes;
+      }
+
+      @Override
+      public String getName() {
+        return "mock";
+      }
+    };
+  }
+
+  @Override
+  public String getName() {
+    return "mock";
+  }
+
+  @Override
+  public String getId() {
+    return "mock";
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRunningTask.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRunningTask.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRunningTask.java
new file mode 100644
index 0000000..711635a
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRunningTask.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.mock.driver.request.CloseTask;
+import org.apache.reef.mock.driver.request.SendMessageDriverToTask;
+import org.apache.reef.mock.driver.request.SuspendTask;
+import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
+import org.apache.reef.util.Optional;
+
+/**
+ * mock running task.
+ */
+@Unstable
+@Private
+public final class MockRunningTask implements RunningTask {
+
+  private final MockRuntimeDriver mockRuntimeDriver;
+
+  private final String taskID;
+
+  private final ActiveContext context;
+
+  MockRunningTask(
+      final MockRuntimeDriver mockRuntimeDriver,
+      final String taskID,
+      final ActiveContext context) {
+    this.mockRuntimeDriver = mockRuntimeDriver;
+    this.taskID = taskID;
+    this.context = context;
+  }
+
+  public String evaluatorID() {
+    return this.context.getEvaluatorId();
+  }
+
+  @Override
+  public ActiveContext getActiveContext() {
+    return this.context;
+  }
+
+  @Override
+  public void send(final byte[] message) {
+    this.mockRuntimeDriver.add(new SendMessageDriverToTask(this, message));
+  }
+
+  @Override
+  public void suspend(final byte[] message) {
+    this.mockRuntimeDriver.add(new SuspendTask(this, Optional.of(message)));
+  }
+
+  @Override
+  public void suspend() {
+    this.mockRuntimeDriver.add(new SuspendTask(this, 
Optional.<byte[]>empty()));
+  }
+
+  @Override
+  public void close(final byte[] message) {
+    this.mockRuntimeDriver.add(new CloseTask(this, 
this.mockRuntimeDriver.getTaskReturnValueProvider()));
+  }
+
+  @Override
+  public void close() {
+    this.mockRuntimeDriver.add(new CloseTask(this, 
this.mockRuntimeDriver.getTaskReturnValueProvider()));
+  }
+
+  @Override
+  public TaskRepresenter getTaskRepresenter() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getId() {
+    return this.taskID;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRuntimeDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRuntimeDriver.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRuntimeDriver.java
new file mode 100644
index 0000000..42ea28d
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRuntimeDriver.java
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.restart.DriverRestartCompleted;
+import org.apache.reef.driver.restart.DriverRestarted;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.io.Tuple;
+import org.apache.reef.mock.driver.MockDriverRestartContext;
+import org.apache.reef.mock.driver.MockRuntime;
+import org.apache.reef.mock.driver.MockTaskReturnValueProvider;
+import org.apache.reef.mock.driver.ProcessRequest;
+import org.apache.reef.mock.driver.request.*;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.*;
+
+/**
+ * mock runtime driver.
+ */
+@Unstable
+@Private
+public final class MockRuntimeDriver implements MockRuntime {
+
+  private final InjectionFuture<MockClock> clock;
+
+  private final List<ProcessRequest> processRequestQueue = new ArrayList<>();
+
+  private final Set<EventHandler<StartTime>> driverStartHandlers;
+
+  private final Set<EventHandler<StopTime>> driverStopHandlers;
+
+  private final Set<EventHandler<AllocatedEvaluator>> 
allocatedEvaluatorHandlers;
+
+  private final Set<EventHandler<CompletedEvaluator>> 
completedEvaluatorHandlers;
+
+  private final Set<EventHandler<FailedEvaluator>> failedEvaluatorHandlers;
+
+  private final Set<EventHandler<TaskRunningHandlers>> taskRunningHandlers;
+
+  private final Set<EventHandler<FailedTask>> taskFailedHandlers;
+
+  private final Set<EventHandler<TaskMessage>> taskMessageHandlers;
+
+  private final Set<EventHandler<CompletedTask>> taskCompletedHandlers;
+
+  private final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers;
+
+  private final Set<EventHandler<ActiveContext>> contextActiveHandlers;
+
+  private final Set<EventHandler<CloseContext>> contextClosedHandlers;
+
+  private final Set<EventHandler<ContextMessage>> contextMessageHandlers;
+
+  private final Set<EventHandler<FailedContext>> contextFailedHandlers;
+
+  private final Set<EventHandler<DriverRestarted>> driverRestartHandlers;
+
+  private final Set<EventHandler<RunningTask>> 
driverRestartRunningTaskHandlers;
+
+  private final Set<EventHandler<ActiveContext>> 
driverRestartActiveContextHandlers;
+
+  private final Set<EventHandler<DriverRestartCompleted>> 
driverRestartCompletedHandlers;
+
+  private final Set<EventHandler<FailedEvaluator>> 
driverRestartFailedEvaluatorHandlers;
+
+  private final Map<String, MockAllocatedEvaluator> allocatedEvaluatorMap = 
new HashMap<>();
+
+  private final Map<String, List<MockActiveContext>> allocatedContextsMap = 
new HashMap<>();
+
+  private final Map<String, MockRunningTask> runningTasks = new HashMap<>();
+
+  private final MockTaskReturnValueProvider taskReturnValueProvider;
+
+  @Inject
+  MockRuntimeDriver(
+      final InjectionFuture<MockClock> clock,
+      final MockTaskReturnValueProvider taskReturnValueProvider,
+      @Parameter(DriverStartHandler.class) final Set<EventHandler<StartTime>> 
driverStartHandlers,
+      @Parameter(Clock.StopHandler.class) final Set<EventHandler<StopTime>> 
driverStopHandlers,
+      @Parameter(EvaluatorAllocatedHandlers.class) final 
Set<EventHandler<AllocatedEvaluator>>
+          allocatedEvaluatorHandlers,
+      @Parameter(EvaluatorCompletedHandlers.class) final 
Set<EventHandler<CompletedEvaluator>>
+          completedEvaluatorHandlers,
+      @Parameter(EvaluatorFailedHandlers.class) final 
Set<EventHandler<FailedEvaluator>> failedEvaluatorHandlers,
+      @Parameter(TaskRunningHandlers.class) final 
Set<EventHandler<TaskRunningHandlers>> taskRunningHandlers,
+      @Parameter(TaskFailedHandlers.class) final Set<EventHandler<FailedTask>> 
taskFailedHandlers,
+      @Parameter(TaskMessageHandlers.class) final 
Set<EventHandler<TaskMessage>> taskMessageHandlers,
+      @Parameter(TaskCompletedHandlers.class) final 
Set<EventHandler<CompletedTask>> taskCompletedHandlers,
+      @Parameter(TaskSuspendedHandlers.class) final 
Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
+      @Parameter(ContextActiveHandlers.class) final 
Set<EventHandler<ActiveContext>> contextActiveHandlers,
+      @Parameter(ContextClosedHandlers.class) final 
Set<EventHandler<CloseContext>> contextClosedHandlers,
+      @Parameter(ContextMessageHandlers.class) final 
Set<EventHandler<ContextMessage>> contextMessageHandlers,
+      @Parameter(ContextFailedHandlers.class) final 
Set<EventHandler<FailedContext>> contextFailedHandlers,
+      @Parameter(DriverRestartHandler.class) final 
Set<EventHandler<DriverRestarted>>
+          driverRestartHandlers,
+      @Parameter(DriverRestartTaskRunningHandlers.class) final 
Set<EventHandler<RunningTask>>
+          driverRestartRunningTaskHandlers,
+      @Parameter(DriverRestartContextActiveHandlers.class) final 
Set<EventHandler<ActiveContext>>
+          driverRestartActiveContextHandlers,
+      @Parameter(DriverRestartCompletedHandlers.class) final 
Set<EventHandler<DriverRestartCompleted>>
+          driverRestartCompletedHandlers,
+      @Parameter(DriverRestartFailedEvaluatorHandlers.class) final 
Set<EventHandler<FailedEvaluator>>
+          driverRestartFailedEvaluatorHandlers){
+    this.clock = clock;
+    this.taskReturnValueProvider = taskReturnValueProvider;
+    this.driverStartHandlers = driverStartHandlers;
+    this.driverStopHandlers = driverStopHandlers;
+    this.allocatedEvaluatorHandlers = allocatedEvaluatorHandlers;
+    this.completedEvaluatorHandlers = completedEvaluatorHandlers;
+    this.failedEvaluatorHandlers = failedEvaluatorHandlers;
+    this.taskRunningHandlers = taskRunningHandlers;
+    this.taskFailedHandlers = taskFailedHandlers;
+    this.taskMessageHandlers = taskMessageHandlers;
+    this.taskCompletedHandlers = taskCompletedHandlers;
+    this.taskSuspendedHandlers = taskSuspendedHandlers;
+    this.contextActiveHandlers = contextActiveHandlers;
+    this.contextClosedHandlers = contextClosedHandlers;
+    this.contextMessageHandlers = contextMessageHandlers;
+    this.contextFailedHandlers = contextFailedHandlers;
+    this.driverRestartHandlers = driverRestartHandlers;
+    this.driverRestartRunningTaskHandlers = driverRestartRunningTaskHandlers;
+    this.driverRestartActiveContextHandlers = 
driverRestartActiveContextHandlers;
+    this.driverRestartCompletedHandlers = driverRestartCompletedHandlers;
+    this.driverRestartFailedEvaluatorHandlers = 
driverRestartFailedEvaluatorHandlers;
+  }
+
+  @Override
+  public Collection<AllocatedEvaluator> getCurrentAllocatedEvaluators() {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock is closed");
+    }
+    return new 
ArrayList<AllocatedEvaluator>(this.allocatedEvaluatorMap.values());
+  }
+
+  @Override
+  public void fail(final AllocatedEvaluator evaluator) {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock is closed");
+    }
+    if (this.allocatedEvaluatorMap.remove(evaluator.getId()) == null) {
+      throw new IllegalStateException("unknown evaluator " + evaluator);
+    }
+    FailedTask failedTask = null;
+    if (this.runningTasks.containsKey(evaluator.getId())) {
+      final RunningTask task = this.runningTasks.remove(evaluator.getId());
+      failedTask = new FailedTask(
+          task.getId(),
+          "mock",
+          Optional.<String>empty(),
+          Optional.<Throwable>empty(),
+          Optional.<byte[]>empty(),
+          Optional.<ActiveContext>of(task.getActiveContext()));
+    }
+    final List<FailedContext> failedContexts = new ArrayList<>();
+    for (final MockActiveContext context : 
this.allocatedContextsMap.get(evaluator.getId())) {
+      failedContexts.add(new MockFailedContext(context));
+    }
+    post(this.failedEvaluatorHandlers, new MockFailedEvaluator(
+        evaluator.getId(), failedContexts, Optional.ofNullable(failedTask)));
+  }
+
+  @Override
+  public Collection<ActiveContext> getCurrentActiveContexts() {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock is closed");
+    }
+    final List<ActiveContext> currentActiveContexts = new ArrayList<>();
+    for (final List<MockActiveContext> contexts : 
this.allocatedContextsMap.values()) {
+      currentActiveContexts.addAll(contexts);
+    }
+    return currentActiveContexts;
+  }
+
+  @Override
+  public void fail(final ActiveContext context) {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock is closed");
+    }
+
+    final MockAllocatedEvaluator evaluator = ((MockActiveContext) 
context).getEvaluator();
+
+    // Root context failure shuts evaluator down.
+    if (!((MockActiveContext) context).getParentContext().isPresent()) {
+      allocatedEvaluatorMap.remove(evaluator.getId());
+      post(this.completedEvaluatorHandlers, new CompletedEvaluator() {
+        @Override
+        public String getId() {
+          return evaluator.getId();
+        }
+      });
+    }
+
+    this.allocatedContextsMap.get(evaluator.getId()).remove(context);
+    post(this.contextFailedHandlers, new MockFailedContext((MockActiveContext) 
context));
+  }
+
+  @Override
+  public Collection<RunningTask> getCurrentRunningTasks() {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock is closed");
+    }
+    return new ArrayList<RunningTask>(this.runningTasks.values());
+  }
+
+  @Override
+  public void fail(final RunningTask task) {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock is closed");
+    }
+    final String evaluatorID = task.getActiveContext().getEvaluatorId();
+    if (this.runningTasks.containsKey(evaluatorID) &&
+        this.runningTasks.get(evaluatorID).equals(task)) {
+      this.runningTasks.remove(evaluatorID);
+      post(taskFailedHandlers, new FailedTask(
+          task.getId(),
+          "mock",
+          Optional.<String>empty(),
+          Optional.<Throwable>empty(),
+          Optional.<byte[]>empty(),
+          Optional.of(task.getActiveContext())));
+    } else {
+      throw new IllegalStateException("unknown running task " + task);
+    }
+  }
+
+  @Override
+  public void start() {
+    post(this.driverStartHandlers, new 
StartTime(this.clock.get().getCurrentTime()));
+  }
+
+  @Override
+  public void stop() {
+    post(this.driverStopHandlers, new 
StopTime(this.clock.get().getCurrentTime()));
+  }
+
+  @Override
+  public void restart(final MockDriverRestartContext restartContext, final 
boolean isTimeout, final long duration) {
+    post(this.driverRestartHandlers, restartContext.getDriverRestarted());
+    for (final RunningTask runningTask : restartContext.getRunningTasks()) {
+      post(this.driverRestartRunningTaskHandlers, runningTask);
+    }
+    for (final ActiveContext activeContext : 
restartContext.getIdleActiveContexts()) {
+      post(this.driverRestartActiveContextHandlers, activeContext);
+    }
+    post(this.driverRestartCompletedHandlers, 
restartContext.getDriverRestartCompleted(isTimeout, duration));
+    for (final FailedEvaluator failedEvaluator : 
restartContext.getFailedEvaluators()) {
+      post(this.driverRestartFailedEvaluatorHandlers, failedEvaluator);
+    }
+  }
+
+  @Override
+  public MockDriverRestartContext failDriver(final int attempt, final 
StartTime startTime) {
+    final List<MockActiveContext> activeContexts = new ArrayList<>();
+    for (final List<MockActiveContext> contexts : 
this.allocatedContextsMap.values()) {
+      if (contexts.size() > 0) {
+        activeContexts.add(contexts.get(contexts.size() - 1));
+      }
+    }
+    return new MockDriverRestartContext(
+        attempt,
+        startTime,
+        new ArrayList<>(this.allocatedEvaluatorMap.values()),
+        activeContexts,
+        new ArrayList<>(this.runningTasks.values()));
+  }
+
+  @Override
+  public boolean hasProcessRequest() {
+    return this.processRequestQueue.size() > 0;
+  }
+
+  @Override
+  public ProcessRequest getNextProcessRequest() {
+    if (this.processRequestQueue.size() > 0) {
+      return this.processRequestQueue.remove(0);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public void succeed(final ProcessRequest pr) {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock is closed");
+    }
+    final ProcessRequestInternal request = (ProcessRequestInternal) pr;
+    switch (request.getType()) {
+    case ALLOCATE_EVALUATOR:
+      final MockAllocatedEvaluator allocatedEvaluator = 
((AllocateEvaluator)request).getSuccessEvent();
+      validateAndCreate(allocatedEvaluator);
+      post(this.allocatedEvaluatorHandlers, allocatedEvaluator);
+      break;
+    case CLOSE_EVALUATOR:
+      final CompletedEvaluator closedEvaluator = 
((CloseEvaluator)request).getSuccessEvent();
+      validateAndClose(closedEvaluator);
+      post(this.completedEvaluatorHandlers, closedEvaluator);
+      break;
+    case CREATE_CONTEXT:
+      final MockActiveContext createContext = ((CreateContext) 
request).getSuccessEvent();
+      validateAndCreate(createContext);
+      post(this.contextActiveHandlers, createContext);
+      break;
+    case CLOSE_CONTEXT:
+      final MockClosedContext closeContext = ((CloseContext) 
request).getSuccessEvent();
+      validateAndClose(closeContext);
+      post(this.contextClosedHandlers, closeContext);
+      break;
+    case CREATE_TASK:
+      final MockRunningTask createTask = 
((CreateTask)request).getSuccessEvent();
+      validateAndCreate(createTask);
+      post(this.taskRunningHandlers, request.getSuccessEvent());
+      break;
+    case SUSPEND_TASK:
+      final MockRunningTask suspendedTask = ((SuspendTask)request).getTask();
+      validateAndClose(suspendedTask);
+      post(this.taskSuspendedHandlers, request.getSuccessEvent());
+      break;
+    case CLOSE_TASK:
+    case COMPLETE_TASK:
+      final MockRunningTask completedTask = ((CompleteTask)request).getTask();
+      validateAndClose(completedTask);
+      post(this.taskCompletedHandlers, request.getSuccessEvent());
+      break;
+    case CREATE_CONTEXT_AND_TASK:
+      final CreateContextAndTask createContextTask = (CreateContextAndTask) 
request;
+      final Tuple<MockActiveContext, MockRunningTask> events = 
createContextTask.getSuccessEvent();
+      validateAndCreate(events.getKey());
+      post(this.contextActiveHandlers, events.getKey());
+      validateAndCreate(events.getValue());
+      post(this.taskRunningHandlers, events.getValue());
+      break;
+    case SEND_MESSAGE_DRIVER_TO_TASK:
+      // ignore
+      break;
+    case SEND_MESSAGE_DRIVER_TO_CONTEXT:
+      // ignore
+      break;
+    default:
+      throw new IllegalStateException("unknown type");
+    }
+
+    if (request.doAutoComplete()) {
+      add(request.getCompletionProcessRequest());
+    } else if (!this.clock.get().isClosed() && isIdle()) {
+      this.clock.get().close();
+    }
+  }
+
+  @Override
+  public void fail(final ProcessRequest pr) {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock is closed");
+    }
+    final ProcessRequestInternal request = (ProcessRequestInternal) pr;
+    switch (request.getType()) {
+    case ALLOCATE_EVALUATOR:
+      post(this.failedEvaluatorHandlers, request.getFailureEvent());
+      break;
+    case CLOSE_EVALUATOR:
+      final CompletedEvaluator evaluator = 
((CloseEvaluator)request).getSuccessEvent();
+      validateAndClose(evaluator);
+      post(this.failedEvaluatorHandlers, request.getFailureEvent());
+      break;
+    case CREATE_CONTEXT:
+      post(this.contextFailedHandlers, request.getFailureEvent());
+      break;
+    case CLOSE_CONTEXT:
+      final MockClosedContext context = 
((CloseContext)request).getSuccessEvent();
+      validateAndClose(context);
+      if (context.getParentContext() == null) {
+        add(new CloseEvaluator(context.getMockActiveContext().getEvaluator()));
+      }
+      post(this.contextFailedHandlers, request.getFailureEvent());
+      break;
+    case CREATE_TASK:
+      post(this.taskFailedHandlers, request.getFailureEvent());
+      break;
+    case SUSPEND_TASK:
+      validateAndClose(((SuspendTask)request).getTask());
+      post(this.taskFailedHandlers, request.getFailureEvent());
+      break;
+    case CLOSE_TASK:
+    case COMPLETE_TASK:
+      validateAndClose(((CloseTask)request).getTask());
+      post(this.taskFailedHandlers, request.getFailureEvent());
+      break;
+    case CREATE_CONTEXT_AND_TASK:
+      final CreateContextAndTask createContextTask = (CreateContextAndTask) 
request;
+      final Tuple<MockFailedContext, FailedTask> events = 
createContextTask.getFailureEvent();
+      post(this.taskFailedHandlers, events.getValue());
+      post(this.contextFailedHandlers, events.getKey());
+      break;
+    case SEND_MESSAGE_DRIVER_TO_TASK:
+      // ignore
+      break;
+    case SEND_MESSAGE_DRIVER_TO_CONTEXT:
+      // ignore
+      break;
+    default:
+      throw new IllegalStateException("unknown type");
+    }
+
+    if (!this.clock.get().isClosed() && isIdle()) {
+      this.clock.get().close();
+    }
+  }
+
+  @Override
+  public void publish(final ContextMessage contextMessage) {
+    for (final EventHandler<ContextMessage> handler : 
this.contextMessageHandlers) {
+      handler.onNext(contextMessage);
+    }
+  }
+
+  MockTaskReturnValueProvider getTaskReturnValueProvider() {
+    return this.taskReturnValueProvider;
+  }
+  /**
+   * Used by mock REEF entities (e.g., AllocatedEvaluator, RunningTask) to 
inject
+   * process requests from initiated actions e.g., RunningTask.close().
+   * @param request to inject
+   */
+  void add(final ProcessRequest request) {
+    this.processRequestQueue.add(request);
+  }
+
+  private boolean isIdle() {
+    return this.clock.get().isIdle() &&
+        this.processRequestQueue.isEmpty() &&
+        this.allocatedEvaluatorMap.isEmpty();
+  }
+
+  private <T> void post(final Set<EventHandler<T>> handlers, final Object 
event) {
+    for (final EventHandler<T> handler : handlers) {
+      handler.onNext((T) event);
+    }
+  }
+
+  private void validateAndCreate(final MockActiveContext context) {
+    if (!this.allocatedEvaluatorMap.containsKey(context.getEvaluatorId())) {
+      throw new IllegalStateException("unknown evaluator id " + 
context.getEvaluatorId());
+    } else if 
(!this.allocatedContextsMap.containsKey(context.getEvaluatorId())) {
+      this.allocatedContextsMap.put(context.getEvaluatorId(), new 
ArrayList<MockActiveContext>());
+    }
+    this.allocatedContextsMap.get(context.getEvaluatorId()).add(context);
+  }
+
+  private void validateAndClose(final MockClosedContext context) {
+    if (!this.allocatedContextsMap.containsKey(context.getEvaluatorId())) {
+      throw new IllegalStateException("unknown evaluator id " + 
context.getEvaluatorId());
+    }
+    final List<MockActiveContext> contexts = 
this.allocatedContextsMap.get(context.getEvaluatorId());
+    if (!contexts.get(contexts.size() - 
1).equals(context.getMockActiveContext())) {
+      throw new IllegalStateException("closing context that is not on the top 
of the stack");
+    }
+    contexts.remove(context.getMockActiveContext());
+  }
+
+  private void validateAndCreate(final MockRunningTask task) {
+    if (this.runningTasks.containsKey(task.evaluatorID())) {
+      throw new IllegalStateException("task already running on evaluator " +
+          task.evaluatorID());
+    }
+    this.runningTasks.put(task.evaluatorID(), task);
+  }
+
+  private void validateAndClose(final MockRunningTask task) {
+    if 
(!this.runningTasks.containsKey(task.getActiveContext().getEvaluatorId())) {
+      throw new IllegalStateException("no task running on evaluator");
+    }
+    this.runningTasks.remove(task.getActiveContext().getEvaluatorId());
+  }
+
+  private void validateAndCreate(final MockAllocatedEvaluator evaluator) {
+    if (this.allocatedEvaluatorMap.containsKey(evaluator.getId())) {
+      throw new IllegalStateException("evaluator id " + evaluator.getId() + " 
already exists");
+    }
+    this.allocatedEvaluatorMap.put(evaluator.getId(), evaluator);
+    this.allocatedContextsMap.put(evaluator.getId(), new 
ArrayList<MockActiveContext>());
+  }
+
+  private void validateAndClose(final CompletedEvaluator evaluator) {
+    if (!this.allocatedEvaluatorMap.containsKey(evaluator.getId())) {
+      throw new IllegalStateException("unknown evaluator id " + 
evaluator.getId());
+    }
+    this.allocatedEvaluatorMap.remove(evaluator.getId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockSuspendedTask.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockSuspendedTask.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockSuspendedTask.java
new file mode 100644
index 0000000..9f25ae32
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockSuspendedTask.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.SuspendedTask;
+
+/**
+ * mock suspended task.
+ */
+@Unstable
+@Private
+public final class MockSuspendedTask implements SuspendedTask {
+
+  private final MockRunningTask task;
+
+  public MockSuspendedTask(final MockRunningTask task) {
+    this.task = task;
+  }
+
+  @Override
+  public ActiveContext getActiveContext() {
+    return this.task.getActiveContext();
+  }
+
+  @Override
+  public byte[] get() {
+    return new byte[0];
+  }
+
+  @Override
+  public String getId() {
+    return this.task.getId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockUtils.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockUtils.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockUtils.java
new file mode 100644
index 0000000..23d3b46
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+/**
+ * mock utilities.
+ */
+@Unstable
+@Private
+final class MockUtils {
+
+  private MockUtils() {
+  }
+
+  public static <U, T extends Name<U>> U getValue(final Configuration 
configuration, final Class<T> name) {
+    try {
+      final Injector injector = 
Tang.Factory.getTang().newInjector(configuration);
+      return injector.getNamedInstance(name);
+    } catch (InjectionException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/package-info.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/package-info.java
new file mode 100644
index 0000000..50e7eac
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+/**
+ * mock runtime implementation.
+ */
+package org.apache.reef.mock.driver.runtime;

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/package-info.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/package-info.java
deleted file mode 100644
index fdda864..0000000
--- 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/package-info.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- *
- */
-/**
- * Mock runtime API.
- *
- * Mock runtime is meant to mimic the semantics of the REEF runtime and
- * allow:
- *  1. Applications to driver the forward progress of processing REEF events.
- *     See {@link org.apache.reef.mock.MockRuntime} API
- *  2. Control the advancement of the Clock and Alarm callbacks.
- *     See {@link org.apache.reef.mock.runtime.MockClock}
- *  3. Inject failures into the REEF applications.
- *     See {@link org.apache.reef.mock.MockFailure}
- *
- * Use {@link org.apache.reef.mock.MockConfiguration} to bind your REEF
- * driver application event handlers.
- *
- * Use {@link org.apache.reef.mock.MockRuntime#start()} to trigger the
- * driver start event and {@link org.apache.reef.mock.MockRuntime#stop()}}
- * or {@link org.apache.reef.mock.runtime.MockClock#close()} to trigger the 
driver
- * stop event.
- */
-package org.apache.reef.mock;

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/AllocateEvaluator.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/AllocateEvaluator.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/AllocateEvaluator.java
deleted file mode 100644
index 9d6e400..0000000
--- 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/AllocateEvaluator.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.request;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.evaluator.FailedEvaluator;
-import org.apache.reef.mock.ProcessRequest;
-import org.apache.reef.mock.runtime.MockAllocatedEvalautor;
-import org.apache.reef.mock.runtime.MockFailedEvaluator;
-
-/**
- * Allocate Evaluator process request.
- */
-@Unstable
-@Private
-public final class AllocateEvaluator implements
-    ProcessRequestInternal<MockAllocatedEvalautor, FailedEvaluator> {
-
-  private final MockAllocatedEvalautor evaluator;
-
-  public AllocateEvaluator(final MockAllocatedEvalautor evaluator) {
-    this.evaluator = evaluator;
-  }
-
-  @Override
-  public Type getType() {
-    return Type.ALLOCATE_EVALUATOR;
-  }
-
-  @Override
-  public MockAllocatedEvalautor getSuccessEvent() {
-    return this.evaluator;
-  }
-
-  @Override
-  public FailedEvaluator getFailureEvent() {
-    return new MockFailedEvaluator(evaluator.getId());
-  }
-
-  @Override
-  public boolean doAutoComplete() {
-    return false;
-  }
-
-  @Override
-  public void setAutoComplete(final boolean value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public ProcessRequest getCompletionProcessRequest() {
-    throw new UnsupportedOperationException();
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseContext.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseContext.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseContext.java
deleted file mode 100644
index 00bdf3c..0000000
--- 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseContext.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.request;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.context.ClosedContext;
-import org.apache.reef.driver.context.FailedContext;
-import org.apache.reef.mock.AutoCompletable;
-import org.apache.reef.mock.ProcessRequest;
-import org.apache.reef.mock.runtime.MockActiveContext;
-import org.apache.reef.mock.runtime.MockClosedContext;
-import org.apache.reef.mock.runtime.MockFailedContext;
-
-/**
- * close context process request.
- */
-@Unstable
-@Private
-public final class CloseContext implements
-    ProcessRequestInternal<ClosedContext, FailedContext>,
-    AutoCompletable {
-
-  private final MockActiveContext context;
-
-  public CloseContext(final MockActiveContext context) {
-    this.context = context;
-  }
-
-  @Override
-  public Type getType() {
-    return Type.CLOSE_CONTEXT;
-  }
-
-  @Override
-  public MockClosedContext getSuccessEvent() {
-    return new MockClosedContext(this.context);
-  }
-
-  @Override
-  public FailedContext getFailureEvent() {
-    return new MockFailedContext(this.context);
-  }
-
-  @Override
-  public boolean doAutoComplete() {
-    return !this.context.getParentContext().isPresent();
-  }
-
-  @Override
-  public void setAutoComplete(final boolean value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public ProcessRequest getCompletionProcessRequest() {
-    return new CloseEvaluator(this.context.getEvaluator());
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseEvaluator.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseEvaluator.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseEvaluator.java
deleted file mode 100644
index 6ef2b9f..0000000
--- 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseEvaluator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.request;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.evaluator.CompletedEvaluator;
-import org.apache.reef.driver.evaluator.FailedEvaluator;
-import org.apache.reef.mock.ProcessRequest;
-import org.apache.reef.mock.runtime.MockAllocatedEvalautor;
-import org.apache.reef.mock.runtime.MockFailedEvaluator;
-
-/**
- * close evaluator request.
- */
-@Unstable
-@Private
-public final class CloseEvaluator implements 
ProcessRequestInternal<CompletedEvaluator, FailedEvaluator> {
-
-  private final MockAllocatedEvalautor evaluator;
-
-  public CloseEvaluator(final MockAllocatedEvalautor evaluator) {
-    this.evaluator = evaluator;
-  }
-
-  @Override
-  public Type getType() {
-    return Type.CLOSE_EVALUATOR;
-  }
-
-  @Override
-  public CompletedEvaluator getSuccessEvent() {
-    return new CompletedEvaluator() {
-      @Override
-      public String getId() {
-        return evaluator.getId();
-      }
-    };
-  }
-
-  @Override
-  public FailedEvaluator getFailureEvent() {
-    // TODO[initialize remaining failed contstructer fields]
-    return new MockFailedEvaluator(evaluator.getId());
-  }
-
-  @Override
-  public boolean doAutoComplete() {
-    return false;
-  }
-
-  @Override
-  public void setAutoComplete(final boolean value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public ProcessRequest getCompletionProcessRequest() {
-    throw new UnsupportedOperationException();
-  }
-}

Reply via email to