http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToContext.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToContext.java new file mode 100644 index 0000000..d90486a --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToContext.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver.request; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.mock.driver.ProcessRequest; + +/** + * send message from driver to context process request. + */ +@Unstable +@Private +public final class SendMessageDriverToContext implements + ProcessRequestInternal<Object, Object> { + + private final ActiveContext context; + + private final byte[] message; + + public SendMessageDriverToContext(final ActiveContext context, final byte[] message) { + this.context = context; + this.message = message; + } + + @Override + public Type getType() { + return Type.SEND_MESSAGE_DRIVER_TO_CONTEXT; + } + + public ActiveContext getContext() { + return this.context; + } + + public byte[] getMessage() { + return this.message; + } + + @Override + public Object getSuccessEvent() { + throw new UnsupportedOperationException(); + } + + @Override + public Object getFailureEvent() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean doAutoComplete() { + return false; + } + + @Override + public void setAutoComplete(final boolean value) { + throw new UnsupportedOperationException(); + } + + @Override + public ProcessRequest getCompletionProcessRequest() { + throw new UnsupportedOperationException(); + } +}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToTask.java new file mode 100644 index 0000000..bf6c3dd --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToTask.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver.request; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.task.RunningTask; +import org.apache.reef.mock.driver.ProcessRequest; + +/** + * send message from driver to task process request. + */ +@Unstable +@Private +public final class SendMessageDriverToTask implements + ProcessRequestInternal<Object, Object> { + + private RunningTask task; + + private final byte[] message; + + public SendMessageDriverToTask(final RunningTask task, final byte[] message) { + this.task = task; + this.message = message; + } + + @Override + public Type getType() { + return Type.SEND_MESSAGE_DRIVER_TO_TASK; + } + + public RunningTask getTask() { + return task; + } + + public byte[] getMessage() { + return message; + } + + @Override + public Object getSuccessEvent() { + throw new UnsupportedOperationException(); + } + + @Override + public Object getFailureEvent() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean doAutoComplete() { + return false; + } + + @Override + public void setAutoComplete(final boolean value) { + throw new UnsupportedOperationException(); + } + + @Override + public ProcessRequest getCompletionProcessRequest() { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SuspendTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SuspendTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SuspendTask.java new file mode 100644 index 0000000..a4dbcee --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SuspendTask.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver.request; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.task.FailedTask; +import org.apache.reef.driver.task.SuspendedTask; +import org.apache.reef.mock.driver.ProcessRequest; +import org.apache.reef.mock.driver.runtime.MockRunningTask; +import org.apache.reef.mock.driver.runtime.MockSuspendedTask; +import org.apache.reef.util.Optional; + +/** + * suspend task process request. + */ +@Unstable +@Private +public final class SuspendTask implements ProcessRequestInternal<SuspendedTask, FailedTask> { + + private final MockRunningTask task; + + private final Optional<byte[]> message; + + public SuspendTask(final MockRunningTask task, final Optional<byte[]> message) { + this.task = task; + this.message = message; + } + + public MockRunningTask getTask() { + return task; + } + + @Override + public Type getType() { + return Type.SUSPEND_TASK; + } + + public Optional<byte[]> getMessage() { + return message; + } + + @Override + public MockSuspendedTask getSuccessEvent() { + return new MockSuspendedTask(this.task); + } + + @Override + public FailedTask getFailureEvent() { + return new FailedTask( + this.task.getId(), + "mock", + Optional.<String>empty(), + Optional.<Throwable>empty(), + Optional.<byte[]>empty(), + Optional.of(this.task.getActiveContext())); + } + + @Override + public boolean doAutoComplete() { + return false; + } + + @Override + public void setAutoComplete(final boolean value) { + throw new UnsupportedOperationException(); + } + + @Override + public ProcessRequest getCompletionProcessRequest() { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/package-info.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/package-info.java new file mode 100644 index 0000000..22dbfbc --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +/** + * process request implementations. + */ +package org.apache.reef.mock.driver.request; http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockActiveContext.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockActiveContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockActiveContext.java new file mode 100644 index 0000000..292dd9d --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockActiveContext.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.driver.task.TaskConfigurationOptions; +import org.apache.reef.evaluator.context.parameters.ContextIdentifier; +import org.apache.reef.mock.driver.request.CloseContext; +import org.apache.reef.mock.driver.request.CreateContext; +import org.apache.reef.mock.driver.request.CreateTask; +import org.apache.reef.mock.driver.request.SendMessageDriverToContext; +import org.apache.reef.tang.Configuration; +import org.apache.reef.util.Optional; + +/** + * mock active context. + */ +@Unstable +@Private +public final class MockActiveContext implements ActiveContext { + + private final MockRuntimeDriver mockRuntimeDriver; + + private final MockAllocatedEvaluator evaluator; + + private final Optional<MockActiveContext> parentContext; + + private final String contextID; + + MockActiveContext( + final MockRuntimeDriver mockRuntimeDriver, + final MockAllocatedEvaluator evaluator, + final Optional<MockActiveContext> parentContext, + final String contextID) { + this.mockRuntimeDriver = mockRuntimeDriver; + this.evaluator = evaluator; + this.parentContext = parentContext; + this.contextID = contextID; + } + + @Override + public int hashCode() { + final String id = this.getEvaluatorId() + ":" + contextID; + return id.hashCode(); + } + + public boolean equals(final Object that) { + if (that instanceof MockActiveContext) { + return this.getEvaluatorId().equals(((MockActiveContext)that).getEvaluatorId()) && + this.contextID.equals(((MockActiveContext)that).contextID); + } + return false; + } + + public MockAllocatedEvaluator getEvaluator() { + return this.evaluator; + } + + public Optional<MockActiveContext> getParentContext() { + return this.parentContext; + } + + @Override + public void close() { + this.mockRuntimeDriver.add(new CloseContext(this)); + } + + @Override + public void submitTask(final Configuration taskConf) { + final String taskID = MockUtils.getValue(taskConf, TaskConfigurationOptions.Identifier.class); + final MockRunningTask task = new MockRunningTask(this.mockRuntimeDriver, taskID, this); + this.mockRuntimeDriver.add(new CreateTask(task, this.mockRuntimeDriver.getTaskReturnValueProvider())); + } + + @Override + public void submitContext(final Configuration contextConfiguration) { + final String childContextID = MockUtils.getValue(contextConfiguration, ContextIdentifier.class); + final MockActiveContext context = new MockActiveContext( + this.mockRuntimeDriver, + this.evaluator, + Optional.of(this), + childContextID); + this.mockRuntimeDriver.add(new CreateContext(context)); + } + + @Override + public void submitContextAndService( + final Configuration contextConfiguration, + final Configuration serviceConfiguration) { + submitContext(contextConfiguration); + } + + @Override + public void sendMessage(final byte[] message) { + this.mockRuntimeDriver.add(new SendMessageDriverToContext(this, message)); + } + + @Override + public String getEvaluatorId() { + return this.evaluator.getId(); + } + + @Override + public Optional<String> getParentId() { + return this.parentContext.isPresent() ? + Optional.of(this.parentContext.get().getId()) : + Optional.<String>empty(); + } + + @Override + public EvaluatorDescriptor getEvaluatorDescriptor() { + return this.evaluator.getEvaluatorDescriptor(); + } + + @Override + public String getId() { + return this.contextID; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockAllocatedEvaluator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockAllocatedEvaluator.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockAllocatedEvaluator.java new file mode 100644 index 0000000..d3c0581 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockAllocatedEvaluator.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.driver.evaluator.EvaluatorProcess; +import org.apache.reef.driver.task.TaskConfigurationOptions; +import org.apache.reef.evaluator.context.parameters.ContextIdentifier; +import org.apache.reef.mock.driver.request.CloseEvaluator; +import org.apache.reef.mock.driver.request.CreateContext; +import org.apache.reef.mock.driver.request.CreateContextAndTask; +import org.apache.reef.tang.Configuration; +import org.apache.reef.util.Optional; + +import java.io.File; + +/** + * mock allocated evaluator. + */ +@Unstable +@Private +public final class MockAllocatedEvaluator implements AllocatedEvaluator { + public static final String ROOT_CONTEXT_IDENTIFIER_PREFIX = "ROOT.CONTEXT."; + + private final MockRuntimeDriver mockRuntimeDriver; + + private final String identifier; + + private final EvaluatorDescriptor evaluatorDescriptor; + + private MockActiveContext rootContext = null; + + private boolean closed = false; + + MockAllocatedEvaluator( + final MockRuntimeDriver mockRuntimeDriver, + final String identifier, + final EvaluatorDescriptor evaluatorDescriptor) { + this.mockRuntimeDriver = mockRuntimeDriver; + this.identifier = identifier; + this.evaluatorDescriptor = evaluatorDescriptor; + } + + public MockActiveContext getRootContext() { + return this.rootContext; + } + + @Override + public void addFile(final File file) { + // ignore + } + + @Override + public void addLibrary(final File file) { + // ignore + } + + @Override + public EvaluatorDescriptor getEvaluatorDescriptor() { + return this.evaluatorDescriptor; + } + + @Override + public void setProcess(final EvaluatorProcess process) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + if (!this.closed) { + this.mockRuntimeDriver.add(new CloseEvaluator(this)); + } else { + throw new IllegalStateException("evaluator already closed"); + } + } + + @Override + public void submitTask(final Configuration taskConfiguration) { + if (this.rootContext != null) { + throw new IllegalStateException("Root context already created"); + } + this.rootContext = new MockActiveContext( + mockRuntimeDriver, + this, + Optional.<MockActiveContext>empty(), + ROOT_CONTEXT_IDENTIFIER_PREFIX + identifier); + final String taskID = MockUtils.getValue(taskConfiguration, TaskConfigurationOptions.Identifier.class); + final MockRunningTask mockTask = new MockRunningTask(this.mockRuntimeDriver, taskID, this.rootContext); + this.mockRuntimeDriver.add( + new CreateContextAndTask( + this.rootContext, + mockTask, + this.mockRuntimeDriver.getTaskReturnValueProvider())); + } + + @Override + public void submitContext(final Configuration contextConfiguration) { + if (this.rootContext != null) { + throw new IllegalStateException("Root context already created"); + } + final String rootContextID = MockUtils.getValue(contextConfiguration, ContextIdentifier.class); + this.rootContext = new MockActiveContext( + this.mockRuntimeDriver, + this, + Optional.<MockActiveContext>empty(), + rootContextID); + this.mockRuntimeDriver.add(new CreateContext(this.rootContext)); + } + + @Override + public void submitContextAndService( + final Configuration contextConfiguration, + final Configuration serviceConfiguration) { + submitContext(contextConfiguration); + // ignore services + } + + @Override + public void submitContextAndTask( + final Configuration contextConfiguration, + final Configuration taskConfiguration) { + if (this.rootContext != null) { + throw new IllegalStateException("Root context already created"); + } + final String contextID = MockUtils.getValue(contextConfiguration, ContextIdentifier.class); + final String taskID = MockUtils.getValue(taskConfiguration, TaskConfigurationOptions.Identifier.class); + this.rootContext = new MockActiveContext( + this.mockRuntimeDriver, + this, + Optional.<MockActiveContext>empty(), + contextID); + final MockRunningTask mockTask = new MockRunningTask(this.mockRuntimeDriver, taskID, this.rootContext); + this.mockRuntimeDriver.add( + new CreateContextAndTask( + this.rootContext, + mockTask, + this.mockRuntimeDriver.getTaskReturnValueProvider())); + } + + @Override + public void submitContextAndServiceAndTask( + final Configuration contextConfiguration, + final Configuration serviceConfiguration, + final Configuration taskConfiguration) { + submitContextAndTask(contextConfiguration, taskConfiguration); + } + + @Override + public String getId() { + return this.identifier; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClock.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClock.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClock.java new file mode 100644 index 0000000..297647a --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClock.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.reef.mock.driver.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.mock.driver.MockRuntime; +import org.apache.reef.tang.InjectionFuture; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.Clock; +import org.apache.reef.wake.time.Time; +import org.apache.reef.wake.time.event.Alarm; +import org.apache.reef.wake.time.runtime.event.ClientAlarm; + +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * The MockClock can be used to drive alarms set by the client application. + */ +@Unstable +@Private +public final class MockClock implements Clock { + + private final InjectionFuture<MockRuntime> runtime; + + private final List<Alarm> alarmList = new ArrayList<>(); + + private long currentTime = 0; + + private boolean closed = false; + + @Inject + MockClock(final InjectionFuture<MockRuntime> runtime) { + this.runtime = runtime; + } + + /** + * Advances the clock by the offset amount. + * @param offset amount to advance clock + */ + public void advanceClock(final int offset) { + this.currentTime += offset; + final Iterator<Alarm> iter = this.alarmList.iterator(); + while (iter.hasNext()) { + final Alarm alarm = iter.next(); + if (alarm.getTimestamp() <= this.currentTime) { + alarm.run(); + iter.remove(); + } + } + } + + /** + * @return the current mock clock time + */ + public long getCurrentTime() { + return this.currentTime; + } + + @Override + public Time scheduleAlarm(final int offset, final EventHandler<Alarm> handler) { + final Alarm alarm = new ClientAlarm(this.currentTime + offset, handler); + alarmList.add(alarm); + return alarm; + } + + @Override + public void close() { + if (!closed) { + this.runtime.get().stop(); + this.closed = true; + } + } + + @Override + public void stop() { + close(); + } + + @Override + public void stop(final Throwable exception) { + close(); + } + + @Override + public boolean isIdle() { + return this.alarmList.size() == 0; + } + + @Override + public boolean isClosed() { + return this.closed; + } + + @Override + public void run() { + this.runtime.get().start(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClosedContext.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClosedContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClosedContext.java new file mode 100644 index 0000000..d008373 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClosedContext.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ClosedContext; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.util.Optional; + +/** + * mock closed context. + */ +@Unstable +@Private +public final class MockClosedContext implements ClosedContext { + + private final MockActiveContext mockActiveContext; + + public MockClosedContext(final MockActiveContext activeContext) { + this.mockActiveContext = activeContext; + } + + public MockActiveContext getMockActiveContext() { + return this.mockActiveContext; + } + + @Override + public ActiveContext getParentContext() { + return this.mockActiveContext.getParentContext().isPresent() ? + this.mockActiveContext.getParentContext().get() : null; + } + + @Override + public String getId() { + return this.mockActiveContext.getId(); + } + + @Override + public String getEvaluatorId() { + return this.mockActiveContext.getEvaluatorId(); + } + + @Override + public Optional<String> getParentId() { + return this.mockActiveContext.getParentId(); + } + + @Override + public EvaluatorDescriptor getEvaluatorDescriptor() { + return this.mockActiveContext.getEvaluatorDescriptor(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockCompletedTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockCompletedTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockCompletedTask.java new file mode 100644 index 0000000..e141787 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockCompletedTask.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.task.CompletedTask; + +/** + * mock completed task. + */ +@Unstable +@Private +public final class MockCompletedTask implements CompletedTask { + + private final MockRunningTask task; + + private final byte[] returnValue; + + public MockCompletedTask(final MockRunningTask task, final byte[] returnValue) { + this.task = task; + this.returnValue = returnValue; + } + + @Override + public ActiveContext getActiveContext() { + return this.task.getActiveContext(); + } + + @Override + public String getId() { + return this.task.getId(); + } + + @Override + public byte[] get() { + return this.returnValue; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorDescriptor.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorDescriptor.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorDescriptor.java new file mode 100644 index 0000000..4341766 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorDescriptor.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.catalog.NodeDescriptor; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.driver.evaluator.EvaluatorProcess; + +/** + * mock evaluator descriptor. + */ +@Unstable +@Private +public final class MockEvaluatorDescriptor implements EvaluatorDescriptor { + private final NodeDescriptor nodeDescriptor; + + MockEvaluatorDescriptor(final NodeDescriptor nodeDescriptor) { + this.nodeDescriptor = nodeDescriptor; + } + + @Override + public NodeDescriptor getNodeDescriptor() { + return this.nodeDescriptor; + } + + @Override + public EvaluatorProcess getProcess() { + throw new UnsupportedOperationException(); + } + + @Override + public int getMemory() { + return 0; + } + + @Override + public int getNumberOfCores() { + return 1; + } + + @Override + public String getRuntimeName() { + return "mock"; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorRequestor.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorRequestor.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorRequestor.java new file mode 100644 index 0000000..ec80422 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorRequestor.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.catalog.NodeDescriptor; +import org.apache.reef.driver.evaluator.EvaluatorRequest; +import org.apache.reef.driver.evaluator.EvaluatorRequestor; +import org.apache.reef.mock.driver.request.AllocateEvaluator; +import org.apache.reef.tang.InjectionFuture; + +import javax.inject.Inject; +import java.util.UUID; + +/** + * mock evaluator requestor. + */ +@Unstable +@Private +public final class MockEvaluatorRequestor implements EvaluatorRequestor { + + private final InjectionFuture<MockRuntimeDriver> mockRuntimeDriver; + + private final InjectionFuture<MockClock> clock; + + @Inject + MockEvaluatorRequestor( + final InjectionFuture<MockClock> clock, + final InjectionFuture<MockRuntimeDriver> mockRuntimeDriver) { + this.clock = clock; + this.mockRuntimeDriver = mockRuntimeDriver; + } + + @Override + public void submit(final EvaluatorRequest req) { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock closed"); + } + final NodeDescriptor nodeDescriptor = new MockNodeDescriptor(); + final MockEvaluatorDescriptor evaluatorDescriptor = new MockEvaluatorDescriptor(nodeDescriptor); + for (int i = 0; i < req.getNumber(); i++) { + final MockAllocatedEvaluator mockEvaluator = new MockAllocatedEvaluator( + this.mockRuntimeDriver.get(), UUID.randomUUID().toString(), evaluatorDescriptor); + this.mockRuntimeDriver.get().add(new AllocateEvaluator(mockEvaluator)); + } + } + + @Override + public Builder newRequest() { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock closed"); + } + return new Builder(); + } + + + /** + * {@link EvaluatorRequest.Builder} extended with a new submit method. + * {@link EvaluatorRequest}s are built using this builder. + */ + private final class Builder extends EvaluatorRequest.Builder<Builder> { + @Override + public void submit() { + MockEvaluatorRequestor.this.submit(this.build()); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedContext.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedContext.java new file mode 100644 index 0000000..d57cc2c --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedContext.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.util.Optional; + +/** + * mock failed context. + */ +@Unstable +@Private +public final class MockFailedContext implements FailedContext { + + private final MockActiveContext context; + + public MockFailedContext(final MockActiveContext context) { + this.context = context; + } + + @Override + public Optional<ActiveContext> getParentContext() { + return this.context.getParentContext().isPresent() ? + Optional.of((ActiveContext)this.context.getParentContext().get()) : + Optional.<ActiveContext>empty(); + } + + @Override + public String getMessage() { + return "mock"; + } + + @Override + public Optional<String> getDescription() { + return Optional.empty(); + } + + @Override + public Optional<Throwable> getReason() { + return Optional.empty(); + } + + @Override + public Optional<byte[]> getData() { + return Optional.empty(); + } + + @Override + public Throwable asError() { + return new Exception("mock"); + } + + @Override + public String getEvaluatorId() { + return this.context.getEvaluatorId(); + } + + @Override + public Optional<String> getParentId() { + return this.context.getParentId(); + } + + @Override + public EvaluatorDescriptor getEvaluatorDescriptor() { + return this.context.getEvaluatorDescriptor(); + } + + @Override + public String getId() { + return this.context.getId(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedEvaluator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedEvaluator.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedEvaluator.java new file mode 100644 index 0000000..aed9c24 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedEvaluator.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.mock.driver.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.FailedEvaluator; +import org.apache.reef.driver.task.FailedTask; +import org.apache.reef.exception.EvaluatorException; +import org.apache.reef.util.Optional; + +import java.util.ArrayList; +import java.util.List; + +/** + * mock failed evaluator. + */ +@Unstable +@Private +public final class MockFailedEvaluator implements FailedEvaluator { + + private final String evaluatorID; + + private final List<FailedContext> failedContextList; + + private final Optional<FailedTask> failedTask; + + public MockFailedEvaluator( + final String evaluatorID, + final List<FailedContext> failedContextList, + final Optional<FailedTask> failedTask) { + this.evaluatorID = evaluatorID; + this.failedContextList = failedContextList; + this.failedTask = failedTask; + } + + public MockFailedEvaluator(final String evaluatorID) { + this.evaluatorID = evaluatorID; + this.failedContextList = new ArrayList<>(); + this.failedTask = Optional.empty(); + } + + @Override + public EvaluatorException getEvaluatorException() { + return null; + } + + @Override + public List<FailedContext> getFailedContextList() { + return this.failedContextList; + } + + @Override + public Optional<FailedTask> getFailedTask() { + return this.failedTask; + } + + @Override + public String getId() { + return this.evaluatorID; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockNodeDescriptor.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockNodeDescriptor.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockNodeDescriptor.java new file mode 100644 index 0000000..8278a29 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockNodeDescriptor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.catalog.NodeDescriptor; +import org.apache.reef.driver.catalog.RackDescriptor; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +/** + * mock node descriptor. + */ +@Unstable +@Private +public final class MockNodeDescriptor implements NodeDescriptor { + @Override + public InetSocketAddress getInetSocketAddress() { + throw new UnsupportedOperationException(); + } + + @Override + public RackDescriptor getRackDescriptor() { + return new RackDescriptor() { + @Override + public List<NodeDescriptor> getNodes() { + final List<NodeDescriptor> nodes = new ArrayList<>(); + nodes.add(MockNodeDescriptor.this); + return nodes; + } + + @Override + public String getName() { + return "mock"; + } + }; + } + + @Override + public String getName() { + return "mock"; + } + + @Override + public String getId() { + return "mock"; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRunningTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRunningTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRunningTask.java new file mode 100644 index 0000000..711635a --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRunningTask.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.task.RunningTask; +import org.apache.reef.mock.driver.request.CloseTask; +import org.apache.reef.mock.driver.request.SendMessageDriverToTask; +import org.apache.reef.mock.driver.request.SuspendTask; +import org.apache.reef.runtime.common.driver.task.TaskRepresenter; +import org.apache.reef.util.Optional; + +/** + * mock running task. + */ +@Unstable +@Private +public final class MockRunningTask implements RunningTask { + + private final MockRuntimeDriver mockRuntimeDriver; + + private final String taskID; + + private final ActiveContext context; + + MockRunningTask( + final MockRuntimeDriver mockRuntimeDriver, + final String taskID, + final ActiveContext context) { + this.mockRuntimeDriver = mockRuntimeDriver; + this.taskID = taskID; + this.context = context; + } + + public String evaluatorID() { + return this.context.getEvaluatorId(); + } + + @Override + public ActiveContext getActiveContext() { + return this.context; + } + + @Override + public void send(final byte[] message) { + this.mockRuntimeDriver.add(new SendMessageDriverToTask(this, message)); + } + + @Override + public void suspend(final byte[] message) { + this.mockRuntimeDriver.add(new SuspendTask(this, Optional.of(message))); + } + + @Override + public void suspend() { + this.mockRuntimeDriver.add(new SuspendTask(this, Optional.<byte[]>empty())); + } + + @Override + public void close(final byte[] message) { + this.mockRuntimeDriver.add(new CloseTask(this, this.mockRuntimeDriver.getTaskReturnValueProvider())); + } + + @Override + public void close() { + this.mockRuntimeDriver.add(new CloseTask(this, this.mockRuntimeDriver.getTaskReturnValueProvider())); + } + + @Override + public TaskRepresenter getTaskRepresenter() { + throw new UnsupportedOperationException(); + } + + @Override + public String getId() { + return this.taskID; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRuntimeDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRuntimeDriver.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRuntimeDriver.java new file mode 100644 index 0000000..42ea28d --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRuntimeDriver.java @@ -0,0 +1,522 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ContextMessage; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.CompletedEvaluator; +import org.apache.reef.driver.evaluator.FailedEvaluator; +import org.apache.reef.driver.parameters.*; +import org.apache.reef.driver.restart.DriverRestartCompleted; +import org.apache.reef.driver.restart.DriverRestarted; +import org.apache.reef.driver.task.*; +import org.apache.reef.io.Tuple; +import org.apache.reef.mock.driver.MockDriverRestartContext; +import org.apache.reef.mock.driver.MockRuntime; +import org.apache.reef.mock.driver.MockTaskReturnValueProvider; +import org.apache.reef.mock.driver.ProcessRequest; +import org.apache.reef.mock.driver.request.*; +import org.apache.reef.tang.InjectionFuture; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.util.Optional; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.Clock; +import org.apache.reef.wake.time.event.StartTime; +import org.apache.reef.wake.time.event.StopTime; + +import javax.inject.Inject; +import java.util.*; + +/** + * mock runtime driver. + */ +@Unstable +@Private +public final class MockRuntimeDriver implements MockRuntime { + + private final InjectionFuture<MockClock> clock; + + private final List<ProcessRequest> processRequestQueue = new ArrayList<>(); + + private final Set<EventHandler<StartTime>> driverStartHandlers; + + private final Set<EventHandler<StopTime>> driverStopHandlers; + + private final Set<EventHandler<AllocatedEvaluator>> allocatedEvaluatorHandlers; + + private final Set<EventHandler<CompletedEvaluator>> completedEvaluatorHandlers; + + private final Set<EventHandler<FailedEvaluator>> failedEvaluatorHandlers; + + private final Set<EventHandler<TaskRunningHandlers>> taskRunningHandlers; + + private final Set<EventHandler<FailedTask>> taskFailedHandlers; + + private final Set<EventHandler<TaskMessage>> taskMessageHandlers; + + private final Set<EventHandler<CompletedTask>> taskCompletedHandlers; + + private final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers; + + private final Set<EventHandler<ActiveContext>> contextActiveHandlers; + + private final Set<EventHandler<CloseContext>> contextClosedHandlers; + + private final Set<EventHandler<ContextMessage>> contextMessageHandlers; + + private final Set<EventHandler<FailedContext>> contextFailedHandlers; + + private final Set<EventHandler<DriverRestarted>> driverRestartHandlers; + + private final Set<EventHandler<RunningTask>> driverRestartRunningTaskHandlers; + + private final Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers; + + private final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers; + + private final Set<EventHandler<FailedEvaluator>> driverRestartFailedEvaluatorHandlers; + + private final Map<String, MockAllocatedEvaluator> allocatedEvaluatorMap = new HashMap<>(); + + private final Map<String, List<MockActiveContext>> allocatedContextsMap = new HashMap<>(); + + private final Map<String, MockRunningTask> runningTasks = new HashMap<>(); + + private final MockTaskReturnValueProvider taskReturnValueProvider; + + @Inject + MockRuntimeDriver( + final InjectionFuture<MockClock> clock, + final MockTaskReturnValueProvider taskReturnValueProvider, + @Parameter(DriverStartHandler.class) final Set<EventHandler<StartTime>> driverStartHandlers, + @Parameter(Clock.StopHandler.class) final Set<EventHandler<StopTime>> driverStopHandlers, + @Parameter(EvaluatorAllocatedHandlers.class) final Set<EventHandler<AllocatedEvaluator>> + allocatedEvaluatorHandlers, + @Parameter(EvaluatorCompletedHandlers.class) final Set<EventHandler<CompletedEvaluator>> + completedEvaluatorHandlers, + @Parameter(EvaluatorFailedHandlers.class) final Set<EventHandler<FailedEvaluator>> failedEvaluatorHandlers, + @Parameter(TaskRunningHandlers.class) final Set<EventHandler<TaskRunningHandlers>> taskRunningHandlers, + @Parameter(TaskFailedHandlers.class) final Set<EventHandler<FailedTask>> taskFailedHandlers, + @Parameter(TaskMessageHandlers.class) final Set<EventHandler<TaskMessage>> taskMessageHandlers, + @Parameter(TaskCompletedHandlers.class) final Set<EventHandler<CompletedTask>> taskCompletedHandlers, + @Parameter(TaskSuspendedHandlers.class) final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers, + @Parameter(ContextActiveHandlers.class) final Set<EventHandler<ActiveContext>> contextActiveHandlers, + @Parameter(ContextClosedHandlers.class) final Set<EventHandler<CloseContext>> contextClosedHandlers, + @Parameter(ContextMessageHandlers.class) final Set<EventHandler<ContextMessage>> contextMessageHandlers, + @Parameter(ContextFailedHandlers.class) final Set<EventHandler<FailedContext>> contextFailedHandlers, + @Parameter(DriverRestartHandler.class) final Set<EventHandler<DriverRestarted>> + driverRestartHandlers, + @Parameter(DriverRestartTaskRunningHandlers.class) final Set<EventHandler<RunningTask>> + driverRestartRunningTaskHandlers, + @Parameter(DriverRestartContextActiveHandlers.class) final Set<EventHandler<ActiveContext>> + driverRestartActiveContextHandlers, + @Parameter(DriverRestartCompletedHandlers.class) final Set<EventHandler<DriverRestartCompleted>> + driverRestartCompletedHandlers, + @Parameter(DriverRestartFailedEvaluatorHandlers.class) final Set<EventHandler<FailedEvaluator>> + driverRestartFailedEvaluatorHandlers){ + this.clock = clock; + this.taskReturnValueProvider = taskReturnValueProvider; + this.driverStartHandlers = driverStartHandlers; + this.driverStopHandlers = driverStopHandlers; + this.allocatedEvaluatorHandlers = allocatedEvaluatorHandlers; + this.completedEvaluatorHandlers = completedEvaluatorHandlers; + this.failedEvaluatorHandlers = failedEvaluatorHandlers; + this.taskRunningHandlers = taskRunningHandlers; + this.taskFailedHandlers = taskFailedHandlers; + this.taskMessageHandlers = taskMessageHandlers; + this.taskCompletedHandlers = taskCompletedHandlers; + this.taskSuspendedHandlers = taskSuspendedHandlers; + this.contextActiveHandlers = contextActiveHandlers; + this.contextClosedHandlers = contextClosedHandlers; + this.contextMessageHandlers = contextMessageHandlers; + this.contextFailedHandlers = contextFailedHandlers; + this.driverRestartHandlers = driverRestartHandlers; + this.driverRestartRunningTaskHandlers = driverRestartRunningTaskHandlers; + this.driverRestartActiveContextHandlers = driverRestartActiveContextHandlers; + this.driverRestartCompletedHandlers = driverRestartCompletedHandlers; + this.driverRestartFailedEvaluatorHandlers = driverRestartFailedEvaluatorHandlers; + } + + @Override + public Collection<AllocatedEvaluator> getCurrentAllocatedEvaluators() { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock is closed"); + } + return new ArrayList<AllocatedEvaluator>(this.allocatedEvaluatorMap.values()); + } + + @Override + public void fail(final AllocatedEvaluator evaluator) { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock is closed"); + } + if (this.allocatedEvaluatorMap.remove(evaluator.getId()) == null) { + throw new IllegalStateException("unknown evaluator " + evaluator); + } + FailedTask failedTask = null; + if (this.runningTasks.containsKey(evaluator.getId())) { + final RunningTask task = this.runningTasks.remove(evaluator.getId()); + failedTask = new FailedTask( + task.getId(), + "mock", + Optional.<String>empty(), + Optional.<Throwable>empty(), + Optional.<byte[]>empty(), + Optional.<ActiveContext>of(task.getActiveContext())); + } + final List<FailedContext> failedContexts = new ArrayList<>(); + for (final MockActiveContext context : this.allocatedContextsMap.get(evaluator.getId())) { + failedContexts.add(new MockFailedContext(context)); + } + post(this.failedEvaluatorHandlers, new MockFailedEvaluator( + evaluator.getId(), failedContexts, Optional.ofNullable(failedTask))); + } + + @Override + public Collection<ActiveContext> getCurrentActiveContexts() { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock is closed"); + } + final List<ActiveContext> currentActiveContexts = new ArrayList<>(); + for (final List<MockActiveContext> contexts : this.allocatedContextsMap.values()) { + currentActiveContexts.addAll(contexts); + } + return currentActiveContexts; + } + + @Override + public void fail(final ActiveContext context) { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock is closed"); + } + + final MockAllocatedEvaluator evaluator = ((MockActiveContext) context).getEvaluator(); + + // Root context failure shuts evaluator down. + if (!((MockActiveContext) context).getParentContext().isPresent()) { + allocatedEvaluatorMap.remove(evaluator.getId()); + post(this.completedEvaluatorHandlers, new CompletedEvaluator() { + @Override + public String getId() { + return evaluator.getId(); + } + }); + } + + this.allocatedContextsMap.get(evaluator.getId()).remove(context); + post(this.contextFailedHandlers, new MockFailedContext((MockActiveContext) context)); + } + + @Override + public Collection<RunningTask> getCurrentRunningTasks() { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock is closed"); + } + return new ArrayList<RunningTask>(this.runningTasks.values()); + } + + @Override + public void fail(final RunningTask task) { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock is closed"); + } + final String evaluatorID = task.getActiveContext().getEvaluatorId(); + if (this.runningTasks.containsKey(evaluatorID) && + this.runningTasks.get(evaluatorID).equals(task)) { + this.runningTasks.remove(evaluatorID); + post(taskFailedHandlers, new FailedTask( + task.getId(), + "mock", + Optional.<String>empty(), + Optional.<Throwable>empty(), + Optional.<byte[]>empty(), + Optional.of(task.getActiveContext()))); + } else { + throw new IllegalStateException("unknown running task " + task); + } + } + + @Override + public void start() { + post(this.driverStartHandlers, new StartTime(this.clock.get().getCurrentTime())); + } + + @Override + public void stop() { + post(this.driverStopHandlers, new StopTime(this.clock.get().getCurrentTime())); + } + + @Override + public void restart(final MockDriverRestartContext restartContext, final boolean isTimeout, final long duration) { + post(this.driverRestartHandlers, restartContext.getDriverRestarted()); + for (final RunningTask runningTask : restartContext.getRunningTasks()) { + post(this.driverRestartRunningTaskHandlers, runningTask); + } + for (final ActiveContext activeContext : restartContext.getIdleActiveContexts()) { + post(this.driverRestartActiveContextHandlers, activeContext); + } + post(this.driverRestartCompletedHandlers, restartContext.getDriverRestartCompleted(isTimeout, duration)); + for (final FailedEvaluator failedEvaluator : restartContext.getFailedEvaluators()) { + post(this.driverRestartFailedEvaluatorHandlers, failedEvaluator); + } + } + + @Override + public MockDriverRestartContext failDriver(final int attempt, final StartTime startTime) { + final List<MockActiveContext> activeContexts = new ArrayList<>(); + for (final List<MockActiveContext> contexts : this.allocatedContextsMap.values()) { + if (contexts.size() > 0) { + activeContexts.add(contexts.get(contexts.size() - 1)); + } + } + return new MockDriverRestartContext( + attempt, + startTime, + new ArrayList<>(this.allocatedEvaluatorMap.values()), + activeContexts, + new ArrayList<>(this.runningTasks.values())); + } + + @Override + public boolean hasProcessRequest() { + return this.processRequestQueue.size() > 0; + } + + @Override + public ProcessRequest getNextProcessRequest() { + if (this.processRequestQueue.size() > 0) { + return this.processRequestQueue.remove(0); + } else { + return null; + } + } + + @Override + public void succeed(final ProcessRequest pr) { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock is closed"); + } + final ProcessRequestInternal request = (ProcessRequestInternal) pr; + switch (request.getType()) { + case ALLOCATE_EVALUATOR: + final MockAllocatedEvaluator allocatedEvaluator = ((AllocateEvaluator)request).getSuccessEvent(); + validateAndCreate(allocatedEvaluator); + post(this.allocatedEvaluatorHandlers, allocatedEvaluator); + break; + case CLOSE_EVALUATOR: + final CompletedEvaluator closedEvaluator = ((CloseEvaluator)request).getSuccessEvent(); + validateAndClose(closedEvaluator); + post(this.completedEvaluatorHandlers, closedEvaluator); + break; + case CREATE_CONTEXT: + final MockActiveContext createContext = ((CreateContext) request).getSuccessEvent(); + validateAndCreate(createContext); + post(this.contextActiveHandlers, createContext); + break; + case CLOSE_CONTEXT: + final MockClosedContext closeContext = ((CloseContext) request).getSuccessEvent(); + validateAndClose(closeContext); + post(this.contextClosedHandlers, closeContext); + break; + case CREATE_TASK: + final MockRunningTask createTask = ((CreateTask)request).getSuccessEvent(); + validateAndCreate(createTask); + post(this.taskRunningHandlers, request.getSuccessEvent()); + break; + case SUSPEND_TASK: + final MockRunningTask suspendedTask = ((SuspendTask)request).getTask(); + validateAndClose(suspendedTask); + post(this.taskSuspendedHandlers, request.getSuccessEvent()); + break; + case CLOSE_TASK: + case COMPLETE_TASK: + final MockRunningTask completedTask = ((CompleteTask)request).getTask(); + validateAndClose(completedTask); + post(this.taskCompletedHandlers, request.getSuccessEvent()); + break; + case CREATE_CONTEXT_AND_TASK: + final CreateContextAndTask createContextTask = (CreateContextAndTask) request; + final Tuple<MockActiveContext, MockRunningTask> events = createContextTask.getSuccessEvent(); + validateAndCreate(events.getKey()); + post(this.contextActiveHandlers, events.getKey()); + validateAndCreate(events.getValue()); + post(this.taskRunningHandlers, events.getValue()); + break; + case SEND_MESSAGE_DRIVER_TO_TASK: + // ignore + break; + case SEND_MESSAGE_DRIVER_TO_CONTEXT: + // ignore + break; + default: + throw new IllegalStateException("unknown type"); + } + + if (request.doAutoComplete()) { + add(request.getCompletionProcessRequest()); + } else if (!this.clock.get().isClosed() && isIdle()) { + this.clock.get().close(); + } + } + + @Override + public void fail(final ProcessRequest pr) { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock is closed"); + } + final ProcessRequestInternal request = (ProcessRequestInternal) pr; + switch (request.getType()) { + case ALLOCATE_EVALUATOR: + post(this.failedEvaluatorHandlers, request.getFailureEvent()); + break; + case CLOSE_EVALUATOR: + final CompletedEvaluator evaluator = ((CloseEvaluator)request).getSuccessEvent(); + validateAndClose(evaluator); + post(this.failedEvaluatorHandlers, request.getFailureEvent()); + break; + case CREATE_CONTEXT: + post(this.contextFailedHandlers, request.getFailureEvent()); + break; + case CLOSE_CONTEXT: + final MockClosedContext context = ((CloseContext)request).getSuccessEvent(); + validateAndClose(context); + if (context.getParentContext() == null) { + add(new CloseEvaluator(context.getMockActiveContext().getEvaluator())); + } + post(this.contextFailedHandlers, request.getFailureEvent()); + break; + case CREATE_TASK: + post(this.taskFailedHandlers, request.getFailureEvent()); + break; + case SUSPEND_TASK: + validateAndClose(((SuspendTask)request).getTask()); + post(this.taskFailedHandlers, request.getFailureEvent()); + break; + case CLOSE_TASK: + case COMPLETE_TASK: + validateAndClose(((CloseTask)request).getTask()); + post(this.taskFailedHandlers, request.getFailureEvent()); + break; + case CREATE_CONTEXT_AND_TASK: + final CreateContextAndTask createContextTask = (CreateContextAndTask) request; + final Tuple<MockFailedContext, FailedTask> events = createContextTask.getFailureEvent(); + post(this.taskFailedHandlers, events.getValue()); + post(this.contextFailedHandlers, events.getKey()); + break; + case SEND_MESSAGE_DRIVER_TO_TASK: + // ignore + break; + case SEND_MESSAGE_DRIVER_TO_CONTEXT: + // ignore + break; + default: + throw new IllegalStateException("unknown type"); + } + + if (!this.clock.get().isClosed() && isIdle()) { + this.clock.get().close(); + } + } + + @Override + public void publish(final ContextMessage contextMessage) { + for (final EventHandler<ContextMessage> handler : this.contextMessageHandlers) { + handler.onNext(contextMessage); + } + } + + MockTaskReturnValueProvider getTaskReturnValueProvider() { + return this.taskReturnValueProvider; + } + /** + * Used by mock REEF entities (e.g., AllocatedEvaluator, RunningTask) to inject + * process requests from initiated actions e.g., RunningTask.close(). + * @param request to inject + */ + void add(final ProcessRequest request) { + this.processRequestQueue.add(request); + } + + private boolean isIdle() { + return this.clock.get().isIdle() && + this.processRequestQueue.isEmpty() && + this.allocatedEvaluatorMap.isEmpty(); + } + + private <T> void post(final Set<EventHandler<T>> handlers, final Object event) { + for (final EventHandler<T> handler : handlers) { + handler.onNext((T) event); + } + } + + private void validateAndCreate(final MockActiveContext context) { + if (!this.allocatedEvaluatorMap.containsKey(context.getEvaluatorId())) { + throw new IllegalStateException("unknown evaluator id " + context.getEvaluatorId()); + } else if (!this.allocatedContextsMap.containsKey(context.getEvaluatorId())) { + this.allocatedContextsMap.put(context.getEvaluatorId(), new ArrayList<MockActiveContext>()); + } + this.allocatedContextsMap.get(context.getEvaluatorId()).add(context); + } + + private void validateAndClose(final MockClosedContext context) { + if (!this.allocatedContextsMap.containsKey(context.getEvaluatorId())) { + throw new IllegalStateException("unknown evaluator id " + context.getEvaluatorId()); + } + final List<MockActiveContext> contexts = this.allocatedContextsMap.get(context.getEvaluatorId()); + if (!contexts.get(contexts.size() - 1).equals(context.getMockActiveContext())) { + throw new IllegalStateException("closing context that is not on the top of the stack"); + } + contexts.remove(context.getMockActiveContext()); + } + + private void validateAndCreate(final MockRunningTask task) { + if (this.runningTasks.containsKey(task.evaluatorID())) { + throw new IllegalStateException("task already running on evaluator " + + task.evaluatorID()); + } + this.runningTasks.put(task.evaluatorID(), task); + } + + private void validateAndClose(final MockRunningTask task) { + if (!this.runningTasks.containsKey(task.getActiveContext().getEvaluatorId())) { + throw new IllegalStateException("no task running on evaluator"); + } + this.runningTasks.remove(task.getActiveContext().getEvaluatorId()); + } + + private void validateAndCreate(final MockAllocatedEvaluator evaluator) { + if (this.allocatedEvaluatorMap.containsKey(evaluator.getId())) { + throw new IllegalStateException("evaluator id " + evaluator.getId() + " already exists"); + } + this.allocatedEvaluatorMap.put(evaluator.getId(), evaluator); + this.allocatedContextsMap.put(evaluator.getId(), new ArrayList<MockActiveContext>()); + } + + private void validateAndClose(final CompletedEvaluator evaluator) { + if (!this.allocatedEvaluatorMap.containsKey(evaluator.getId())) { + throw new IllegalStateException("unknown evaluator id " + evaluator.getId()); + } + this.allocatedEvaluatorMap.remove(evaluator.getId()); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockSuspendedTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockSuspendedTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockSuspendedTask.java new file mode 100644 index 0000000..9f25ae32 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockSuspendedTask.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.task.SuspendedTask; + +/** + * mock suspended task. + */ +@Unstable +@Private +public final class MockSuspendedTask implements SuspendedTask { + + private final MockRunningTask task; + + public MockSuspendedTask(final MockRunningTask task) { + this.task = task; + } + + @Override + public ActiveContext getActiveContext() { + return this.task.getActiveContext(); + } + + @Override + public byte[] get() { + return new byte[0]; + } + + @Override + public String getId() { + return this.task.getId(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockUtils.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockUtils.java new file mode 100644 index 0000000..23d3b46 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockUtils.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.driver.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.exceptions.InjectionException; + +/** + * mock utilities. + */ +@Unstable +@Private +final class MockUtils { + + private MockUtils() { + } + + public static <U, T extends Name<U>> U getValue(final Configuration configuration, final Class<T> name) { + try { + final Injector injector = Tang.Factory.getTang().newInjector(configuration); + return injector.getNamedInstance(name); + } catch (InjectionException e) { + throw new IllegalStateException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/package-info.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/package-info.java new file mode 100644 index 0000000..50e7eac --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +/** + * mock runtime implementation. + */ +package org.apache.reef.mock.driver.runtime; http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/package-info.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/package-info.java deleted file mode 100644 index fdda864..0000000 --- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/package-info.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -/** - * Mock runtime API. - * - * Mock runtime is meant to mimic the semantics of the REEF runtime and - * allow: - * 1. Applications to driver the forward progress of processing REEF events. - * See {@link org.apache.reef.mock.MockRuntime} API - * 2. Control the advancement of the Clock and Alarm callbacks. - * See {@link org.apache.reef.mock.runtime.MockClock} - * 3. Inject failures into the REEF applications. - * See {@link org.apache.reef.mock.MockFailure} - * - * Use {@link org.apache.reef.mock.MockConfiguration} to bind your REEF - * driver application event handlers. - * - * Use {@link org.apache.reef.mock.MockRuntime#start()} to trigger the - * driver start event and {@link org.apache.reef.mock.MockRuntime#stop()}} - * or {@link org.apache.reef.mock.runtime.MockClock#close()} to trigger the driver - * stop event. - */ -package org.apache.reef.mock; http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/AllocateEvaluator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/AllocateEvaluator.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/AllocateEvaluator.java deleted file mode 100644 index 9d6e400..0000000 --- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/AllocateEvaluator.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.mock.request; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.driver.evaluator.FailedEvaluator; -import org.apache.reef.mock.ProcessRequest; -import org.apache.reef.mock.runtime.MockAllocatedEvalautor; -import org.apache.reef.mock.runtime.MockFailedEvaluator; - -/** - * Allocate Evaluator process request. - */ -@Unstable -@Private -public final class AllocateEvaluator implements - ProcessRequestInternal<MockAllocatedEvalautor, FailedEvaluator> { - - private final MockAllocatedEvalautor evaluator; - - public AllocateEvaluator(final MockAllocatedEvalautor evaluator) { - this.evaluator = evaluator; - } - - @Override - public Type getType() { - return Type.ALLOCATE_EVALUATOR; - } - - @Override - public MockAllocatedEvalautor getSuccessEvent() { - return this.evaluator; - } - - @Override - public FailedEvaluator getFailureEvent() { - return new MockFailedEvaluator(evaluator.getId()); - } - - @Override - public boolean doAutoComplete() { - return false; - } - - @Override - public void setAutoComplete(final boolean value) { - throw new UnsupportedOperationException(); - } - - @Override - public ProcessRequest getCompletionProcessRequest() { - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseContext.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseContext.java deleted file mode 100644 index 00bdf3c..0000000 --- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseContext.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.mock.request; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.driver.context.ClosedContext; -import org.apache.reef.driver.context.FailedContext; -import org.apache.reef.mock.AutoCompletable; -import org.apache.reef.mock.ProcessRequest; -import org.apache.reef.mock.runtime.MockActiveContext; -import org.apache.reef.mock.runtime.MockClosedContext; -import org.apache.reef.mock.runtime.MockFailedContext; - -/** - * close context process request. - */ -@Unstable -@Private -public final class CloseContext implements - ProcessRequestInternal<ClosedContext, FailedContext>, - AutoCompletable { - - private final MockActiveContext context; - - public CloseContext(final MockActiveContext context) { - this.context = context; - } - - @Override - public Type getType() { - return Type.CLOSE_CONTEXT; - } - - @Override - public MockClosedContext getSuccessEvent() { - return new MockClosedContext(this.context); - } - - @Override - public FailedContext getFailureEvent() { - return new MockFailedContext(this.context); - } - - @Override - public boolean doAutoComplete() { - return !this.context.getParentContext().isPresent(); - } - - @Override - public void setAutoComplete(final boolean value) { - throw new UnsupportedOperationException(); - } - - @Override - public ProcessRequest getCompletionProcessRequest() { - return new CloseEvaluator(this.context.getEvaluator()); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseEvaluator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseEvaluator.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseEvaluator.java deleted file mode 100644 index 6ef2b9f..0000000 --- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseEvaluator.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.mock.request; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.driver.evaluator.CompletedEvaluator; -import org.apache.reef.driver.evaluator.FailedEvaluator; -import org.apache.reef.mock.ProcessRequest; -import org.apache.reef.mock.runtime.MockAllocatedEvalautor; -import org.apache.reef.mock.runtime.MockFailedEvaluator; - -/** - * close evaluator request. - */ -@Unstable -@Private -public final class CloseEvaluator implements ProcessRequestInternal<CompletedEvaluator, FailedEvaluator> { - - private final MockAllocatedEvalautor evaluator; - - public CloseEvaluator(final MockAllocatedEvalautor evaluator) { - this.evaluator = evaluator; - } - - @Override - public Type getType() { - return Type.CLOSE_EVALUATOR; - } - - @Override - public CompletedEvaluator getSuccessEvent() { - return new CompletedEvaluator() { - @Override - public String getId() { - return evaluator.getId(); - } - }; - } - - @Override - public FailedEvaluator getFailureEvent() { - // TODO[initialize remaining failed contstructer fields] - return new MockFailedEvaluator(evaluator.getId()); - } - - @Override - public boolean doAutoComplete() { - return false; - } - - @Override - public void setAutoComplete(final boolean value) { - throw new UnsupportedOperationException(); - } - - @Override - public ProcessRequest getCompletionProcessRequest() { - throw new UnsupportedOperationException(); - } -}
