http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/TaskMessageBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/TaskMessageBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/TaskMessageBridge.java new file mode 100644 index 0000000..41132a3 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/TaskMessageBridge.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.client.events; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.task.TaskMessage; + +/** + * Task message bridge. 
+ */ +@Private +public final class TaskMessageBridge implements TaskMessage { + + private final String taskId; + + private final String contextId; + + private final String messageSourceId; + + private final long sequenceNumber; + + private final byte[] message; + + public TaskMessageBridge( + final String taskId, + final String contextId, + final String messageSourceId, + final long sequenceNumber, + final byte[] message) { + this.taskId = taskId; + this.contextId = contextId; + this.messageSourceId = messageSourceId; + this.sequenceNumber = sequenceNumber; + this.message = message; + } + + @Override + public byte[] get() { + return this.message; + } + + @Override + public String getId() { + return this.taskId; + } + + @Override + public long getSequenceNumber() { + return this.sequenceNumber; + } + + @Override + public String getContextId() { + return this.contextId; + } + + @Override + public String getMessageSourceID() { + return this.messageSourceId; + } +}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/package-info.java new file mode 100644 index 0000000..0ba5ba4 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * REEF event stubs. 
+ */ +package org.apache.reef.bridge.driver.client.events; http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientGrpcConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientGrpcConfiguration.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientGrpcConfiguration.java new file mode 100644 index 0000000..496b3fc --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientGrpcConfiguration.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.reef.bridge.driver.client.grpc; + +import org.apache.reef.bridge.driver.client.IDriverClientService; +import org.apache.reef.bridge.driver.client.IDriverServiceClient; +import org.apache.reef.bridge.driver.client.grpc.parameters.DriverServicePort; +import org.apache.reef.tang.formats.ConfigurationModule; +import org.apache.reef.tang.formats.ConfigurationModuleBuilder; +import org.apache.reef.tang.formats.RequiredParameter; +
/**
 * Configuration module that wires up the gRPC flavour of the driver client:
 * it binds the gRPC service/client implementations to their interfaces and
 * exposes the driver service port as a required parameter.
 */
public final class DriverClientGrpcConfiguration extends ConfigurationModuleBuilder {

  /** Port value that is bound to the {@link DriverServicePort} named parameter. */
  public static final RequiredParameter<Integer> DRIVER_SERVICE_PORT = new RequiredParameter<>();

  /** The assembled module; {@link #DRIVER_SERVICE_PORT} has to be set before building a configuration. */
  public static final ConfigurationModule CONF =
      new DriverClientGrpcConfiguration()
          .bindImplementation(IDriverClientService.class, DriverClientService.class)
          .bindImplementation(IDriverServiceClient.class, DriverServiceClient.class)
          .bindNamedParameter(DriverServicePort.class, DRIVER_SERVICE_PORT)
          .build();

}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java new file mode 100644 index 0000000..12fd334 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java @@ -0,0 +1,611 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.client.grpc; + +import com.google.common.collect.Lists; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import org.apache.commons.lang.StringUtils; +import org.apache.reef.bridge.driver.client.DriverClientDispatcher; +import org.apache.reef.bridge.driver.client.IDriverClientService; +import org.apache.reef.bridge.driver.client.JVMClientProcess; +import org.apache.reef.bridge.driver.client.events.*; +import org.apache.reef.bridge.proto.*; +import org.apache.reef.bridge.proto.Void; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.driver.restart.DriverRestartCompleted; +import org.apache.reef.driver.restart.DriverRestarted; +import org.apache.reef.driver.task.FailedTask; +import org.apache.reef.exception.EvaluatorException; +import org.apache.reef.runtime.common.driver.evaluator.EvaluatorDescriptorImpl; +import org.apache.reef.tang.InjectionFuture; +import org.apache.reef.util.Optional; +import org.apache.reef.wake.remote.ports.TcpPortProvider; +import org.apache.reef.wake.time.Clock; +import org.apache.reef.wake.time.Time; +import org.apache.reef.wake.time.event.StartTime; +import org.apache.reef.wake.time.event.StopTime; + +import 
javax.inject.Inject; +import java.io.IOException; +import java.util.*; +import java.util.logging.Level; +import java.util.logging.Logger; +
/**
 * The driver client service that accepts incoming messages from the driver service and
 * dispatches appropriate objects to the application.
 *
 * <p>Every handler replies on its {@code responseObserver} exactly once: either with
 * {@code onNext(null)} followed by {@code onCompleted()}, or with {@code onError(...)}.
 */
public final class DriverClientService extends DriverClientGrpc.DriverClientImplBase
    implements IDriverClientService {

  private static final Logger LOG = Logger.getLogger(DriverClientService.class.getName());

  /** The gRPC server; assigned by {@link #start()} once a port could be bound. */
  private Server server;

  private final InjectionFuture<Clock> clock;

  private final DriverServiceClient driverServiceClient;

  private final TcpPortProvider tcpPortProvider;

  private final InjectionFuture<DriverClientDispatcher> clientDriverDispatcher;

  /** Evaluators that were allocated (or restored on restart) and not yet completed/failed, by id. */
  private final Map<String, AllocatedEvaluatorBridge> evaluatorBridgeMap = new HashMap<>();

  /** Contexts that are currently known to be active, by context id. */
  private final Map<String, ActiveContextBridge> activeContextBridgeMap = new HashMap<>();

  /** Result of the last idleness check; reset whenever new work is observed. */
  private boolean isIdle = false;

  @Inject
  private DriverClientService(
      final DriverServiceClient driverServiceClient,
      final TcpPortProvider tcpPortProvider,
      final InjectionFuture<Clock> clock,
      final InjectionFuture<DriverClientDispatcher> clientDriverDispatcher) {
    this.driverServiceClient = driverServiceClient;
    this.tcpPortProvider = tcpPortProvider;
    this.clock = clock;
    this.clientDriverDispatcher = clientDriverDispatcher;
  }

  /** Marks the driver client as busy. */
  void setNotIdle() {
    this.isIdle = false;
  }

  /**
   * Starts the gRPC server on the first port of the port provider that can be bound
   * and registers this service with the driver service under host "localhost".
   *
   * @throws IOException if no port could be bound
   */
  @Override
  public void start() throws IOException {
    for (final Integer port : this.tcpPortProvider) {
      try {
        this.server = ServerBuilder.forPort(port)
            .addService(this)
            .build()
            .start();
        LOG.info("Driver Client Server started, listening on " + port);
        break;
      } catch (IOException e) {
        // Binding failed: fall through and try the next candidate port.
        LOG.log(Level.WARNING, "Unable to bind to port [{0}]", port);
      }
    }
    if (this.server == null || this.server.isTerminated()) {
      throw new IOException("Unable to start gRPC server");
    }
    this.driverServiceClient.registerDriverClientService("localhost", this.server.getPort());
  }

  /**
   * Blocks until the gRPC server terminates; returns immediately if it was never started.
   *
   * @throws InterruptedException if the waiting thread is interrupted
   */
  @Override
  public void awaitTermination() throws InterruptedException {
    if (this.server != null) {
      this.server.awaitTermination();
    }
  }

  /**
   * Reports whether the driver client looks idle, i.e. the clock is idle and no
   * evaluator is being tracked. In the idle case the reply is delayed by five seconds.
   */
  @Override
  public void idlenessCheckHandler(final Void request, final StreamObserver<IdleStatus> responseObserver) {
    if (clock.get().isIdle() && this.evaluatorBridgeMap.isEmpty()) {
      LOG.log(Level.INFO, "possibly idle. waiting for some action.");
      this.isIdle = true;
      try {
        // Other handlers may reset isIdle while we wait.
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        LOG.log(Level.WARNING, e.getMessage());
        // Preserve the interruption for whoever owns this thread.
        Thread.currentThread().interrupt();
      }
    } else {
      LOG.log(Level.INFO, "not idle");
      this.isIdle = false;
    }
    responseObserver.onNext(IdleStatus.newBuilder()
        .setReason("DriverClient checking idleness")
        .setIsIdle(this.isIdle)
        .build());
    responseObserver.onCompleted();
  }

  /** Dispatches a {@link StartTime} event built from the request's start time. */
  @Override
  public void startHandler(final StartTimeInfo request, final StreamObserver<Void> responseObserver) {
    try {
      LOG.log(Level.INFO, "StartHandler at time {0}", request.getStartTime());
      final StartTime startTime = new StartTime(request.getStartTime());
      this.clientDriverDispatcher.get().dispatch(startTime);
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /** Dispatches a {@link StopTime} event built from the request's stop time. */
  @Override
  public void stopHandler(final StopTimeInfo request, final StreamObserver<Void> responseObserver) {
    try {
      LOG.log(Level.INFO, "StopHandler at time {0}", request.getStopTime());
      final StopTime stopTime = new StopTime(request.getStopTime());
      this.clientDriverDispatcher.get().dispatch(stopTime);
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /** Dispatches the alarm identified by the request's alarm id. */
  @Override
  public void alarmTrigger(final AlarmTriggerInfo request, final StreamObserver<Void> responseObserver) {
    try {
      LOG.log(Level.INFO, "Alarm Trigger id {0}", request.getAlarmId());
      this.clientDriverDispatcher.get().dispatchAlarm(request.getAlarmId());
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /** Registers the newly allocated evaluator and dispatches it to the application. */
  @Override
  public void allocatedEvaluatorHandler(final EvaluatorInfo request, final StreamObserver<Void> responseObserver) {
    try {
      this.isIdle = false;
      LOG.log(Level.INFO, "Allocated evaluator id {0}", request.getEvaluatorId());
      final AllocatedEvaluatorBridge eval = new AllocatedEvaluatorBridge(
          request.getEvaluatorId(),
          toEvaluatorDescriptor(request.getDescriptorInfo()),
          this.driverServiceClient);
      this.evaluatorBridgeMap.put(eval.getId(), eval);
      this.clientDriverDispatcher.get().dispatch(eval);
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /** Forgets the evaluator and dispatches a completed-evaluator event. */
  @Override
  public void completedEvaluatorHandler(final EvaluatorInfo request, final StreamObserver<Void> responseObserver) {
    try {
      LOG.log(Level.INFO, "Completed Evaluator id {0}", request.getEvaluatorId());
      this.evaluatorBridgeMap.remove(request.getEvaluatorId());
      this.clientDriverDispatcher.get().dispatch(new CompletedEvaluatorBridge(request.getEvaluatorId()));
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /**
   * Forgets the evaluator and all contexts reported as failed with it, then dispatches
   * a failed-evaluator event carrying those contexts and, if a task id was reported,
   * the failed task.
   */
  @Override
  public void failedEvaluatorHandler(final EvaluatorInfo request, final StreamObserver<Void> responseObserver) {
    try {
      LOG.log(Level.INFO, "Failed Evaluator id {0}", request.getEvaluatorId());
      // The evaluator may be unknown to this client; fall back to the request's id in that case.
      final AllocatedEvaluatorBridge eval = this.evaluatorBridgeMap.remove(request.getEvaluatorId());
      final String evaluatorId = eval != null ? eval.getId() : request.getEvaluatorId();
      final List<String> failedContextIds = request.getFailure().getFailedContextsList();

      // First pass: build the failed contexts while all parents are still resolvable.
      final List<FailedContext> failedContextList = new ArrayList<>();
      for (final String failedContextId : failedContextIds) {
        final ActiveContextBridge context = this.activeContextBridgeMap.get(failedContextId);
        if (context == null) {
          LOG.log(Level.WARNING, "Unknown failed context id {0}", failedContextId);
          continue;
        }
        failedContextList.add(new FailedContextBridge(
            context.getId(),
            evaluatorId,
            request.getFailure().getMessage(),
            eval != null ? eval.getEvaluatorDescriptor() : context.getEvaluatorDescriptor(),
            findParentContext(context),
            Optional.<byte[]>empty()));
      }
      // Second pass: drop the failed contexts from the bookkeeping.
      for (final String failedContextId : failedContextIds) {
        this.activeContextBridgeMap.remove(failedContextId);
      }

      // Protobuf string getters never return null; an unset task id is the empty string.
      final Optional<FailedTask> failedTask =
          StringUtils.isNotEmpty(request.getFailure().getFailedTaskId()) ?
              Optional.of(new FailedTask(
                  request.getFailure().getFailedTaskId(),
                  request.getFailure().getMessage(),
                  Optional.<String>empty(),
                  Optional.<Throwable>empty(),
                  Optional.<byte[]>empty(),
                  Optional.<ActiveContext>empty())) :
              Optional.<FailedTask>empty();

      this.clientDriverDispatcher.get().dispatch(
          new FailedEvaluatorBridge(
              evaluatorId,
              new EvaluatorException(request.getEvaluatorId(), request.getFailure().getMessage()),
              failedContextList,
              failedTask));
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /** Registers the new active context and dispatches it to the application. */
  @Override
  public void activeContextHandler(final ContextInfo request, final StreamObserver<Void> responseObserver) {
    try {
      this.isIdle = false;
      LOG.log(Level.INFO, "Active context id {0}", request.getContextId());
      // Shared conversion: treats an empty parent id as "no parent".
      final ActiveContextBridge context = toActiveContext(request);
      this.activeContextBridgeMap.put(context.getId(), context);
      this.clientDriverDispatcher.get().dispatch(context);
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /**
   * Removes the context from the bookkeeping and dispatches a closed-context event;
   * replies with an INTERNAL error if the context id is unknown.
   */
  @Override
  public void closedContextHandler(final ContextInfo request, final StreamObserver<Void> responseObserver) {
    if (this.activeContextBridgeMap.containsKey(request.getContextId())) {
      LOG.log(Level.INFO, "Closed context id {0}", request.getContextId());
      try {
        final ActiveContextBridge context = this.activeContextBridgeMap.remove(request.getContextId());
        this.clientDriverDispatcher.get().dispatch(
            new ClosedContextBridge(
                context.getId(),
                context.getEvaluatorId(),
                this.activeContextBridgeMap.get(request.getParentId()),
                context.getEvaluatorDescriptor()));
      } finally {
        responseObserver.onNext(null);
        responseObserver.onCompleted();
      }
    } else {
      responseObserver.onError(Status.INTERNAL
          .withDescription("Unknown context id " + request.getContextId() + " in close")
          .asRuntimeException());
    }
  }

  /**
   * Removes the context from the bookkeeping and dispatches a failed-context event;
   * replies with an INTERNAL error if the context id is unknown.
   */
  @Override
  public void failedContextHandler(final ContextInfo request, final StreamObserver<Void> responseObserver) {
    if (this.activeContextBridgeMap.containsKey(request.getContextId())) {
      LOG.log(Level.INFO, "Failed context id {0}", request.getContextId());
      try {
        final ActiveContextBridge context = this.activeContextBridgeMap.remove(request.getContextId());
        final Optional<ActiveContext> parent = findParentContext(context);
        final Optional<byte[]> data = request.getException().getData() != null ?
            Optional.of(request.getException().getData().toByteArray()) : Optional.<byte[]>empty();
        this.clientDriverDispatcher.get().dispatch(
            new FailedContextBridge(
                context.getId(),
                context.getEvaluatorId(),
                request.getException().getMessage(),
                context.getEvaluatorDescriptor(),
                parent,
                data));
      } finally {
        responseObserver.onNext(null);
        responseObserver.onCompleted();
      }
    } else {
      responseObserver.onError(Status.INTERNAL
          .withDescription("Unknown context id " + request.getContextId() + " in failed context")
          .asRuntimeException());
    }
  }

  /**
   * Dispatches a context message for a known context; replies with an INTERNAL
   * error if the context id is unknown.
   */
  @Override
  public void contextMessageHandler(final ContextMessageInfo request, final StreamObserver<Void> responseObserver) {
    if (this.activeContextBridgeMap.containsKey(request.getContextId())) {
      LOG.log(Level.INFO, "Message context id {0}", request.getContextId());
      try {
        this.clientDriverDispatcher.get().dispatch(
            new ContextMessageBridge(
                request.getContextId(),
                request.getMessageSourceId(),
                request.getSequenceNumber(),
                request.getPayload().toByteArray()));
      } finally {
        responseObserver.onNext(null);
        responseObserver.onCompleted();
      }
    } else {
      responseObserver.onError(Status.INTERNAL
          .withDescription("Unknown context id " + request.getContextId() + " in context message")
          .asRuntimeException());
    }
  }

  /** Dispatches a running-task event, registering the task's context first if it is not yet known. */
  @Override
  public void runningTaskHandler(final TaskInfo request, final StreamObserver<Void> responseObserver) {
    final ContextInfo contextInfo = request.getContext();
    if (!this.activeContextBridgeMap.containsKey(contextInfo.getContextId())) {
      this.activeContextBridgeMap.put(contextInfo.getContextId(), toActiveContext(contextInfo));
    }

    LOG.log(Level.INFO, "Running task id {0}", request.getTaskId());
    try {
      final ActiveContextBridge context = this.activeContextBridgeMap.get(contextInfo.getContextId());
      this.clientDriverDispatcher.get().dispatch(
          new RunningTaskBridge(this.driverServiceClient, request.getTaskId(), context));
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /**
   * Dispatches a failed-task event. If the request carries a context that is not yet
   * known, it is registered first so that it can be attached to the failed task.
   */
  @Override
  public void failedTaskHandler(final TaskInfo request, final StreamObserver<Void> responseObserver) {
    if (request.hasContext() && !this.activeContextBridgeMap.containsKey(request.getContext().getContextId())) {
      this.activeContextBridgeMap.put(request.getContext().getContextId(), toActiveContext(request.getContext()));
    }
    try {
      LOG.log(Level.INFO, "Failed task id {0}", request.getTaskId());
      final Optional<ActiveContext> context =
          this.activeContextBridgeMap.containsKey(request.getContext().getContextId()) ?
              Optional.<ActiveContext>of(this.activeContextBridgeMap.get(request.getContext().getContextId())) :
              Optional.<ActiveContext>empty();
      final Optional<byte[]> data = request.getException().getData() != null ?
          Optional.of(request.getException().getData().toByteArray()) : Optional.<byte[]>empty();
      this.clientDriverDispatcher.get().dispatch(
          new FailedTask(
              request.getTaskId(),
              request.getException().getMessage(),
              Optional.of(request.getException().getName()),
              Optional.<Throwable>of(new EvaluatorException(request.getException().getMessage())),
              data,
              context));
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /** Dispatches a completed-task event, registering the task's context first if it is not yet known. */
  @Override
  public void completedTaskHandler(final TaskInfo request, final StreamObserver<Void> responseObserver) {
    final ContextInfo contextInfo = request.getContext();
    if (!this.activeContextBridgeMap.containsKey(contextInfo.getContextId())) {
      this.activeContextBridgeMap.put(contextInfo.getContextId(), toActiveContext(contextInfo));
    }
    LOG.log(Level.INFO, "Completed task id {0}", request.getTaskId());
    try {
      final ActiveContextBridge context = this.activeContextBridgeMap.get(request.getContext().getContextId());
      this.clientDriverDispatcher.get().dispatch(
          new CompletedTaskBridge(
              request.getTaskId(),
              context,
              request.getResult() != null ? request.getResult().toByteArray() : null));
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /** Suspended tasks are not supported; always replies with an INTERNAL error. */
  @Override
  public void suspendedTaskHandler(final TaskInfo request, final StreamObserver<Void> responseObserver) {
    responseObserver.onError(Status.INTERNAL.withDescription("Not supported").asRuntimeException());
  }

  /**
   * Dispatches a task message for a known context; replies with an INTERNAL
   * error if the context id is unknown.
   */
  @Override
  public void taskMessageHandler(final TaskMessageInfo request, final StreamObserver<Void> responseObserver) {
    if (this.activeContextBridgeMap.containsKey(request.getContextId())) {
      LOG.log(Level.INFO, "Message task id {0}", request.getTaskId());
      try {
        this.clientDriverDispatcher.get().dispatch(
            new TaskMessageBridge(
                request.getTaskId(),
                request.getContextId(),
                request.getMessageSourceId(),
                request.getSequenceNumber(),
                request.getPayload().toByteArray()));
      } finally {
        responseObserver.onNext(null);
        responseObserver.onCompleted();
      }
    } else {
      responseObserver.onError(Status.INTERNAL
          .withDescription("Unknown context id: " + request.getContextId())
          .asRuntimeException());
    }
  }

  /** Forwards the payload of a client message to the dispatcher. */
  @Override
  public void clientMessageHandler(final ClientMessageInfo request, final StreamObserver<Void> responseObserver) {
    LOG.log(Level.INFO, "Client message");
    try {
      this.clientDriverDispatcher.get().clientMessageDispatch(request.getPayload().toByteArray());
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /** Forwards a client close request to the dispatcher. */
  @Override
  public void clientCloseHandler(final Void request, final StreamObserver<Void> responseObserver) {
    LOG.log(Level.INFO, "Client close");
    try {
      this.clientDriverDispatcher.get().clientCloseDispatch();
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /** Forwards a client close request together with its payload to the dispatcher. */
  @Override
  public void clientCloseWithMessageHandler(
      final ClientMessageInfo request,
      final StreamObserver<Void> responseObserver) {
    LOG.log(Level.INFO, "Client close with message");
    try {
      this.clientDriverDispatcher.get().clientCloseWithMessageDispatch(request.getPayload().toByteArray());
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /** Dispatches a driver-restarted event whose accessors read lazily from the request. */
  @Override
  public void driverRestartHandler(final DriverRestartInfo request, final StreamObserver<Void> responseObserver) {
    LOG.log(Level.INFO, "Driver restarted");
    try {
      final DriverRestarted driverRestarted = new DriverRestarted() {
        @Override
        public int getResubmissionAttempts() {
          return request.getResubmissionAttempts();
        }

        @Override
        public StartTime getStartTime() {
          return new StartTime(request.getStartTime().getStartTime());
        }

        @Override
        public Set<String> getExpectedEvaluatorIds() {
          return new HashSet<>(request.getExpectedEvaluatorIdsList());
        }
      };
      this.clientDriverDispatcher.get().dispatchRestart(driverRestarted);
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /**
   * Restores an active context after a driver restart, re-creating its evaluator
   * entry if needed, and dispatches it as a restart event.
   */
  @Override
  public void driverRestartActiveContextHandler(
      final ContextInfo request,
      final StreamObserver<Void> responseObserver) {
    try {
      LOG.log(Level.INFO, "Driver restarted active context " + request.getContextId());
      if (!this.evaluatorBridgeMap.containsKey(request.getEvaluatorId())) {
        final AllocatedEvaluatorBridge eval = new AllocatedEvaluatorBridge(
            request.getEvaluatorId(),
            toEvaluatorDescriptor(request.getEvaluatorDescriptorInfo()),
            this.driverServiceClient);
        this.evaluatorBridgeMap.put(eval.getId(), eval);
      }
      final ActiveContextBridge context = toActiveContext(request);
      this.activeContextBridgeMap.put(context.getId(), context);
      this.clientDriverDispatcher.get().dispatchRestart(context);
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /**
   * Restores a running task after a driver restart, re-creating its evaluator and
   * context entries if needed, and dispatches it as a restart event.
   */
  @Override
  public void driverRestartRunningTaskHandler(
      final TaskInfo request,
      final StreamObserver<Void> responseObserver) {
    try {
      LOG.log(Level.INFO, "Driver restarted running task " + request.getTaskId());
      if (!this.evaluatorBridgeMap.containsKey(request.getContext().getEvaluatorId())) {
        final AllocatedEvaluatorBridge eval = new AllocatedEvaluatorBridge(
            request.getContext().getEvaluatorId(),
            toEvaluatorDescriptor(request.getContext().getEvaluatorDescriptorInfo()),
            this.driverServiceClient);
        this.evaluatorBridgeMap.put(eval.getId(), eval);
      }
      if (!this.activeContextBridgeMap.containsKey(request.getContext().getContextId())) {
        final ActiveContextBridge context = toActiveContext(request.getContext());
        this.activeContextBridgeMap.put(context.getId(), context);
      }
      final ActiveContextBridge context = this.activeContextBridgeMap.get(request.getContext().getContextId());
      this.clientDriverDispatcher.get().dispatchRestart(
          new RunningTaskBridge(this.driverServiceClient, request.getTaskId(), context));
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /** Dispatches a driver-restart-completed event whose accessors read lazily from the request. */
  @Override
  public void driverRestartCompletedHandler(
      final DriverRestartCompletedInfo request,
      final StreamObserver<Void> responseObserver) {
    try {
      this.clientDriverDispatcher.get().dispatchRestart(new DriverRestartCompleted() {
        @Override
        public Time getCompletedTime() {
          return new StopTime(request.getCompletionTime().getStopTime());
        }

        @Override
        public boolean isTimedOut() {
          return request.getIsTimedOut();
        }
      });
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  /**
   * Dispatches a failed-evaluator restart event. The exception message is taken from
   * the request's failure if one is set and defaults to "restart failed" otherwise.
   */
  @Override
  public void driverRestartFailedEvaluatorHandler(
      final EvaluatorInfo request,
      final StreamObserver<Void> responseObserver) {
    try {
      // getFailure() never returns null for a protobuf message field; hasFailure() tells whether it was set.
      this.clientDriverDispatcher.get().dispatchRestart(new FailedEvaluatorBridge(
          request.getEvaluatorId(),
          request.hasFailure() ?
              new EvaluatorException(request.getFailure().getMessage()) :
              new EvaluatorException("restart failed"),
          Lists.<FailedContext>newArrayList(),
          Optional.<FailedTask>empty()));
    } finally {
      responseObserver.onNext(null);
      responseObserver.onCompleted();
    }
  }

  // Helper methods

  /**
   * Converts the wire-level descriptor into an {@link EvaluatorDescriptor}.
   * The first constructor argument is passed as {@code null}; the process is
   * always a fresh {@link JVMClientProcess}.
   */
  private EvaluatorDescriptor toEvaluatorDescriptor(final EvaluatorDescriptorInfo info) {
    return new EvaluatorDescriptorImpl(
        null,
        info.getMemory(),
        info.getCores(),
        new JVMClientProcess(),
        info.getRuntimeName());
  }

  /**
   * Builds an active context bridge from the wire-level context info. An empty
   * parent id is treated as "no parent". The context's evaluator must already be
   * present in {@code evaluatorBridgeMap}.
   */
  private ActiveContextBridge toActiveContext(final ContextInfo contextInfo) {
    final AllocatedEvaluatorBridge eval = this.evaluatorBridgeMap.get(contextInfo.getEvaluatorId());
    return new ActiveContextBridge(
        this.driverServiceClient,
        contextInfo.getContextId(),
        StringUtils.isNotEmpty(contextInfo.getParentId()) ?
            Optional.of(contextInfo.getParentId()) : Optional.<String>empty(),
        eval.getId(),
        eval.getEvaluatorDescriptor());
  }

  /**
   * Resolves the parent of the given context among the currently active contexts.
   *
   * @return the parent context, or empty if the context has no parent id or the
   *     parent is no longer tracked
   */
  private Optional<ActiveContext> findParentContext(final ActiveContextBridge context) {
    if (!context.getParentId().isPresent()) {
      return Optional.<ActiveContext>empty();
    }
    // ofNullable: the parent may already have been removed from the map.
    return Optional.<ActiveContext>ofNullable(this.activeContextBridgeMap.get(context.getParentId().get()));
  }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverServiceClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverServiceClient.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverServiceClient.java new file mode 100644 index 0000000..81fb290 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverServiceClient.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.client.grpc; + +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.bridge.driver.client.IDriverServiceClient; +import org.apache.reef.bridge.driver.client.JVMClientProcess; +import org.apache.reef.bridge.driver.client.grpc.parameters.DriverServicePort; +import org.apache.reef.bridge.proto.*; +import org.apache.reef.driver.context.ContextConfiguration; +import org.apache.reef.driver.evaluator.EvaluatorRequest; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.InjectionFuture; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.formats.ConfigurationSerializer; +import org.apache.reef.util.Optional; + +import javax.inject.Inject; +import java.io.File; +import java.util.List; + +/** + * The client that exposes methods for communicating back to the + * driver service. 
+ */ +@Private +public final class DriverServiceClient implements IDriverServiceClient { + + private final InjectionFuture<DriverClientService> driverClientService; + + private final ConfigurationSerializer configurationSerializer; + + private final DriverServiceGrpc.DriverServiceFutureStub serviceStub; + + @Inject + private DriverServiceClient( + final InjectionFuture<DriverClientService> driverClientService, + final ConfigurationSerializer configurationSerializer, + @Parameter(DriverServicePort.class) final Integer driverServicePort) { + this.driverClientService = driverClientService; + this.configurationSerializer = configurationSerializer; + final ManagedChannel channel = ManagedChannelBuilder + .forAddress("localhost", driverServicePort) + .usePlaintext(true) + .build(); + this.serviceStub = DriverServiceGrpc.newFutureStub(channel); + } + + public void registerDriverClientService(final String host, final int port) { + this.serviceStub.registerDriverClient( + DriverClientRegistration.newBuilder() + .setHost(host) + .setPort(port) + .build()); + } + + @Override + public void onShutdown() { + this.serviceStub.shutdown(ShutdownRequest.newBuilder().build()); + } + + @Override + public void onShutdown(final Throwable ex) { + this.serviceStub.shutdown(ShutdownRequest.newBuilder() + .setException(ExceptionInfo.newBuilder() + .setName(ex.getCause() != null ? 
ex.getCause().toString() : ex.toString()) + .setMessage(ex.getMessage()) + .build()) + .build()); + } + + @Override + public void onSetAlarm(final String alarmId, final int timeoutMS) { + this.driverClientService.get().setNotIdle(); + this.serviceStub.setAlarm( + AlarmRequest.newBuilder() + .setAlarmId(alarmId) + .setTimeoutMs(timeoutMS) + .build()); + } + + @Override + public void onEvaluatorRequest(final EvaluatorRequest evaluatorRequest) { + this.driverClientService.get().setNotIdle(); + this.serviceStub.requestResources( + ResourceRequest.newBuilder() + .setCores(evaluatorRequest.getNumberOfCores()) + .setMemorySize(evaluatorRequest.getMegaBytes()) + .setRelaxLocality(evaluatorRequest.getRelaxLocality()) + .setResourceCount(evaluatorRequest.getNumber()) + .setRuntimeName(evaluatorRequest.getRuntimeName()) + .addAllRackNameList(evaluatorRequest.getRackNames()) + .addAllNodeNameList(evaluatorRequest.getNodeNames()) + .build()); + } + + @Override + public void onEvaluatorClose(final String evalautorId) { + this.serviceStub.allocatedEvaluatorOp( + AllocatedEvaluatorRequest.newBuilder() + .setEvaluatorId(evalautorId) + .setCloseEvaluator(true) + .build()); + } + + @Override + public void onEvaluatorSubmit( + final String evaluatorId, + final Optional<Configuration> contextConfiguration, + final Optional<Configuration> taskConfiguration, + final Optional<JVMClientProcess> evaluatorProcess, + final Optional<List<File>> addFileList, + final Optional<List<File>> addLibraryList) { + final AllocatedEvaluatorRequest.Builder builder = + AllocatedEvaluatorRequest.newBuilder().setEvaluatorId(evaluatorId); + if (addFileList.isPresent()) { + for (final File file : addFileList.get()) { + builder.addAddFiles(file.getAbsolutePath()); + } + } + if (addLibraryList.isPresent()) { + for (final File file : addLibraryList.get()) { + builder.addAddLibraries(file.getAbsolutePath()); + } + } + if (evaluatorProcess.isPresent()) { + final JVMClientProcess rawEP = evaluatorProcess.get(); + 
builder.setSetProcess( + AllocatedEvaluatorRequest.EvaluatorProcessRequest.newBuilder() + .setConfigurationFileName(rawEP.getConfigurationFileName()) + .setMemoryMb(rawEP.getMemory()) + .setStandardOut(rawEP.getStandardOut()) + .setStandardErr(rawEP.getStandardErr()) + .addAllOptions(rawEP.getOptions()) + .build()); + } + if (contextConfiguration.isPresent()) { + builder.setContextConfiguration( + this.configurationSerializer.toString(contextConfiguration.get())); + } else { + builder.setContextConfiguration(this.configurationSerializer.toString(ContextConfiguration.CONF + .set(ContextConfiguration.IDENTIFIER, "context-" + evaluatorId) + .build())); + } + if (taskConfiguration.isPresent()) { + builder.setTaskConfiguration( + this.configurationSerializer.toString(taskConfiguration.get())); + } + this.serviceStub.allocatedEvaluatorOp(builder.build()); + } + + // Context Operations + + @Override + public void onContextClose(final String contextId) { + this.serviceStub.activeContextOp( + ActiveContextRequest.newBuilder() + .setContextId(contextId) + .setCloseContext(true) + .build()); + } + + @Override + public void onContextSubmitContext( + final String contextId, + final Configuration contextConfiguration) { + this.serviceStub.activeContextOp( + ActiveContextRequest.newBuilder() + .setContextId(contextId) + .setNewContextRequest(this.configurationSerializer.toString(contextConfiguration)) + .build()); + } + + @Override + public void onContextSubmitTask( + final String contextId, + final Configuration taskConfiguration) { + this.serviceStub.activeContextOp( + ActiveContextRequest.newBuilder() + .setContextId(contextId) + .setNewTaskRequest(this.configurationSerializer.toString(taskConfiguration)) + .build()); + } + + @Override + public void onContextMessage(final String contextId, final byte[] message) { + this.serviceStub.activeContextOp( + ActiveContextRequest.newBuilder() + .setContextId(contextId) + .setMessage(ByteString.copyFrom(message)) + .build()); + } + + // 
Task operations + + @Override + public void onTaskClose(final String taskId, final Optional<byte[]> message) { + this.serviceStub.runningTaskOp(RunningTaskRequest.newBuilder() + .setTaskId(taskId) + .setCloseTask(true) + .setMessage(message.isPresent() ? ByteString.copyFrom(message.get()) : null) + .build()); + } + + @Override + public void onTaskMessage(final String taskId, final byte[] message) { + this.serviceStub.runningTaskOp(RunningTaskRequest.newBuilder() + .setTaskId(taskId) + .setMessage(ByteString.copyFrom(message)) + .build()); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/package-info.java new file mode 100644 index 0000000..4e74386 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * gRPC specific implementations of the driver client bridge. + */ +package org.apache.reef.bridge.driver.client.grpc; http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/parameters/DriverServicePort.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/parameters/DriverServicePort.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/parameters/DriverServicePort.java new file mode 100644 index 0000000..857d94c --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/parameters/DriverServicePort.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.driver.client.grpc.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * gRPC driver service port. 
/**
 * gRPC driver service port.
 *
 * <p>Named parameter with an {@code Integer} value and the short name
 * {@code driver-service-port}. No default value is declared on the annotation.
 */
@NamedParameter(doc = "Driver Service Grpc port", short_name = "driver-service-port")
public final class DriverServicePort implements Name<Integer> {
}
+ */ +package org.apache.reef.bridge.driver.client.grpc.parameters; http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/package-info.java new file mode 100644 index 0000000..bba19b2 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Java bridge client driver. 
+ */ +package org.apache.reef.bridge.driver.client; http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/parameters/ClientDriverStopHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/parameters/ClientDriverStopHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/parameters/ClientDriverStopHandler.java new file mode 100644 index 0000000..b19602a --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/parameters/ClientDriverStopHandler.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.client.parameters; + +import org.apache.reef.bridge.driver.client.DefaultDriverClientStopHandler; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.event.StopTime; + +import java.util.Set; + +/** + * Client driver stop handler. 
/**
 * Client driver stop handler.
 *
 * <p>Named parameter whose value is a set of {@code EventHandler<StopTime>};
 * {@code DefaultDriverClientStopHandler} is declared as the default class.
 */
@NamedParameter(doc ="Java driver client stop handler",
    default_class = DefaultDriverClientStopHandler.class)
public final class ClientDriverStopHandler implements Name<Set<EventHandler<StopTime>>> {
}
package org.apache.reef.bridge.driver.client.parameters;

import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;

/**
 * Driver client dispatcher thread count.
 *
 * <p>Named parameter with an {@code Integer} value; the annotation declares
 * {@code "1"} as its default value.
 */
@NamedParameter(doc = "Number of dispatch threads", default_value = "1")
public class DriverClientDispatchThreadCount implements Name<Integer> {
}
+ */ +package org.apache.reef.bridge.driver.client.parameters; http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverClientException.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverClientException.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverClientException.java new file mode 100644 index 0000000..a7ff6c1 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverClientException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.service; + +/** + * An exception thrown by the driver client. 
/**
 * An exception thrown by the driver client.
 */
public final class DriverClientException extends Exception {

  private static final long serialVersionUID = 1L;

  /**
   * Creates an exception with a detail message and no cause.
   *
   * @param message the detail message
   */
  public DriverClientException(final String message) {
    super(message);
  }

  /**
   * Creates an exception with a detail message and the underlying cause,
   * so that wrapping code does not have to drop the original exception.
   *
   * @param message the detail message
   * @param cause the underlying cause; may be {@code null}
   */
  public DriverClientException(final String message, final Throwable cause) {
    super(message, cause);
  }
}
package org.apache.reef.bridge.driver.service;

import org.apache.reef.annotations.audience.Private;
import org.apache.reef.bridge.service.parameters.DriverClientCommand;
import org.apache.reef.driver.parameters.DriverIdleSources;
import org.apache.reef.tang.formats.ConfigurationModule;
import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
import org.apache.reef.tang.formats.RequiredImpl;
import org.apache.reef.tang.formats.RequiredParameter;

/**
 * Configuration module for the driver bridge service. It binds the
 * {@code IDriverService} implementation and the driver client command, and
 * adds {@code IDriverService} to the {@code DriverIdleSources} set.
 */
@Private
public final class DriverServiceConfiguration extends ConfigurationModuleBuilder {

  /** Implementation class bound to {@code IDriverService}. */
  public static final RequiredImpl<IDriverService> DRIVER_SERVICE_IMPL = new RequiredImpl<>();

  /** String value bound to the {@code DriverClientCommand} named parameter. */
  public static final RequiredParameter<String> DRIVER_CLIENT_COMMAND = new RequiredParameter<>();

  /** Configuration module that binds all driver handlers. */
  public static final ConfigurationModule CONF = new DriverServiceConfiguration()
      .bindImplementation(IDriverService.class, DRIVER_SERVICE_IMPL)
      .bindNamedParameter(DriverClientCommand.class, DRIVER_CLIENT_COMMAND)
      // IDriverService is also added as an entry of the DriverIdleSources set.
      .bindSetEntry(DriverIdleSources.class, IDriverService.class)
      .build();
}
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.service; + +import org.apache.reef.bridge.driver.service.grpc.GRPCDriverService; +import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.client.DriverConfiguration; +import org.apache.reef.client.DriverRestartConfiguration; +import org.apache.reef.client.parameters.DriverConfigurationProviders; +import org.apache.reef.io.TcpPortConfigurationProvider; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.JavaConfigurationBuilder; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.formats.ConfigurationModule; +import org.apache.reef.util.EnvironmentUtils; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount; + +import java.util.ArrayList; +import java.util.List; + +/** + * Base class for all driver service configuration provider implementations. 
+ */ +public abstract class DriverServiceConfigurationProviderBase implements IDriverServiceConfigurationProvider { + + private static final Tang TANG = Tang.Factory.getTang(); + + + protected Configuration getTcpPortRangeConfiguration( + final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto) { + JavaConfigurationBuilder configurationModuleBuilder = TANG.newConfigurationBuilder() + .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class); + // Setup TCP constraints + if (driverClientConfigurationProto.getTcpPortRangeBegin() > 0) { + configurationModuleBuilder = configurationModuleBuilder + .bindNamedParameter(TcpPortRangeBegin.class, + Integer.toString(driverClientConfigurationProto.getTcpPortRangeBegin())); + } + if (driverClientConfigurationProto.getTcpPortRangeCount() > 0) { + configurationModuleBuilder = configurationModuleBuilder + .bindNamedParameter(TcpPortRangeCount.class, + Integer.toString(driverClientConfigurationProto.getTcpPortRangeCount())); + } + if (driverClientConfigurationProto.getTcpPortRangeTryCount() > 0) { + configurationModuleBuilder = configurationModuleBuilder + .bindNamedParameter(TcpPortRangeCount.class, + Integer.toString(driverClientConfigurationProto.getTcpPortRangeTryCount())); + } + return configurationModuleBuilder.build(); + } + + protected Configuration getDriverConfiguration( + final ClientProtocol.DriverClientConfiguration driverConfiguration) { + ConfigurationModule driverServiceConfigurationModule = DriverConfiguration.CONF + .set(DriverConfiguration.DRIVER_IDENTIFIER, driverConfiguration.getJobid()); + + // Set file dependencies + final List<String> localLibraries = new ArrayList<>(); + localLibraries.add(EnvironmentUtils.getClassLocation(GRPCDriverService.class)); + if (driverConfiguration.getLocalLibrariesCount() > 0) { + localLibraries.addAll(driverConfiguration.getLocalLibrariesList()); + } + driverServiceConfigurationModule = driverServiceConfigurationModule + 
.setMultiple(DriverConfiguration.LOCAL_LIBRARIES, localLibraries); + if (driverConfiguration.getGlobalLibrariesCount() > 0) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .setMultiple(DriverConfiguration.GLOBAL_LIBRARIES, + driverConfiguration.getGlobalLibrariesList()); + } + if (driverConfiguration.getLocalFilesCount() > 0) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .setMultiple(DriverConfiguration.LOCAL_FILES, + driverConfiguration.getLocalFilesList()); + } + if (driverConfiguration.getGlobalFilesCount() > 0) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .setMultiple(DriverConfiguration.GLOBAL_FILES, + driverConfiguration.getGlobalFilesList()); + } + // Setup driver resources + if (driverConfiguration.getCpuCores() > 0) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.DRIVER_CPU_CORES, driverConfiguration.getCpuCores()); + } + if (driverConfiguration.getMemoryMb() > 0) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.DRIVER_MEMORY, driverConfiguration.getMemoryMb()); + } + // Setup handlers + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.ON_DRIVER_STARTED, DriverServiceHandlers.StartHandler.class) + .set(DriverConfiguration.ON_DRIVER_STOP, DriverServiceHandlers.StopHandler.class) + .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DriverServiceHandlers.AllocatedEvaluatorHandler.class) + .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, DriverServiceHandlers.CompletedEvaluatorHandler.class) + .set(DriverConfiguration.ON_EVALUATOR_FAILED, DriverServiceHandlers.FailedEvaluatorHandler.class) + .set(DriverConfiguration.ON_CONTEXT_ACTIVE, DriverServiceHandlers.ActiveContextHandler.class) + .set(DriverConfiguration.ON_CONTEXT_CLOSED, DriverServiceHandlers.ClosedContextHandler.class) + .set(DriverConfiguration.ON_CONTEXT_FAILED, 
DriverServiceHandlers.ContextFailedHandler.class) + .set(DriverConfiguration.ON_CONTEXT_MESSAGE, DriverServiceHandlers.ContextMessageHandler.class) + .set(DriverConfiguration.ON_TASK_RUNNING, DriverServiceHandlers.RunningTaskHandler.class) + .set(DriverConfiguration.ON_TASK_COMPLETED, DriverServiceHandlers.CompletedTaskHandler.class) + .set(DriverConfiguration.ON_TASK_FAILED, DriverServiceHandlers.FailedTaskHandler.class) + .set(DriverConfiguration.ON_TASK_MESSAGE, DriverServiceHandlers.TaskMessageHandler.class) + .set(DriverConfiguration.ON_CLIENT_MESSAGE, DriverServiceHandlers.ClientMessageHandler.class) + .set(DriverConfiguration.ON_CLIENT_CLOSED, DriverServiceHandlers.ClientCloseHandler.class) + .set(DriverConfiguration.ON_CLIENT_CLOSED_MESSAGE, DriverServiceHandlers.ClientCloseWithMessageHandler.class); + return driverServiceConfigurationModule.build(); + } + + protected Configuration getDriverRestartConfiguration( + final ClientProtocol.DriverClientConfiguration driverConfiguration) { + ConfigurationModule restartConfModule = DriverRestartConfiguration.CONF + .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, + DriverServiceHandlers.DriverRestartHandler.class) + .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE, + DriverServiceHandlers.DriverRestartActiveContextHandler.class) + .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING, + DriverServiceHandlers.DriverRestartRunningTaskHandler.class) + .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED, + DriverServiceHandlers.DriverRestartCompletedHandler.class) + .set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED, + DriverServiceHandlers.DriverRestartFailedEvaluatorHandler.class); + return driverConfiguration.getDriverRestartEvaluatorRecoverySeconds() > 0 ? 
+ restartConfModule + .set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS, + driverConfiguration.getDriverRestartEvaluatorRecoverySeconds()) + .build() : + restartConfModule.build(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceHandlers.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceHandlers.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceHandlers.java new file mode 100644 index 0000000..6dd6e2c --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceHandlers.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
 + */ +package org.apache.reef.bridge.driver.service; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ClosedContext; +import org.apache.reef.driver.context.ContextMessage; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.CompletedEvaluator; +import org.apache.reef.driver.evaluator.FailedEvaluator; +import org.apache.reef.driver.restart.DriverRestartCompleted; +import org.apache.reef.driver.restart.DriverRestarted; +import org.apache.reef.driver.task.*; +import org.apache.reef.tang.annotations.Unit; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.event.StartTime; +import org.apache.reef.wake.time.event.StopTime; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Contains Java side event handlers that perform + * hand-off with the driver client side. + */ +@Unit +@Private +@DriverSide +public final class DriverServiceHandlers { + + private static final Logger LOG = Logger.getLogger(DriverServiceHandlers.class.getName()); + + private final IDriverService driverBridgeService; + + @Inject + private DriverServiceHandlers( + final IDriverService driverBridgeService) { + this.driverBridgeService = driverBridgeService; + } + + /** + * Job Driver is ready and the clock is set up: request the evaluators. + */ + public final class StartHandler implements EventHandler<StartTime> { + @Override + public void onNext(final StartTime startTime) { + LOG.log(Level.INFO, "JavaBridge: Start Driver"); + DriverServiceHandlers.this.driverBridgeService.startHandler(startTime); + } + } + + /** + * Job Driver is shutting down: write to the log. 
+ */ + public final class StopHandler implements EventHandler<StopTime> { + @Override + public void onNext(final StopTime stopTime) { + LOG.log(Level.INFO, "JavaBridge: Stop Driver"); + DriverServiceHandlers.this.driverBridgeService.stopHandler(stopTime); + } + } + + /** + * Receive notification that an Evaluator had been allocated, + * and submitTask a new Task in that Evaluator. + */ + public final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { + @Override + public void onNext(final AllocatedEvaluator eval) { + LOG.log(Level.INFO, "JavaBridge: Allocated Evaluator {0}", eval.getId()); + DriverServiceHandlers.this.driverBridgeService.allocatedEvaluatorHandler(eval); + } + } + + /** + * Completed evaluator handler. + */ + public final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> { + @Override + public void onNext(final CompletedEvaluator eval) { + LOG.log(Level.INFO, "JavaBridge: Completed Evaluator {0}", eval.getId()); + DriverServiceHandlers.this.driverBridgeService.completedEvaluatorHandler(eval); + } + } + + /** + * Failed evaluator handler. + */ + public final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> { + @Override + public void onNext(final FailedEvaluator eval) { + LOG.log(Level.INFO, "JavaBridge: Failed Evaluator {0}", eval.getId()); + DriverServiceHandlers.this.driverBridgeService.failedEvaluatorHandler(eval); + } + } + + /** + * Receive notification that the Context is active. + */ + public final class ActiveContextHandler implements EventHandler<ActiveContext> { + @Override + public void onNext(final ActiveContext context) { + LOG.log(Level.INFO, "JavaBridge: Active Context {0}", context.getId()); + DriverServiceHandlers.this.driverBridgeService.activeContextHandler(context); + } + } + + /** + * Received notification that the Context is closed. 
+ */ + public final class ClosedContextHandler implements EventHandler<ClosedContext> { + @Override + public void onNext(final ClosedContext context) { + LOG.log(Level.INFO, "JavaBridge: Closed Context {0}", context.getId()); + DriverServiceHandlers.this.driverBridgeService.closedContextHandler(context); + } + } + + /** + * Received a message from the context. + */ + public final class ContextMessageHandler implements EventHandler<ContextMessage> { + @Override + public void onNext(final ContextMessage message) { + LOG.log(Level.INFO, "JavaBridge: Context Message id {0}", message.getId()); + DriverServiceHandlers.this.driverBridgeService.contextMessageHandler(message); + } + } + + /** + * Received notification that the Context failed. + */ + public final class ContextFailedHandler implements EventHandler<FailedContext> { + @Override + public void onNext(final FailedContext context) { + LOG.log(Level.INFO, "JavaBridge: Context Failed {0}", context.getId()); + DriverServiceHandlers.this.driverBridgeService.failedContextHandler(context); + } + } + + /** + * Receive notification that the Task is running. + */ + public final class RunningTaskHandler implements EventHandler<RunningTask> { + @Override + public void onNext(final RunningTask task) { + LOG.log(Level.INFO, "JavaBridge: Running Task {0}", task.getId()); + DriverServiceHandlers.this.driverBridgeService.runningTaskHandler(task); + } + } + + /** + * Received notification that the Task failed. + */ + public final class FailedTaskHandler implements EventHandler<FailedTask> { + @Override + public void onNext(final FailedTask task) { + LOG.log(Level.INFO, "JavaBridge: Failed Task {0}", task.getId()); + DriverServiceHandlers.this.driverBridgeService.failedTaskHandler(task); + } + } + + /** + * Receive notification that the Task has completed successfully. 
+ */ + public final class CompletedTaskHandler implements EventHandler<CompletedTask> { + @Override + public void onNext(final CompletedTask task) { + LOG.log(Level.INFO, "JavaBridge: Completed Task {0}", task.getId()); + DriverServiceHandlers.this.driverBridgeService.completedTaskHandler(task); + } + } + + /** + * Received notification that the Task was suspended. + */ + public final class SuspendedTaskHandler implements EventHandler<SuspendedTask> { + @Override + public void onNext(final SuspendedTask task) { + LOG.log(Level.INFO, "JavaBridge: Suspended Task {0}", task.getId()); + DriverServiceHandlers.this.driverBridgeService.suspendedTaskHandler(task); + } + } + + /** + * Received a message from the task. + */ + public final class TaskMessageHandler implements EventHandler<TaskMessage> { + @Override + public void onNext(final TaskMessage message) { + LOG.log(Level.INFO, "JavaBridge: Message from Task {0}", message.getId()); + DriverServiceHandlers.this.driverBridgeService.taskMessageHandler(message); + } + } + + /** + * Received a message from the client. + */ + public final class ClientMessageHandler implements EventHandler<byte[]> { + @Override + public void onNext(final byte[] message) { + LOG.log(Level.INFO, "JavaBridge: Message from Client"); + DriverServiceHandlers.this.driverBridgeService.clientMessageHandler(message); + } + } + + /** + * Received a close event from the client. + */ + public final class ClientCloseHandler implements EventHandler<Void> { + @Override + public void onNext(final Void value) { + LOG.log(Level.INFO, "JavaBridge: Close event from Client"); + DriverServiceHandlers.this.driverBridgeService.clientCloseHandler(); + } + } + + /** + * Received a close event with message. 
+ */ + public final class ClientCloseWithMessageHandler implements EventHandler<byte[]> { + @Override + public void onNext(final byte[] message) { + LOG.log(Level.INFO, "JavaBridge: Close event with messages from Client"); + DriverServiceHandlers.this.driverBridgeService.clientCloseWithMessageHandler(message); + } + } + + /** + * Job driver is restarted after previous crash. + */ + public final class DriverRestartHandler implements EventHandler<DriverRestarted> { + @Override + public void onNext(final DriverRestarted driverRestarted) { + + } + } + + /** + * Receive notification that an context is active on Evaluator when the driver restarted. + */ + public final class DriverRestartActiveContextHandler implements EventHandler<ActiveContext> { + @Override + public void onNext(final ActiveContext context) { + + } + } + + /** + * Receive notification that the Task is running when driver restarted. + */ + public final class DriverRestartRunningTaskHandler implements EventHandler<RunningTask> { + @Override + public void onNext(final RunningTask task) { + + } + } + + /** + * Receive notification that driver restart has completed. + */ + public final class DriverRestartCompletedHandler implements EventHandler<DriverRestartCompleted> { + @Override + public void onNext(final DriverRestartCompleted driverRestartCompleted) { + + } + } + + /** + * Receive notification that the entire Evaluator had failed on Driver Restart. 
+ */ + public final class DriverRestartFailedEvaluatorHandler implements EventHandler<FailedEvaluator> { + @Override + public void onNext(final FailedEvaluator eval) { + + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/IDriverService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/IDriverService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/IDriverService.java new file mode 100644 index 0000000..788260f --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/IDriverService.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.bridge.driver.service; + +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ClosedContext; +import org.apache.reef.driver.context.ContextMessage; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.CompletedEvaluator; +import org.apache.reef.driver.evaluator.FailedEvaluator; +import org.apache.reef.driver.restart.DriverRestartCompleted; +import org.apache.reef.driver.restart.DriverRestarted; +import org.apache.reef.driver.task.*; +import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource; +import org.apache.reef.wake.time.event.StartTime; +import org.apache.reef.wake.time.event.StopTime; + +/** + * Interface implemented by a Driver Service. + */ +public interface IDriverService extends DriverIdlenessSource { + + /** + * Driver restart handler. + * @param restart event + */ + void driverRestarted(final DriverRestarted restart); + + /** + * Restart running task. + * @param task running + */ + void restartRunningTask(final RunningTask task); + + /** + * Restart active context. + * @param context restart + */ + void restartActiveContext(final ActiveContext context); + + /** + * Driver restart completed. + * @param restartCompleted event + */ + void driverRestartCompleted(final DriverRestartCompleted restartCompleted); + + /** + * Failed to restart evaluator (NOTE: the method name below misspells "Evaluator"). + * @param evaluator that failed. + */ + void restartFailedEvalautor(final FailedEvaluator evaluator); + + /** + * Handle start time event. + * @param startTime event + */ + void startHandler(final StartTime startTime); + + /** + * Handle stop event. + * @param stopTime event + */ + void stopHandler(final StopTime stopTime); + + /** + * Handle allocated evaluator event. 
+ * @param eval allocated + */ + void allocatedEvaluatorHandler(final AllocatedEvaluator eval); + + /** + * Handle completed evaluator event. 
+ * @param eval that completed + */ + void completedEvaluatorHandler(final CompletedEvaluator eval); + + /** + * Handle failed evaluator event. + * @param eval that failed + */ + void failedEvaluatorHandler(final FailedEvaluator eval); + + /** + * Handle active context. + * @param context activated + */ + void activeContextHandler(final ActiveContext context); + + /** + * Handle closed context event. + * @param context that closed + */ + void closedContextHandler(final ClosedContext context); + + /** + * Handle context message event. + * @param message sent by context + */ + void contextMessageHandler(final ContextMessage message); + + /** + * Handle failed context event. + * @param context that failed + */ + void failedContextHandler(final FailedContext context); + + /** + * Handle running task event. + * @param task that is now running + */ + void runningTaskHandler(final RunningTask task); + + /** + * Handle failed task event. + * @param task that failed + */ + void failedTaskHandler(final FailedTask task); + + /** + * Handle completed task event. + * @param task that completed + */ + void completedTaskHandler(final CompletedTask task); + + /** + * Handle suspended task event. + * @param task that is suspended + */ + void suspendedTaskHandler(final SuspendedTask task); + + /** + * Handle task message event. + * @param message sent by task + */ + void taskMessageHandler(final TaskMessage message); + + /** + * Handle client message event. + * @param message sent by client + */ + void clientMessageHandler(final byte[] message); + + /** + * Handle client close event. + */ + void clientCloseHandler(); + + /** + * Handle client close event with message. 
+ * @param message sent by client + */ + void clientCloseWithMessageHandler(final byte[] message); +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/IDriverServiceConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/IDriverServiceConfigurationProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/IDriverServiceConfigurationProvider.java new file mode 100644 index 0000000..8b8dea5 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/IDriverServiceConfigurationProvider.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.service; + +import org.apache.reef.bridge.client.IDriverBridgeConfigurationProvider; +import org.apache.reef.bridge.driver.service.grpc.GRPCDriverServiceConfigurationProvider; +import org.apache.reef.tang.annotations.DefaultImplementation; + +/** + * Configuration provider for the driver service (default: {@link GRPCDriverServiceConfigurationProvider}). 
+ */ +@DefaultImplementation(GRPCDriverServiceConfigurationProvider.class) +public interface IDriverServiceConfigurationProvider extends IDriverBridgeConfigurationProvider { +}
