http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DefaultDriverClientStopHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DefaultDriverClientStopHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DefaultDriverClientStopHandler.java new file mode 100644 index 0000000..0424e47 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DefaultDriverClientStopHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.client; + +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.event.StopTime; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Default java client driver stop handler. 
+ */ +public final class DefaultDriverClientStopHandler implements EventHandler<StopTime> { + + private static final Logger LOG = Logger.getLogger(DefaultDriverClientStopHandler.class.getName()); + + @Inject + private DefaultDriverClientStopHandler() {} + + @Override + public void onNext(final StopTime value) { + LOG.log(Level.FINEST, "Stop time {0}", value); + } +}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java new file mode 100644 index 0000000..ca3817b --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.reef.bridge.driver.client; + +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.Clock; +import org.apache.reef.wake.time.Time; +import org.apache.reef.wake.time.event.Alarm; +import org.apache.reef.wake.time.runtime.Timer; +import org.apache.reef.wake.time.runtime.event.ClientAlarm; + +import javax.inject.Inject; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The bridge driver client clock. + */ +public final class DriverClientClock implements Clock, IAlarmDispatchHandler { + + private static final Logger LOG = Logger.getLogger(DriverClientClock.class.getName()); + + private final IDriverClientService driverClientService; + + private final IDriverServiceClient driverServiceClient; + + private final Timer timer; + + private final Map<String, ClientAlarm> alarmMap = new HashMap<>(); + + private boolean closed = false; + + @Inject + private DriverClientClock( + final Timer timer, + final IDriverClientService driverClientService, + final IDriverServiceClient driverServiceClient) { + this.timer = timer; + this.driverClientService = driverClientService; + this.driverServiceClient = driverServiceClient; + } + + @Override + public Time scheduleAlarm(final int offset, final EventHandler<Alarm> handler) { + final ClientAlarm alarm = new ClientAlarm(this.timer.getCurrent() + offset, handler); + final String alarmId = UUID.randomUUID().toString(); + this.alarmMap.put(alarmId, alarm); + this.driverServiceClient.onSetAlarm(alarmId, offset); + return alarm; + } + + @Override + public void close() { + stop(); + } + + @Override + public void stop() { + if (!closed) { + this.closed = true; + this.driverServiceClient.onShutdown(); + } + } + + @Override + public void stop(final Throwable exception) { + if (!closed) { + this.closed = true; + this.driverServiceClient.onShutdown(exception); + } + } + + 
@Override + public boolean isIdle() { + return this.closed || this.alarmMap.isEmpty(); + } + + @Override + public boolean isClosed() { + return this.closed; + } + + @Override + public void run() { + try { + this.driverClientService.start(); + this.driverClientService.awaitTermination(); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * Alarm clock event handler. + * @param alarmId alarm identifier + */ + @Override + public void onNext(final String alarmId) { + if (this.alarmMap.containsKey(alarmId)) { + final ClientAlarm clientAlarm = this.alarmMap.remove(alarmId); + clientAlarm.run(); + } else { + LOG.log(Level.SEVERE, "Unknown alarm id {0}", alarmId); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientConfiguration.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientConfiguration.java new file mode 100644 index 0000000..c11273e --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientConfiguration.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.client; + +import org.apache.reef.bridge.driver.client.parameters.DriverClientDispatchThreadCount; +import org.apache.reef.bridge.driver.client.parameters.ClientDriverStopHandler; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ClosedContext; +import org.apache.reef.driver.context.ContextMessage; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.CompletedEvaluator; +import org.apache.reef.driver.evaluator.EvaluatorRequestor; +import org.apache.reef.driver.evaluator.FailedEvaluator; +import org.apache.reef.driver.parameters.*; +import org.apache.reef.driver.task.*; +import org.apache.reef.tang.formats.ConfigurationModule; +import org.apache.reef.tang.formats.ConfigurationModuleBuilder; +import org.apache.reef.tang.formats.OptionalImpl; +import org.apache.reef.tang.formats.RequiredImpl; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.Clock; +import org.apache.reef.wake.time.event.StartTime; +import org.apache.reef.wake.time.event.StopTime; + +/** + * Driver client configuration. + */ +public final class DriverClientConfiguration extends ConfigurationModuleBuilder { + + /** + * The event handler invoked right after the driver boots up. + */ + public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED = new RequiredImpl<>(); + + /** + * The event handler invoked right before the driver shuts down. 
Defaults to ignore. + */ + public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP = new OptionalImpl<>(); + + // ***** EVALUATOR HANDLER BINDINGS: + + /** + * Event handler for allocated evaluators. Defaults to returning the evaluator if not bound. + */ + public static final OptionalImpl<EventHandler<AllocatedEvaluator>> ON_EVALUATOR_ALLOCATED = new OptionalImpl<>(); + + /** + * Event handler for completed evaluators. Defaults to logging if not bound. + */ + public static final OptionalImpl<EventHandler<CompletedEvaluator>> ON_EVALUATOR_COMPLETED = new OptionalImpl<>(); + + /** + * Event handler for failed evaluators. Defaults to job failure if not bound. + */ + public static final OptionalImpl<EventHandler<FailedEvaluator>> ON_EVALUATOR_FAILED = new OptionalImpl<>(); + + // ***** TASK HANDLER BINDINGS: + + /** + * Event handler for task messages. Defaults to logging if not bound. + */ + public static final OptionalImpl<EventHandler<TaskMessage>> ON_TASK_MESSAGE = new OptionalImpl<>(); + + /** + * Event handler for completed tasks. Defaults to closing the context the task ran on if not bound. + */ + public static final OptionalImpl<EventHandler<CompletedTask>> ON_TASK_COMPLETED = new OptionalImpl<>(); + + /** + * Event handler for failed tasks. Defaults to job failure if not bound. + */ + public static final OptionalImpl<EventHandler<FailedTask>> ON_TASK_FAILED = new OptionalImpl<>(); + + /** + * Event handler for running tasks. Defaults to logging if not bound. + */ + public static final OptionalImpl<EventHandler<RunningTask>> ON_TASK_RUNNING = new OptionalImpl<>(); + + /** + * Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support + * task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then. 
+ */ + public static final OptionalImpl<EventHandler<SuspendedTask>> ON_TASK_SUSPENDED = new OptionalImpl<>(); + + // ***** CLIENT HANDLER BINDINGS: + + /** + * Event handler for client messages. Defaults to logging if not bound. + */ + public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_MESSAGE = new OptionalImpl<>(); + + /** + * Event handler for close messages sent by the client. Defaults to job failure if not bound. + */ + public static final OptionalImpl<EventHandler<Void>> ON_CLIENT_CLOSED = new OptionalImpl<>(); + + /** + * Event handler for close-with-message events sent by the client. Defaults to job failure if not bound. + */ + public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_CLOSED_MESSAGE = new OptionalImpl<>(); + + // ***** CONTEXT HANDLER BINDINGS: + + /** + * Event handler for active context. Defaults to closing the context if not bound. + */ + public static final OptionalImpl<EventHandler<ActiveContext>> ON_CONTEXT_ACTIVE = new OptionalImpl<>(); + + /** + * Event handler for closed context. Defaults to logging if not bound. + */ + public static final OptionalImpl<EventHandler<ClosedContext>> ON_CONTEXT_CLOSED = new OptionalImpl<>(); + + /** + * Event handler for failed context. Defaults to job failure if not bound. + */ + public static final OptionalImpl<EventHandler<FailedContext>> ON_CONTEXT_FAILED = new OptionalImpl<>(); + + /** + * Event handler for context messages. Defaults to logging if not bound. + */ + public static final OptionalImpl<EventHandler<ContextMessage>> ON_CONTEXT_MESSAGE = new OptionalImpl<>(); + + /** + * Number of dispatch threads to use. + */ + public static final OptionalImpl<Integer> CLIENT_DRIVER_DISPATCH_THREAD_COUNT = new OptionalImpl<>(); + + /** + * Alarm dispatch handler. + */ + public static final OptionalImpl<IAlarmDispatchHandler> ALARM_DISPATCH_HANDLER = new OptionalImpl<>(); + + /** + * Default to gRPC Driver Client Service. 
+ */ + public static final OptionalImpl<IDriverClientService> DRIVER_CLIENT_SERVICE = new OptionalImpl<>(); + + /** + * Default to gRPC Driver Service Client. + */ + public static final OptionalImpl<IDriverServiceClient> DRIVER_SERVICE_CLIENT = new OptionalImpl<>(); + + /** + * ConfigurationModule to fill out to get a legal Driver Configuration. + */ + public static final ConfigurationModule CONF = new DriverClientConfiguration() + .bindImplementation(Clock.class, DriverClientClock.class) + .bindImplementation(EvaluatorRequestor.class, DriverClientEvaluatorRequestor.class) + .bindImplementation(IAlarmDispatchHandler.class, ALARM_DISPATCH_HANDLER) + .bindImplementation(IDriverClientService.class, DRIVER_CLIENT_SERVICE) + .bindImplementation(IDriverServiceClient.class, DRIVER_SERVICE_CLIENT) + + .bindNamedParameter(DriverClientDispatchThreadCount.class, CLIENT_DRIVER_DISPATCH_THREAD_COUNT) + + // Driver start/stop handlers + .bindSetEntry(DriverStartHandler.class, ON_DRIVER_STARTED) + .bindSetEntry(ClientDriverStopHandler.class, ON_DRIVER_STOP) + + // Evaluator handlers + .bindSetEntry(EvaluatorAllocatedHandlers.class, ON_EVALUATOR_ALLOCATED) + .bindSetEntry(EvaluatorCompletedHandlers.class, ON_EVALUATOR_COMPLETED) + .bindSetEntry(EvaluatorFailedHandlers.class, ON_EVALUATOR_FAILED) + + // Task handlers + .bindSetEntry(TaskRunningHandlers.class, ON_TASK_RUNNING) + .bindSetEntry(TaskFailedHandlers.class, ON_TASK_FAILED) + .bindSetEntry(TaskMessageHandlers.class, ON_TASK_MESSAGE) + .bindSetEntry(TaskCompletedHandlers.class, ON_TASK_COMPLETED) + .bindSetEntry(TaskSuspendedHandlers.class, ON_TASK_SUSPENDED) + + // Context handlers + .bindSetEntry(ContextActiveHandlers.class, ON_CONTEXT_ACTIVE) + .bindSetEntry(ContextClosedHandlers.class, ON_CONTEXT_CLOSED) + .bindSetEntry(ContextMessageHandlers.class, ON_CONTEXT_MESSAGE) + .bindSetEntry(ContextFailedHandlers.class, ON_CONTEXT_FAILED) + + // Client handlers + .bindSetEntry(ClientMessageHandlers.class, ON_CLIENT_MESSAGE) + 
.bindSetEntry(ClientCloseHandlers.class, ON_CLIENT_CLOSED) + .bindSetEntry(ClientCloseWithMessageHandlers.class, ON_CLIENT_CLOSED_MESSAGE) + + .build(); +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java new file mode 100644 index 0000000..8c4eb28 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.reef.bridge.driver.client; + +import com.google.common.collect.Sets; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.bridge.driver.client.parameters.DriverClientDispatchThreadCount; +import org.apache.reef.bridge.driver.client.parameters.ClientDriverStopHandler; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ClosedContext; +import org.apache.reef.driver.context.ContextMessage; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.CompletedEvaluator; +import org.apache.reef.driver.evaluator.FailedEvaluator; +import org.apache.reef.driver.parameters.*; +import org.apache.reef.driver.restart.DriverRestartCompleted; +import org.apache.reef.driver.restart.DriverRestarted; +import org.apache.reef.driver.task.*; +import org.apache.reef.runtime.common.utils.DispatchingEStage; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.event.StartTime; +import org.apache.reef.wake.time.event.StopTime; + +import javax.inject.Inject; +import java.util.Set; + +/** + * Async dispatch of client driver events. + */ +@Private +public final class DriverClientDispatcher { + + /** + * Dispatcher used for application provided event handlers. + */ + private final DispatchingEStage applicationDispatcher; + + /** + * Dispatcher for client close events. + */ + private final DispatchingEStage clientCloseDispatcher; + + /** + * Dispatcher for client close with message events. + */ + private final DispatchingEStage clientCloseWithMessageDispatcher; + + /** + * Dispatcher for client messages. + */ + private final DispatchingEStage clientMessageDispatcher; + + /** + * The alarm dispatcher. + */ + private final DispatchingEStage alarmDispatcher; + + /** + * Driver restart dispatcher. 
+ */ + private final DispatchingEStage driverRestartDispatcher; + + @Inject + private DriverClientDispatcher( + final DriverClientExceptionHandler driverExceptionHandler, + final IAlarmDispatchHandler alarmDispatchHandler, + @Parameter(DriverClientDispatchThreadCount.class) + final Integer numberOfThreads, + // Application-provided start and stop handlers + @Parameter(DriverStartHandler.class) + final Set<EventHandler<StartTime>> startHandlers, + @Parameter(ClientDriverStopHandler.class) + final Set<EventHandler<StopTime>> stopHandlers, + // Application-provided Context event handlers + @Parameter(ContextActiveHandlers.class) + final Set<EventHandler<ActiveContext>> contextActiveHandlers, + @Parameter(ContextClosedHandlers.class) + final Set<EventHandler<ClosedContext>> contextClosedHandlers, + @Parameter(ContextFailedHandlers.class) + final Set<EventHandler<FailedContext>> contextFailedHandlers, + @Parameter(ContextMessageHandlers.class) + final Set<EventHandler<ContextMessage>> contextMessageHandlers, + // Application-provided Task event handlers + @Parameter(TaskRunningHandlers.class) + final Set<EventHandler<RunningTask>> taskRunningHandlers, + @Parameter(TaskCompletedHandlers.class) + final Set<EventHandler<CompletedTask>> taskCompletedHandlers, + @Parameter(TaskSuspendedHandlers.class) + final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers, + @Parameter(TaskMessageHandlers.class) + final Set<EventHandler<TaskMessage>> taskMessageEventHandlers, + @Parameter(TaskFailedHandlers.class) + final Set<EventHandler<FailedTask>> taskExceptionEventHandlers, + // Application-provided Evaluator event handlers + @Parameter(EvaluatorAllocatedHandlers.class) + final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers, + @Parameter(EvaluatorFailedHandlers.class) + final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers, + @Parameter(EvaluatorCompletedHandlers.class) + final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers, + // 
Client handlers + @Parameter(ClientCloseHandlers.class) + final Set<EventHandler<Void>> clientCloseHandlers, + @Parameter(ClientCloseWithMessageHandlers.class) + final Set<EventHandler<byte[]>> clientCloseWithMessageHandlers, + @Parameter(ClientMessageHandlers.class) + final Set<EventHandler<byte[]>> clientMessageHandlers) { + + this.applicationDispatcher = new DispatchingEStage( + driverExceptionHandler, numberOfThreads, "ClientDriverDispatcher"); + // Application start and stop handlers + this.applicationDispatcher.register(StartTime.class, startHandlers); + this.applicationDispatcher.register(StopTime.class, stopHandlers); + // Application Context event handlers + this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers); + this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers); + this.applicationDispatcher.register(FailedContext.class, contextFailedHandlers); + this.applicationDispatcher.register(ContextMessage.class, contextMessageHandlers); + + // Application Task event handlers. 
+ this.applicationDispatcher.register(RunningTask.class, taskRunningHandlers); + this.applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers); + this.applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers); + this.applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers); + this.applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers); + + // Application Evaluator event handlers + this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers); + this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedHandlers); + this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedHandlers); + + // Client event handlers; + this.clientCloseDispatcher = new DispatchingEStage(this.applicationDispatcher); + this.clientCloseDispatcher.register(Void.class, clientCloseHandlers); + + this.clientCloseWithMessageDispatcher = new DispatchingEStage(this.applicationDispatcher); + this.clientCloseWithMessageDispatcher.register(byte[].class, clientCloseWithMessageHandlers); + + this.clientMessageDispatcher = new DispatchingEStage(this.applicationDispatcher); + this.clientMessageDispatcher.register(byte[].class, clientMessageHandlers); + + // Alarm event handlers + this.alarmDispatcher = new DispatchingEStage(this.applicationDispatcher); + this.alarmDispatcher.register(String.class, + Sets.newHashSet((EventHandler<String>)alarmDispatchHandler)); + + // Driver restart dispatcher + this.driverRestartDispatcher = new DispatchingEStage(this.applicationDispatcher); + } + + @Inject + private DriverClientDispatcher( + final DriverClientExceptionHandler driverExceptionHandler, + final IAlarmDispatchHandler alarmDispatchHandler, + @Parameter(DriverClientDispatchThreadCount.class) + final Integer numberOfThreads, + // Application-provided start and stop handlers + @Parameter(DriverStartHandler.class) + final Set<EventHandler<StartTime>> startHandlers, + 
@Parameter(ClientDriverStopHandler.class) + final Set<EventHandler<StopTime>> stopHandlers, + // Application-provided Context event handlers + @Parameter(ContextActiveHandlers.class) + final Set<EventHandler<ActiveContext>> contextActiveHandlers, + @Parameter(ContextClosedHandlers.class) + final Set<EventHandler<ClosedContext>> contextClosedHandlers, + @Parameter(ContextFailedHandlers.class) + final Set<EventHandler<FailedContext>> contextFailedHandlers, + @Parameter(ContextMessageHandlers.class) + final Set<EventHandler<ContextMessage>> contextMessageHandlers, + // Application-provided Task event handlers + @Parameter(TaskRunningHandlers.class) + final Set<EventHandler<RunningTask>> taskRunningHandlers, + @Parameter(TaskCompletedHandlers.class) + final Set<EventHandler<CompletedTask>> taskCompletedHandlers, + @Parameter(TaskSuspendedHandlers.class) + final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers, + @Parameter(TaskMessageHandlers.class) + final Set<EventHandler<TaskMessage>> taskMessageEventHandlers, + @Parameter(TaskFailedHandlers.class) + final Set<EventHandler<FailedTask>> taskExceptionEventHandlers, + // Application-provided Evaluator event handlers + @Parameter(EvaluatorAllocatedHandlers.class) + final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers, + @Parameter(EvaluatorFailedHandlers.class) + final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers, + @Parameter(EvaluatorCompletedHandlers.class) + final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers, + // Client handlers + @Parameter(ClientCloseHandlers.class) + final Set<EventHandler<Void>> clientCloseHandlers, + @Parameter(ClientCloseWithMessageHandlers.class) + final Set<EventHandler<byte[]>> clientCloseWithMessageHandlers, + @Parameter(ClientMessageHandlers.class) + final Set<EventHandler<byte[]>> clientMessageHandlers, + // Driver restart handlers + @Parameter(DriverRestartHandler.class) + final Set<EventHandler<DriverRestarted>> 
driverRestartHandlers, + @Parameter(DriverRestartTaskRunningHandlers.class) + final Set<EventHandler<RunningTask>> driverRestartTaskRunningHandlers, + @Parameter(DriverRestartContextActiveHandlers.class) + final Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers, + @Parameter(DriverRestartCompletedHandlers.class) + final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers, + @Parameter(DriverRestartFailedEvaluatorHandlers.class) + final Set<EventHandler<FailedEvaluator>> driverRestartFailedEvaluatorHandlers) { + this( + driverExceptionHandler, + alarmDispatchHandler, + numberOfThreads, + startHandlers, + stopHandlers, + contextActiveHandlers, + contextClosedHandlers, + contextFailedHandlers, + contextMessageHandlers, + taskRunningHandlers, + taskCompletedHandlers, + taskSuspendedHandlers, + taskMessageEventHandlers, + taskExceptionEventHandlers, + evaluatorAllocatedHandlers, + evaluatorFailedHandlers, + evaluatorCompletedHandlers, + clientCloseHandlers, + clientCloseWithMessageHandlers, + clientMessageHandlers); + // Register driver restart handlers. 
+ this.driverRestartDispatcher.register(DriverRestarted.class, driverRestartHandlers); + this.driverRestartDispatcher.register(RunningTask.class, driverRestartTaskRunningHandlers); + this.driverRestartDispatcher.register(ActiveContext.class, driverRestartActiveContextHandlers); + this.driverRestartDispatcher.register(DriverRestartCompleted.class, driverRestartCompletedHandlers); + this.driverRestartDispatcher.register(FailedEvaluator.class, driverRestartFailedEvaluatorHandlers); + } + + public void dispatchRestart(final DriverRestarted driverRestarted) { + this.driverRestartDispatcher.onNext(DriverRestarted.class, driverRestarted); + } + + public void dispatchRestart(final RunningTask task) { + this.driverRestartDispatcher.onNext(RunningTask.class, task); + } + + public void dispatchRestart(final ActiveContext context) { + this.driverRestartDispatcher.onNext(ActiveContext.class, context); + } + + public void dispatchRestart(final DriverRestartCompleted completed) { + this.driverRestartDispatcher.onNext(DriverRestartCompleted.class, completed); + } + + public void dispatchRestart(final FailedEvaluator evaluator) { + this.driverRestartDispatcher.onNext(FailedEvaluator.class, evaluator); + } + + public void dispatch(final StartTime startTime) { + this.applicationDispatcher.onNext(StartTime.class, startTime); + } + + public void dispatch(final StopTime stopTime) { + this.applicationDispatcher.onNext(StopTime.class, stopTime); + } + + public void dispatch(final ActiveContext context) { + this.applicationDispatcher.onNext(ActiveContext.class, context); + } + + public void dispatch(final ClosedContext context) { + this.applicationDispatcher.onNext(ClosedContext.class, context); + } + + public void dispatch(final FailedContext context) { + this.applicationDispatcher.onNext(FailedContext.class, context); + } + + public void dispatch(final ContextMessage message) { + this.applicationDispatcher.onNext(ContextMessage.class, message); + } + + public void dispatch(final 
AllocatedEvaluator evaluator) { + this.applicationDispatcher.onNext(AllocatedEvaluator.class, evaluator); + } + + public void dispatch(final FailedEvaluator evaluator) { + this.applicationDispatcher.onNext(FailedEvaluator.class, evaluator); + } + + public void dispatch(final CompletedEvaluator evaluator) { + this.applicationDispatcher.onNext(CompletedEvaluator.class, evaluator); + } + + public void dispatch(final RunningTask task) { + this.applicationDispatcher.onNext(RunningTask.class, task); + } + + public void dispatch(final CompletedTask task) { + this.applicationDispatcher.onNext(CompletedTask.class, task); + } + + public void dispatch(final FailedTask task) { + this.applicationDispatcher.onNext(FailedTask.class, task); + } + + public void dispatch(final SuspendedTask task) { + this.applicationDispatcher.onNext(SuspendedTask.class, task); + } + + public void dispatch(final TaskMessage message) { + this.applicationDispatcher.onNext(TaskMessage.class, message); + } + + public void clientCloseDispatch() { + this.clientCloseDispatcher.onNext(Void.class, null); + } + + public void clientCloseWithMessageDispatch(final byte[] message) { + this.clientCloseWithMessageDispatcher.onNext(byte[].class, message); + } + + public void clientMessageDispatch(final byte[] message) { + this.clientMessageDispatcher.onNext(byte[].class, message); + } + + public void dispatchAlarm(final String alarmId) { + this.alarmDispatcher.onNext(String.class, alarmId); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java new file mode 100644 
index 0000000..54692ec --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.client; + +import org.apache.reef.driver.evaluator.EvaluatorRequest; +import org.apache.reef.driver.evaluator.EvaluatorRequestor; + +import javax.inject.Inject; + +/** + * Driver Client evaluator requestor. + */ +public final class DriverClientEvaluatorRequestor implements EvaluatorRequestor { + + private final IDriverServiceClient driverServiceClient; + + @Inject + private DriverClientEvaluatorRequestor(final IDriverServiceClient driverServiceClient) { + this.driverServiceClient = driverServiceClient; + } + + @Override + public void submit(final EvaluatorRequest req) { + this.driverServiceClient.onEvaluatorRequest(req); + } + + @Override + public EvaluatorRequest.Builder newRequest() { + return new DriverClientEvaluatorRequestor.Builder(); + } + + /** + * {@link DriverClientEvaluatorRequestor.Builder} extended with a new submit method. + * {@link EvaluatorRequest}s are built using this builder. 
+ */ + public final class Builder extends EvaluatorRequest.Builder<DriverClientEvaluatorRequestor.Builder> { + @Override + public synchronized void submit() { + DriverClientEvaluatorRequestor.this.submit(this.build()); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java new file mode 100644 index 0000000..9bd99d6 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.client; + +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Driver client exception handler. 
+ */ +public final class DriverClientExceptionHandler implements EventHandler<Throwable> { + private static final Logger LOG = Logger.getLogger(DriverClientExceptionHandler.class.getName()); + + @Inject + private DriverClientExceptionHandler() { + LOG.log(Level.FINE, "Instantiated 'DriverExceptionHandler'"); + } + + + @Override + public void onNext(final Throwable throwable) { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IAlarmDispatchHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IAlarmDispatchHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IAlarmDispatchHandler.java new file mode 100644 index 0000000..e8e8fe5 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IAlarmDispatchHandler.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.reef.bridge.driver.client; + +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.wake.EventHandler; + +/** + * Alarm dispatch handler. + */ +@DefaultImplementation(DriverClientClock.class) +public interface IAlarmDispatchHandler extends EventHandler<String> { +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java new file mode 100644 index 0000000..38758bd --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.reef.bridge.driver.client; + +import org.apache.reef.bridge.driver.client.grpc.DriverClientService; +import org.apache.reef.tang.annotations.DefaultImplementation; + +import java.io.IOException; + +/** + * Interface that driver client services implement. + */ +@DefaultImplementation(DriverClientService.class) +public interface IDriverClientService { + + /** + * Start the DriverClient service. + * @throws IOException when unable to start service + */ + void start() throws IOException; + + + /** + * Wait for termination of driver client service. + */ + void awaitTermination() throws InterruptedException; + +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java new file mode 100644 index 0000000..7cc1346 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.client; + +import org.apache.reef.bridge.driver.client.grpc.DriverServiceClient; +import org.apache.reef.driver.evaluator.EvaluatorRequest; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.util.Optional; + +import java.io.File; +import java.util.List; + +/** + * Forwards application requests to driver server. + */ +@DefaultImplementation(DriverServiceClient.class) +public interface IDriverServiceClient { + + /** + * Initiate shutdown. + */ + void onShutdown(); + + /** + * Initiate shutdown with error. + * @param ex exception error + */ + void onShutdown(final Throwable ex); + + /** + * Set alarm. + * @param alarmId alarm identifier + * @param timeoutMS timeout in milliseconds + */ + void onSetAlarm(final String alarmId, final int timeoutMS); + + /** + * Request evaluators. + * @param evaluatorRequest event + */ + void onEvaluatorRequest(final EvaluatorRequest evaluatorRequest); + + /** + * Close evaluator. + * @param evalautorId to close + */ + void onEvaluatorClose(final String evalautorId); + + /** + * Submit context and/or task. 
+ * @param evaluatorId to submit against + * @param contextConfiguration context configuration + * @param taskConfiguration task configuration + * @param evaluatorProcess evaluator process + * @param addFileList to include + * @param addLibraryList to include + */ + void onEvaluatorSubmit( + final String evaluatorId, + final Optional<Configuration> contextConfiguration, + final Optional<Configuration> taskConfiguration, + final Optional<JVMClientProcess> evaluatorProcess, + final Optional<List<File>> addFileList, + final Optional<List<File>> addLibraryList); + + // Context Operations + + /** + * Close context. + * @param contextId to close + */ + void onContextClose(final String contextId); + + /** + * Submit child context. + * @param contextId to submit against + * @param contextConfiguration for child context + */ + void onContextSubmitContext( + final String contextId, + final Configuration contextConfiguration); + + /** + * Submit task. + * @param contextId to submit against + * @param taskConfiguration for task + */ + void onContextSubmitTask( + final String contextId, + final Configuration taskConfiguration); + + /** + * Send message to context. + * @param contextId to destination context + * @param message to send + */ + void onContextMessage(final String contextId, final byte[] message); + + // Task operations + + /** + * Close the task. + * @param taskId to close + * @param message optional message to include + */ + void onTaskClose(final String taskId, final Optional<byte[]> message); + + /** + * Send task a message. 
+ * @param taskId of destination task + * @param message to send + */ + void onTaskMessage(final String taskId, final byte[] message); +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JVMClientProcess.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JVMClientProcess.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JVMClientProcess.java new file mode 100644 index 0000000..124be21 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JVMClientProcess.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.client; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.evaluator.EvaluatorProcess; +import org.apache.reef.driver.evaluator.EvaluatorType; + +import java.util.ArrayList; +import java.util.List; + +/** + * Stub class for Evaluator Process on driver client. 
+ */ +@Private +public final class JVMClientProcess implements EvaluatorProcess { + + private boolean optionSet = false; + + private int megaBytes = 0; + + private String configurationFileName = null; + + private String standardOut = null; + + private String standardErr = null; + + private final List<String> optionList = new ArrayList<>(); + + public JVMClientProcess() { + } + + @Override + public List<String> getCommandLine() { + throw new UnsupportedOperationException(); + } + + @Override + public EvaluatorType getType() { + return EvaluatorType.JVM; + } + + @Override + public JVMClientProcess setMemory(final int mb) { + this.megaBytes = mb; + this.optionSet = true; + return this; + } + + public int getMemory() { + return this.megaBytes; + } + + @Override + public boolean isOptionSet() { + return optionSet; + } + + @Override + public JVMClientProcess setConfigurationFileName(final String configurationFileName) { + this.configurationFileName = configurationFileName; + return this; + } + + public String getConfigurationFileName() { + return this.configurationFileName; + } + + @Override + public JVMClientProcess setStandardOut(final String standardOut) { + this.standardOut = standardOut; + return this; + } + + public String getStandardOut() { + return this.standardOut; + } + + @Override + public JVMClientProcess setStandardErr(final String standardErr) { + this.standardErr = standardErr; + return this; + } + + public String getStandardErr() { + return this.standardErr; + } + + /** + * Add a JVM option. + * @param option The full option, e.g. 
"-XX:+PrintGCDetails", "-Xms500m" + * @return this + */ + public JVMClientProcess addOption(final String option) { + this.optionList.add(option); + optionSet = true; + return this; + } + + public List<String> getOptions() { + return this.optionList; + } + +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java new file mode 100644 index 0000000..3b675ea --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.reef.bridge.driver.client; + +import org.apache.reef.bridge.driver.client.grpc.DriverClientGrpcConfiguration; +import org.apache.reef.bridge.driver.client.grpc.parameters.DriverServicePort; +import org.apache.reef.runtime.common.REEFLauncher; +import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler; +import org.apache.reef.runtime.common.launch.REEFErrorHandler; +import org.apache.reef.runtime.common.launch.REEFMessageCodec; +import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler; +import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.exceptions.BindException; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.tang.formats.ConfigurationSerializer; +import org.apache.reef.util.EnvironmentUtils; +import org.apache.reef.util.ThreadLogger; +import org.apache.reef.util.logging.LoggingSetup; +import org.apache.reef.wake.remote.RemoteConfiguration; +import org.apache.reef.wake.time.Clock; + +import javax.inject.Inject; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Driver client launcher. 
+ */ +public final class JavaDriverClientLauncher { + + private static final Logger LOG = Logger.getLogger(REEFLauncher.class.getName()); + + private static final Tang TANG = Tang.Factory.getTang(); + + private static final Configuration LAUNCHER_STATIC_CONFIG = + TANG.newConfigurationBuilder() + .bindNamedParameter(RemoteConfiguration.ManagerName.class, "DRIVER_CLIENT_LAUNCHER") + .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class) + .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class) + .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class) + .build(); + + static { + LoggingSetup.setupCommonsLogging(); + } + + /** + * Main configuration object of the REEF component we are launching here. + */ + private final Configuration envConfig; + + /** + * REEFLauncher is instantiated in the main() method below using + * Tang configuration file provided as a command line argument. + * @param configurationPath Path to the serialized Tang configuration file. + * (The file must be in the local file system). + * @param configurationSerializer Serializer used to read the configuration file. + * We currently use Avro to serialize Tang configs. + */ + @Inject + private JavaDriverClientLauncher( + @Parameter(DriverServicePort.class) final Integer driverServicePort, + @Parameter(ClockConfigurationPath.class) final String configurationPath, + final ConfigurationSerializer configurationSerializer) { + + this.envConfig = Configurations.merge( + LAUNCHER_STATIC_CONFIG, + DriverClientGrpcConfiguration.CONF + .set(DriverClientGrpcConfiguration.DRIVER_SERVICE_PORT, driverServicePort) + .build(), + readConfigurationFromDisk(configurationPath, configurationSerializer)); + } + + /** + * Instantiate REEF DriverServiceLauncher. This method is called from REEFLauncher.main(). + * @param clockConfigPath Path to the local file that contains serialized configuration + * for the driver client. 
+ * @return An instance of the configured REEFLauncher object. + */ + private static JavaDriverClientLauncher getLauncher(final String clockConfigPath, final int driverServicePort) { + + try { + + final Configuration clockArgConfig = Configurations.merge( + LAUNCHER_STATIC_CONFIG, + DriverClientGrpcConfiguration.CONF + .set(DriverClientGrpcConfiguration.DRIVER_SERVICE_PORT, driverServicePort) + .build(), + TANG.newConfigurationBuilder() + .bindNamedParameter(ClockConfigurationPath.class, clockConfigPath) + .build()); + + return TANG.newInjector(clockArgConfig).getInstance(JavaDriverClientLauncher.class); + + } catch (final BindException ex) { + throw fatal("Error in parsing the command line", ex); + } catch (final InjectionException ex) { + throw fatal("Unable to instantiate REEFLauncher.", ex); + } + } + + /** + * Read configuration from a given file and deserialize it + * into Tang configuration object that can be used for injection. + * Configuration is currently serialized using Avro. + * This method also prints full deserialized configuration into log. + * @param configPath Path to the local file that contains serialized configuration + * of a REEF component to launch (can be either Driver or Evaluator). + * @param serializer An object to deserialize the configuration file. + * @return Tang configuration read and deserialized from a given file. + */ + private static Configuration readConfigurationFromDisk( + final String configPath, final ConfigurationSerializer serializer) { + + LOG.log(Level.FINER, "Loading configuration file: {0}", configPath); + + final File evaluatorConfigFile = new File(configPath); + + if (!evaluatorConfigFile.exists()) { + throw fatal( + "Configuration file " + configPath + " does not exist. 
Can be an issue in job submission.", + new FileNotFoundException(configPath)); + } + + if (!evaluatorConfigFile.canRead()) { + throw fatal( + "Configuration file " + configPath + " exists, but can't be read.", + new IOException(configPath)); + } + + try { + + final Configuration config = serializer.fromFile(evaluatorConfigFile); + LOG.log(Level.FINEST, "The configuration file loaded: {0}", configPath); + + return config; + + } catch (final IOException e) { + throw fatal("Unable to parse the configuration file: " + configPath, e); + } + } + + /** + * Launches a REEF client process (Driver or Evaluator). + * @param args Command-line arguments. + * Must be a single element containing local path to the configuration file. + */ + @SuppressWarnings("checkstyle:illegalcatch") + public static void main(final String[] args) { + + LOG.log(Level.INFO, "Entering JavaDriverClientLauncher.main()."); + + LOG.log(Level.FINE, "JavaDriverClientLauncher started with user name [{0}]", System.getProperty("user.name")); + LOG.log(Level.FINE, "JavaDriverClientLauncher started. Assertions are {0} in this process.", + EnvironmentUtils.areAssertionsEnabled() ? 
"ENABLED" : "DISABLED"); + + if (args.length != 2) { + final String message = "JavaDriverClientLauncher have two and only two arguments to specify the runtime clock " + + "configuration path and driver service port"; + + throw fatal(message, new IllegalArgumentException(message)); + } + + final JavaDriverClientLauncher launcher = getLauncher(args[0], Integer.parseInt(args[1])); + + Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.envConfig)); + final Injector injector = Tang.Factory.getTang().newInjector(launcher.envConfig); + try (final Clock reef = injector.getInstance(Clock.class)) { + reef.run(); + } catch (final Throwable ex) { + throw fatal("Unable to configure and start Clock.", ex); + } + + ThreadLogger.logThreads(LOG, Level.FINEST, "Threads running after Clock.close():"); + + LOG.log(Level.INFO, "Exiting REEFLauncher.main()"); + + System.exit(0); // TODO[REEF-1715]: Should be able to exit cleanly at the end of main() + } + + /** + * Wrap an exception into RuntimeException with a given message, + * and write the same message and exception to the log. + * @param msg an error message to log and pass into the RuntimeException. + * @param t A Throwable exception to log and wrap. + * @return a new Runtime exception wrapping a Throwable. 
+ */ + private static RuntimeException fatal(final String msg, final Throwable t) { + LOG.log(Level.SEVERE, msg, t); + return new RuntimeException(msg, t); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ActiveContextBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ActiveContextBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ActiveContextBridge.java new file mode 100644 index 0000000..4c97697 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ActiveContextBridge.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.reef.bridge.driver.client.events; + +import org.apache.reef.bridge.driver.client.IDriverServiceClient; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.tang.Configuration; +import org.apache.reef.util.Optional; + +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Active context bridge. + */ +public final class ActiveContextBridge implements ActiveContext { + + private static final Logger LOG = Logger.getLogger(ActiveContextBridge.class.getName()); + + private final IDriverServiceClient driverServiceClient; + + private final String contextId; + + private final Optional<String> parentId; + + private final String evaluatorId; + + private final EvaluatorDescriptor evaluatorDescriptor; + + public ActiveContextBridge( + final IDriverServiceClient driverServiceClient, + final String contextId, + final Optional<String> parentId, + final String evaluatorId, + final EvaluatorDescriptor evaluatorDescriptor) { + this.driverServiceClient = driverServiceClient; + this.contextId = contextId; + this.parentId = parentId; + this.evaluatorId = evaluatorId; + this.evaluatorDescriptor = evaluatorDescriptor; + } + + @Override + public void close() { + LOG.log(Level.INFO, "closing context " + this.contextId); + this.driverServiceClient.onContextClose(this.contextId); + } + + @Override + public void submitTask(final Configuration taskConf) { + LOG.log(Level.INFO, "submitting task via context " + this.contextId); + this.driverServiceClient.onContextSubmitTask(this.contextId, taskConf); + } + + @Override + public void submitContext(final Configuration contextConfiguration) { + LOG.log(Level.INFO, "submitting child context via context " + this.contextId); + this.driverServiceClient.onContextSubmitContext(this.contextId, contextConfiguration); + } + + @Override + public void submitContextAndService( + final Configuration contextConfiguration, + final 
Configuration serviceConfiguration) { + throw new UnsupportedOperationException("Service not supported"); + } + + @Override + public void sendMessage(final byte[] message) { + LOG.log(Level.INFO, "sending message to context " + this.contextId); + this.driverServiceClient.onContextMessage(this.contextId, message); + } + + @Override + public String getEvaluatorId() { + return this.evaluatorId; + } + + @Override + public Optional<String> getParentId() { + return this.parentId; + } + + @Override + public EvaluatorDescriptor getEvaluatorDescriptor() { + return this.evaluatorDescriptor; + } + + @Override + public String getId() { + return this.contextId; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/AllocatedEvaluatorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/AllocatedEvaluatorBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/AllocatedEvaluatorBridge.java new file mode 100644 index 0000000..2bc7599 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/AllocatedEvaluatorBridge.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.driver.client.events; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.bridge.driver.client.IDriverServiceClient; +import org.apache.reef.bridge.driver.client.JVMClientProcess; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.driver.evaluator.EvaluatorProcess; +import org.apache.reef.tang.Configuration; +import org.apache.reef.util.Optional; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +/** + * Allocated Evaluator Stub. 
+ */ +@Private +public final class AllocatedEvaluatorBridge implements AllocatedEvaluator { + + private final String evaluatorId; + + private final EvaluatorDescriptor evaluatorDescriptor; + + private final IDriverServiceClient driverServiceClient; + + private final List<File> addFileList = new ArrayList<>(); + + private final List<File> addLibraryList = new ArrayList<>(); + + private JVMClientProcess evaluatorProcess = null; + + public AllocatedEvaluatorBridge( + final String evaluatorId, + final EvaluatorDescriptor evaluatorDescriptor, + final IDriverServiceClient driverServiceClient) { + this.evaluatorId = evaluatorId; + this.evaluatorDescriptor = evaluatorDescriptor; + this.driverServiceClient = driverServiceClient; + } + + @Override + public String getId() { + return this.evaluatorId; + } + + @Override + public void addFile(final File file) { + this.addFileList.add(file); + } + + @Override + public void addLibrary(final File file) { + this.addLibraryList.add(file); + } + + @Override + public EvaluatorDescriptor getEvaluatorDescriptor() { + return this.evaluatorDescriptor; + } + + @Override + public void setProcess(final EvaluatorProcess process) { + if (process instanceof JVMClientProcess) { + this.evaluatorProcess = (JVMClientProcess) process; + } else { + throw new IllegalArgumentException(JVMClientProcess.class.getCanonicalName() + " required."); + } + } + + @Override + public void close() { + this.driverServiceClient.onEvaluatorClose(getId()); + } + + @Override + public void submitTask(final Configuration taskConfiguration) { + this.driverServiceClient.onEvaluatorSubmit( + getId(), + Optional.<Configuration>empty(), + Optional.of(taskConfiguration), + this.evaluatorProcess== null ? + Optional.<JVMClientProcess>empty() : + Optional.of(this.evaluatorProcess), + this.addFileList.size() == 0 ? + Optional.<List<File>>empty() : + Optional.of(this.addFileList), + this.addLibraryList.size() == 0 ? 
+ Optional.<List<File>>empty() : + Optional.of(this.addLibraryList)); + } + + @Override + public void submitContext(final Configuration contextConfiguration) { + + this.driverServiceClient.onEvaluatorSubmit( + getId(), + Optional.of(contextConfiguration), + Optional.<Configuration>empty(), + this.evaluatorProcess== null ? + Optional.<JVMClientProcess>empty() : + Optional.of(this.evaluatorProcess), + this.addFileList.size() == 0 ? + Optional.<List<File>>empty() : + Optional.of(this.addFileList), + this.addLibraryList.size() == 0 ? + Optional.<List<File>>empty() : + Optional.of(this.addLibraryList)); + } + + @Override + public void submitContextAndService( + final Configuration contextConfiguration, + final Configuration serviceConfiguration) { + throw new UnsupportedOperationException(); + } + + @Override + public void submitContextAndTask( + final Configuration contextConfiguration, + final Configuration taskConfiguration) { + + this.driverServiceClient.onEvaluatorSubmit( + getId(), + Optional.of(contextConfiguration), + Optional.of(taskConfiguration), + this.evaluatorProcess== null ? + Optional.<JVMClientProcess>empty() : + Optional.of(this.evaluatorProcess), + this.addFileList.size() == 0 ? + Optional.<List<File>>empty() : + Optional.of(this.addFileList), + this.addLibraryList.size() == 0 ? 
+ Optional.<List<File>>empty() : + Optional.of(this.addLibraryList)); + } + + @Override + public void submitContextAndServiceAndTask( + final Configuration contextConfiguration, + final Configuration serviceConfiguration, + final Configuration taskConfiguration) { + throw new UnsupportedOperationException(); + } + + +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ClosedContextBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ClosedContextBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ClosedContextBridge.java new file mode 100644 index 0000000..4527586 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ClosedContextBridge.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.reef.bridge.driver.client.events; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ClosedContext; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.util.Optional; + +/** + * Closed context bridge. + */ +@Private +public final class ClosedContextBridge implements ClosedContext { + + private final String contextId; + + private final String evaluatorId; + + private final ActiveContext parentContext; + + private final EvaluatorDescriptor evaluatorDescriptor; + + public ClosedContextBridge( + final String contextId, + final String evaluatorId, + final ActiveContext parentContext, + final EvaluatorDescriptor evaluatorDescriptor) { + this.contextId = contextId; + this.evaluatorId = evaluatorId; + this.parentContext = parentContext; + this.evaluatorDescriptor = evaluatorDescriptor; + } + + @Override + public ActiveContext getParentContext() { + return this.parentContext; + } + + @Override + public String getId() { + return this.contextId; + } + + @Override + public String getEvaluatorId() { + return this.evaluatorId; + } + + @Override + public Optional<String> getParentId() { + return Optional.of(this.parentContext.getId()); + } + + @Override + public EvaluatorDescriptor getEvaluatorDescriptor() { + return this.evaluatorDescriptor; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/CompletedEvaluatorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/CompletedEvaluatorBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/CompletedEvaluatorBridge.java new file mode 100644 index 0000000..4718fc2 --- /dev/null +++ 
b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/CompletedEvaluatorBridge.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.client.events; + +import org.apache.reef.driver.evaluator.CompletedEvaluator; + +/** + * Completed Evaluator bridge. 
+ */ +public final class CompletedEvaluatorBridge implements CompletedEvaluator { + + private final String id; + + public CompletedEvaluatorBridge(final String id) { + this.id = id; + } + + @Override + public String getId() { + return this.id; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/CompletedTaskBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/CompletedTaskBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/CompletedTaskBridge.java new file mode 100644 index 0000000..77d2379 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/CompletedTaskBridge.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.reef.bridge.driver.client.events; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.task.CompletedTask; + +/** + * Completed task bridge. + */ +@Private +public final class CompletedTaskBridge implements CompletedTask { + + private final String taskId; + + private final ActiveContext context; + + private final byte[] result; + + public CompletedTaskBridge( + final String taskId, + final ActiveContext context, + final byte[] result) { + this.taskId = taskId; + this.context = context; + this.result = result; + } + + @Override + public ActiveContext getActiveContext() { + return this.context; + } + + @Override + public String getId() { + return this.taskId; + } + + @Override + public byte[] get() { + return this.result; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ContextMessageBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ContextMessageBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ContextMessageBridge.java new file mode 100644 index 0000000..f208735 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/ContextMessageBridge.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.client.events; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ContextMessage; + +/** + * Context message bridge. + */ +@Private +public final class ContextMessageBridge implements ContextMessage { + + private final String contextId; + + private final String messageSourceId; + + private final long sequenceNumber; + + private final byte[] message; + + public ContextMessageBridge( + final String contextId, + final String messageSourceId, + final long sequenceNumber, + final byte[] message) { + this.contextId = contextId; + this.messageSourceId = messageSourceId; + this.sequenceNumber = sequenceNumber; + this.message = message; + } + + @Override + public byte[] get() { + return this.message; + } + + @Override + public String getId() { + return this.contextId; + } + + @Override + public String getMessageSourceID() { + return this.messageSourceId; + } + + @Override + public long getSequenceNumber() { + return this.sequenceNumber; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java new file mode 100644 index 0000000..1c315bb 
--- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.client.events; + +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.exception.EvaluatorException; +import org.apache.reef.util.Optional; + +/** + * Failed context bridge. 
+ */ +public final class FailedContextBridge implements FailedContext { + + private final String contextId; + + private final String evaluatorId; + + private final String message; + + private final EvaluatorDescriptor evaluatorDescriptor; + + private final Optional<ActiveContext> parentContext; + + private final Optional<byte[]> data; + + public FailedContextBridge( + final String contextId, + final String evaluatorId, + final String message, + final EvaluatorDescriptor evaluatorDescriptor, + final Optional<ActiveContext> parentContext, + final Optional<byte[]> data) { + this.contextId = contextId; + this.evaluatorId = evaluatorId; + this.message = message; + this.evaluatorDescriptor = evaluatorDescriptor; + this.parentContext = parentContext; + this.data = data; + } + + @Override + public Optional<ActiveContext> getParentContext() { + return this.parentContext; + } + + @Override + public String getMessage() { + return this.message; + } + + @Override + public Optional<String> getDescription() { + return Optional.of(message); + } + + @Override + public Optional<Throwable> getReason() { + return Optional.<Throwable>of(new EvaluatorException(this.evaluatorId, this.message)); + } + + @Override + public Optional<byte[]> getData() { + return this.data; + } + + @Override + public Throwable asError() { + return new EvaluatorException(this.evaluatorId, this.message); + } + + @Override + public String getEvaluatorId() { + return this.evaluatorId; + } + + @Override + public Optional<String> getParentId() { + return this.parentContext.isPresent() ? 
+ Optional.of(this.parentContext.get().getId()) : Optional.<String>empty(); + } + + @Override + public EvaluatorDescriptor getEvaluatorDescriptor() { + return this.evaluatorDescriptor; + } + + @Override + public String getId() { + return this.contextId; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedEvaluatorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedEvaluatorBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedEvaluatorBridge.java new file mode 100644 index 0000000..64b268e --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedEvaluatorBridge.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.reef.bridge.driver.client.events; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.FailedEvaluator; +import org.apache.reef.driver.task.FailedTask; +import org.apache.reef.exception.EvaluatorException; +import org.apache.reef.util.Optional; + +import java.util.List; + +/** + * Failed Evaluator bridge. + */ +@Private +public final class FailedEvaluatorBridge implements FailedEvaluator { + + private final String id; + + private final EvaluatorException evaluatorException; + + private final List<FailedContext> failedContextList; + + private Optional<FailedTask> failedTask; + + public FailedEvaluatorBridge( + final String id, + final EvaluatorException evaluatorException, + final List<FailedContext> failedContextList, + final Optional<FailedTask> failedTask) { + this.id = id; + this.evaluatorException = evaluatorException; + this.failedContextList = failedContextList; + this.failedTask = failedTask; + } + + @Override + public EvaluatorException getEvaluatorException() { + return this.evaluatorException; + } + + @Override + public List<FailedContext> getFailedContextList() { + return this.failedContextList; + } + + @Override + public Optional<FailedTask> getFailedTask() { + return this.failedTask; + } + + @Override + public String getId() { + return this.id; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java new file mode 100644 index 0000000..d6d3f5e --- /dev/null +++ 
b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.client.events; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.bridge.driver.client.IDriverServiceClient; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.task.RunningTask; +import org.apache.reef.runtime.common.driver.task.TaskRepresenter; +import org.apache.reef.util.Optional; + +/** + * Running task bridge. 
+ */ +@Private +public final class RunningTaskBridge implements RunningTask { + + private final IDriverServiceClient driverServiceClient; + + private final String taskId; + + private final ActiveContext context; + + + public RunningTaskBridge( + final IDriverServiceClient driverServiceClient, + final String taskId, + final ActiveContext context) { + this.driverServiceClient = driverServiceClient; + this.taskId = taskId; + this.context = context; + } + + @Override + public ActiveContext getActiveContext() { + return this.context; + } + + @Override + public void send(final byte[] message) { + this.driverServiceClient.onTaskMessage(this.taskId, message); + } + + @Override + public void suspend(final byte[] message) { + throw new UnsupportedOperationException("Suspend task not supported"); + } + + @Override + public void suspend() { + throw new UnsupportedOperationException("Suspend task not supported"); + } + + @Override + public void close(final byte[] message) { + this.driverServiceClient.onTaskClose(this.taskId, Optional.of(message)); + } + + @Override + public void close() { + this.driverServiceClient.onTaskClose(this.taskId, Optional.<byte[]>empty()); + } + + @Override + public TaskRepresenter getTaskRepresenter() { + throw new UnsupportedOperationException("Not a public API"); + } + + @Override + public String getId() { + return this.taskId; + } +}
