[REEF-2003] Revise Driver Service to allow static configuration. The driver service structure has been revised to allow easy static configurations that do not involve Java on the client.
JIRA: [REEF-2003](https://issues.apache.org/jira/browse/REEF-2003) Pull Request: Closes # Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d243aa2a Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d243aa2a Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d243aa2a Branch: refs/heads/REEF-335 Commit: d243aa2a3cb89d03ee216c56c3dafbcea68eae89 Parents: 386069e Author: Tyson Condie <[email protected]> Authored: Wed Apr 25 09:38:17 2018 -0700 Committer: Doug Service <[email protected]> Committed: Tue May 1 01:28:25 2018 +0000 ---------------------------------------------------------------------- lang/common/proto/bridge/ClientProtocol.proto | 41 +- .../proto/bridge/DriverClientProtocol.proto | 42 +- lang/java/reef-bridge-proto-java/pom.xml | 17 + .../client/DefaultDriverClientStopHandler.java | 43 - .../reef/bridge/client/DriverClientClock.java | 127 --- .../client/DriverClientConfiguration.java | 202 ---- .../bridge/client/DriverClientDispatcher.java | 231 ----- .../client/DriverClientEvaluatorRequestor.java | 59 -- .../client/DriverClientExceptionHandler.java | 43 - .../bridge/client/DriverServiceLauncher.java | 193 ++++ .../bridge/client/IAlarmDispatchHandler.java | 30 - .../IDriverBridgeConfigurationProvider.java | 32 + .../bridge/client/IDriverClientService.java | 45 - .../IDriverRuntimeConfigurationProvider.java | 30 + .../bridge/client/IDriverServiceClient.java | 132 --- .../client/IDriverServiceRuntimeLauncher.java | 30 + .../reef/bridge/client/JVMClientProcess.java | 121 --- .../bridge/client/JavaDriverClientLauncher.java | 217 ----- .../client/WindowsRuntimePathProvider.java | 43 + .../client/events/ActiveContextBridge.java | 102 -- .../client/events/AllocatedEvaluatorBridge.java | 166 ---- .../client/events/ClosedContextBridge.java | 77 -- .../client/events/CompletedEvaluatorBridge.java | 39 - .../client/events/CompletedTaskBridge.java | 61 -- 
.../client/events/ContextMessageBridge.java | 69 -- .../client/events/FailedContextBridge.java | 110 --- .../client/events/FailedEvaluatorBridge.java | 75 -- .../bridge/client/events/RunningTaskBridge.java | 90 -- .../bridge/client/events/TaskMessageBridge.java | 78 -- .../reef/bridge/client/events/package-info.java | 22 - .../grpc/DriverClientGrpcConfiguration.java | 42 - .../bridge/client/grpc/DriverClientService.java | 458 --------- .../bridge/client/grpc/DriverServiceClient.java | 225 ----- .../reef/bridge/client/grpc/package-info.java | 22 - .../grpc/parameters/DriverServicePort.java | 29 - .../client/grpc/parameters/package-info.java | 22 - .../LocalDriverServiceRuntimeLauncher.java | 57 ++ .../YarnDriverServiceRuntimeLauncher.java | 58 ++ .../reef/bridge/client/launch/package-info.java | 22 + .../apache/reef/bridge/client/package-info.java | 2 +- .../parameters/ClientDriverStopHandler.java | 36 - .../DriverClientDispatchThreadCount.java | 30 - .../bridge/client/parameters/package-info.java | 22 - ...LocalDriverRuntimeConfigurationProvider.java | 57 ++ .../YarnDriverRuntimeConfigurationProvider.java | 66 ++ .../bridge/client/runtime/package-info.java | 22 + .../client/DefaultDriverClientStopHandler.java | 43 + .../bridge/driver/client/DriverClientClock.java | 127 +++ .../client/DriverClientConfiguration.java | 202 ++++ .../driver/client/DriverClientDispatcher.java | 346 +++++++ .../client/DriverClientEvaluatorRequestor.java | 59 ++ .../client/DriverClientExceptionHandler.java | 43 + .../driver/client/IAlarmDispatchHandler.java | 30 + .../driver/client/IDriverClientService.java | 45 + .../driver/client/IDriverServiceClient.java | 132 +++ .../bridge/driver/client/JVMClientProcess.java | 121 +++ .../driver/client/JavaDriverClientLauncher.java | 217 +++++ .../client/events/ActiveContextBridge.java | 111 +++ .../client/events/AllocatedEvaluatorBridge.java | 166 ++++ .../client/events/ClosedContextBridge.java | 77 ++ .../client/events/CompletedEvaluatorBridge.java 
| 39 + .../client/events/CompletedTaskBridge.java | 61 ++ .../client/events/ContextMessageBridge.java | 69 ++ .../client/events/FailedContextBridge.java | 110 +++ .../client/events/FailedEvaluatorBridge.java | 75 ++ .../driver/client/events/RunningTaskBridge.java | 90 ++ .../driver/client/events/TaskMessageBridge.java | 78 ++ .../driver/client/events/package-info.java | 22 + .../grpc/DriverClientGrpcConfiguration.java | 42 + .../driver/client/grpc/DriverClientService.java | 611 ++++++++++++ .../driver/client/grpc/DriverServiceClient.java | 232 +++++ .../bridge/driver/client/grpc/package-info.java | 22 + .../grpc/parameters/DriverServicePort.java | 29 + .../client/grpc/parameters/package-info.java | 22 + .../reef/bridge/driver/client/package-info.java | 22 + .../parameters/ClientDriverStopHandler.java | 36 + .../DriverClientDispatchThreadCount.java | 30 + .../driver/client/parameters/package-info.java | 22 + .../driver/service/DriverClientException.java | 30 + .../service/DriverServiceConfiguration.java | 45 + .../DriverServiceConfigurationProviderBase.java | 148 +++ .../driver/service/DriverServiceHandlers.java | 298 ++++++ .../bridge/driver/service/IDriverService.java | 170 ++++ .../IDriverServiceConfigurationProvider.java | 31 + .../driver/service/grpc/GRPCDriverService.java | 919 +++++++++++++++++++ .../GRPCDriverServiceConfigurationProvider.java | 53 ++ .../driver/service/grpc/package-info.java | 22 + .../bridge/driver/service/package-info.java | 22 + .../service/parameters/DriverClientCommand.java | 31 + .../driver/service/parameters/package-info.java | 22 + .../examples/WindowsRuntimePathProvider.java | 43 - .../reef/bridge/examples/hello/HelloDriver.java | 24 + .../reef/bridge/examples/hello/HelloREEF.java | 16 +- .../bridge/service/DriverClientException.java | 30 - .../service/DriverServiceConfiguration.java | 47 - .../bridge/service/DriverServiceHandlers.java | 236 ----- .../bridge/service/DriverServiceLauncher.java | 328 ------- 
.../reef/bridge/service/IDriverService.java | 138 --- .../reef/bridge/service/RuntimeNames.java | 36 - .../bridge/service/grpc/GRPCDriverService.java | 706 -------------- .../reef/bridge/service/grpc/package-info.java | 22 - .../reef/bridge/service/package-info.java | 22 - .../service/parameters/DriverClientCommand.java | 31 - .../bridge/service/parameters/package-info.java | 22 - 104 files changed, 5828 insertions(+), 4732 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/common/proto/bridge/ClientProtocol.proto ---------------------------------------------------------------------- diff --git a/lang/common/proto/bridge/ClientProtocol.proto b/lang/common/proto/bridge/ClientProtocol.proto index 68bdcaa..3962c00 100644 --- a/lang/common/proto/bridge/ClientProtocol.proto +++ b/lang/common/proto/bridge/ClientProtocol.proto @@ -19,8 +19,6 @@ syntax = "proto3"; -// option java_generic_services = true; -// option java_multiple_files = true; option java_package = "org.apache.reef.bridge.proto"; option java_outer_classname = "ClientProtocol"; option csharp_namespace = "Org.Apache.REEF.Bridge.Proto"; @@ -30,13 +28,15 @@ package driverbridge; message LocalRuntimeParameters { uint32 max_number_of_evaluators = 1; string runtime_root_folder = 2; - string jvm_heap_slack = 3; + float jvm_heap_slack = 3; repeated string rack_names = 4; } message YarnRuntimeParameters { string queue = 1; - string job_submission_directory_prefix = 2; + string job_submission_directory = 2; + string filesystem_url = 3; + bytes security_token = 4; } message AzureBatchRuntimeParameters { @@ -65,34 +65,11 @@ message DriverClientConfiguration { // The command to launch the driver client string driver_client_launch_command = 10; - enum Handlers { - // control events - START = 0; - STOP = 1; - - // evaluator events - EVALUATOR_ALLOCATED = 5; - EVALUATOR_COMPLETED = 6; - EVALUATOR_FAILED = 7; - - // context events 
- CONTEXT_ACTIVE = 10; - CONTEXT_CLOSED = 11; - CONTEXT_FAILED = 12; - CONTEXT_MESSAGE = 13; - - // task events - TASK_RUNNING = 15; - TASK_FAILED = 16; - TASK_COMPLETED = 17; - TASK_MESSAGE = 18; - - // client events - CLIENT_MESSAGE = 20; - CLIENT_CLOSE = 21; - CLIENT_CLOSE_WITH_MESSAGE = 22; - } - repeated Handlers handler = 11; + // Enable driver restart? + bool driver_restart_enable = 11; + + // Driver restart evaluator recovery seconds (optional) + uint32 driver_restart_evaluator_recovery_seconds = 12; // TCP port range uint32 tcp_port_range_begin = 15; http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/common/proto/bridge/DriverClientProtocol.proto ---------------------------------------------------------------------- diff --git a/lang/common/proto/bridge/DriverClientProtocol.proto b/lang/common/proto/bridge/DriverClientProtocol.proto index f80cff2..6d0d08b 100644 --- a/lang/common/proto/bridge/DriverClientProtocol.proto +++ b/lang/common/proto/bridge/DriverClientProtocol.proto @@ -74,6 +74,33 @@ service DriverClient { rpc ClientCloseHandler (Void) returns (Void) {} rpc ClientCloseWithMessageHandler (ClientMessageInfo) returns (Void) {} + + // Driver Restart Handlers + rpc DriverRestartHandler (DriverRestartInfo) returns (Void) {} + + rpc DriverRestartActiveContextHandler (ContextInfo) returns (Void) {} + + rpc DriverRestartRunningTaskHandler (TaskInfo) returns (Void) {} + + rpc DriverRestartCompletedHandler (DriverRestartCompletedInfo) returns (Void) {} + + rpc DriverRestartFailedEvaluatorHandler (EvaluatorInfo) returns (Void) {} +} + +// Driver restart information +message DriverRestartInfo { + uint32 resubmission_attempts = 1; + + StartTimeInfo start_time = 2; + + repeated string expected_evaluator_ids = 3; +} + +// Driver restart completed information +message DriverRestartCompletedInfo { + StopTimeInfo completion_time = 1; + + bool is_timed_out = 2; } // IdleStatus response to idleness inquiry @@ -127,6 +154,9 @@ message ContextInfo { string 
parent_id = 3; + // Carry this with us for driver restart + EvaluatorDescriptorInfo evaluator_descriptor_info = 4; + // Optional exception information ExceptionInfo exception = 5; } @@ -142,13 +172,19 @@ message ContextMessageInfo { } message TaskInfo { + // Task identifier. string task_id = 1; - string context_id = 2; + // Task result. + bytes result = 2; - bytes result = 3; + /* Carry entire context info since client may not have received it + * when submitting task against allocated evaluator. + */ + ContextInfo context = 5; - ExceptionInfo exception = 5; + // Possible exception encountered in task execution. + ExceptionInfo exception = 10; } message TaskMessageInfo { http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/pom.xml b/lang/java/reef-bridge-proto-java/pom.xml index f177b7c..a1ccc33 100644 --- a/lang/java/reef-bridge-proto-java/pom.xml +++ b/lang/java/reef-bridge-proto-java/pom.xml @@ -92,6 +92,23 @@ under the License. 
<artifactId>reef-runtime-yarn</artifactId> <version>0.17.0-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>20.0</version> + <scope>compile</scope> + <exclusions> + <exclusion> <!-- declare the exclusion here --> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DefaultDriverClientStopHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DefaultDriverClientStopHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DefaultDriverClientStopHandler.java deleted file mode 100644 index 8636f7a..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DefaultDriverClientStopHandler.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client; - -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.time.event.StopTime; - -import javax.inject.Inject; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Default java client driver stop handler. - */ -public final class DefaultDriverClientStopHandler implements EventHandler<StopTime> { - - private static final Logger LOG = Logger.getLogger(DefaultDriverClientStopHandler.class.getName()); - - @Inject - private DefaultDriverClientStopHandler() {} - - @Override - public void onNext(final StopTime value) { - LOG.log(Level.FINEST, "Stop time {0}", value); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientClock.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientClock.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientClock.java deleted file mode 100644 index 162cbe5..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientClock.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client; - -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.time.Clock; -import org.apache.reef.wake.time.Time; -import org.apache.reef.wake.time.event.Alarm; -import org.apache.reef.wake.time.runtime.Timer; -import org.apache.reef.wake.time.runtime.event.ClientAlarm; - -import javax.inject.Inject; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * The bridge driver client clock. - */ -public final class DriverClientClock implements Clock, IAlarmDispatchHandler { - - private static final Logger LOG = Logger.getLogger(DriverClientClock.class.getName()); - - private final IDriverClientService driverClientService; - - private final IDriverServiceClient driverServiceClient; - - private final Timer timer; - - private final Map<String, ClientAlarm> alarmMap = new HashMap<>(); - - private boolean closed = false; - - @Inject - private DriverClientClock( - final Timer timer, - final IDriverClientService driverClientService, - final IDriverServiceClient driverServiceClient) { - this.timer = timer; - this.driverClientService = driverClientService; - this.driverServiceClient = driverServiceClient; - } - - @Override - public Time scheduleAlarm(final int offset, final EventHandler<Alarm> handler) { - final ClientAlarm alarm = new ClientAlarm(this.timer.getCurrent() + offset, handler); - final String alarmId = UUID.randomUUID().toString(); - this.alarmMap.put(alarmId, alarm); 
- this.driverServiceClient.onSetAlarm(alarmId, offset); - return alarm; - } - - @Override - public void close() { - stop(); - } - - @Override - public void stop() { - if (!closed) { - this.closed = true; - this.driverServiceClient.onShutdown(); - } - } - - @Override - public void stop(final Throwable exception) { - if (!closed) { - this.closed = true; - this.driverServiceClient.onShutdown(exception); - } - } - - @Override - public boolean isIdle() { - return this.closed && this.alarmMap.isEmpty(); - } - - @Override - public boolean isClosed() { - return this.closed; - } - - @Override - public void run() { - try { - this.driverClientService.start(); - this.driverClientService.awaitTermination(); - } catch (IOException | InterruptedException e) { - throw new RuntimeException(e); - } - } - - /** - * Alarm clock event handler. - * @param alarmId alarm identifier - */ - @Override - public void onNext(final String alarmId) { - if (this.alarmMap.containsKey(alarmId)) { - final ClientAlarm clientAlarm = this.alarmMap.remove(alarmId); - clientAlarm.run(); - } else { - LOG.log(Level.SEVERE, "Unknown alarm id {0}", alarmId); - } - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientConfiguration.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientConfiguration.java deleted file mode 100644 index 50da3ce..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientConfiguration.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client; - -import org.apache.reef.bridge.client.parameters.DriverClientDispatchThreadCount; -import org.apache.reef.bridge.client.parameters.ClientDriverStopHandler; -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.context.ClosedContext; -import org.apache.reef.driver.context.ContextMessage; -import org.apache.reef.driver.context.FailedContext; -import org.apache.reef.driver.evaluator.AllocatedEvaluator; -import org.apache.reef.driver.evaluator.CompletedEvaluator; -import org.apache.reef.driver.evaluator.EvaluatorRequestor; -import org.apache.reef.driver.evaluator.FailedEvaluator; -import org.apache.reef.driver.parameters.*; -import org.apache.reef.driver.task.*; -import org.apache.reef.tang.formats.ConfigurationModule; -import org.apache.reef.tang.formats.ConfigurationModuleBuilder; -import org.apache.reef.tang.formats.OptionalImpl; -import org.apache.reef.tang.formats.RequiredImpl; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.time.Clock; -import org.apache.reef.wake.time.event.StartTime; -import org.apache.reef.wake.time.event.StopTime; - -/** - * Driver client configuration. 
- */ -public final class DriverClientConfiguration extends ConfigurationModuleBuilder { - - /** - * The event handler invoked right after the driver boots up. - */ - public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED = new RequiredImpl<>(); - - /** - * The event handler invoked right before the driver shuts down. Defaults to ignore. - */ - public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP = new OptionalImpl<>(); - - // ***** EVALUATOR HANDLER BINDINGS: - - /** - * Event handler for allocated evaluators. Defaults to returning the evaluator if not bound. - */ - public static final OptionalImpl<EventHandler<AllocatedEvaluator>> ON_EVALUATOR_ALLOCATED = new OptionalImpl<>(); - - /** - * Event handler for completed evaluators. Defaults to logging if not bound. - */ - public static final OptionalImpl<EventHandler<CompletedEvaluator>> ON_EVALUATOR_COMPLETED = new OptionalImpl<>(); - - /** - * Event handler for failed evaluators. Defaults to job failure if not bound. - */ - public static final OptionalImpl<EventHandler<FailedEvaluator>> ON_EVALUATOR_FAILED = new OptionalImpl<>(); - - // ***** TASK HANDLER BINDINGS: - - /** - * Event handler for task messages. Defaults to logging if not bound. - */ - public static final OptionalImpl<EventHandler<TaskMessage>> ON_TASK_MESSAGE = new OptionalImpl<>(); - - /** - * Event handler for completed tasks. Defaults to closing the context the task ran on if not bound. - */ - public static final OptionalImpl<EventHandler<CompletedTask>> ON_TASK_COMPLETED = new OptionalImpl<>(); - - /** - * Event handler for failed tasks. Defaults to job failure if not bound. - */ - public static final OptionalImpl<EventHandler<FailedTask>> ON_TASK_FAILED = new OptionalImpl<>(); - - /** - * Event handler for running tasks. Defaults to logging if not bound. - */ - public static final OptionalImpl<EventHandler<RunningTask>> ON_TASK_RUNNING = new OptionalImpl<>(); - - /** - * Event handler for suspended tasks. 
Defaults to job failure if not bound. Rationale: many jobs don't support - * task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then. - */ - public static final OptionalImpl<EventHandler<SuspendedTask>> ON_TASK_SUSPENDED = new OptionalImpl<>(); - - // ***** CLIENT HANDLER BINDINGS: - - /** - * Event handler for client messages. Defaults to logging if not bound. - */ - public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_MESSAGE = new OptionalImpl<>(); - - /** - * Event handler for close messages sent by the client. Defaults to job failure if not bound. - */ - public static final OptionalImpl<EventHandler<Void>> ON_CLIENT_CLOSED = new OptionalImpl<>(); - - /** - * Event handler for close messages sent by the client. Defaults to job failure if not bound. - */ - public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_CLOSED_MESSAGE = new OptionalImpl<>(); - - // ***** CONTEXT HANDLER BINDINGS: - - /** - * Event handler for active context. Defaults to closing the context if not bound. - */ - public static final OptionalImpl<EventHandler<ActiveContext>> ON_CONTEXT_ACTIVE = new OptionalImpl<>(); - - /** - * Event handler for closed context. Defaults to logging if not bound. - */ - public static final OptionalImpl<EventHandler<ClosedContext>> ON_CONTEXT_CLOSED = new OptionalImpl<>(); - - /** - * Event handler for closed context. Defaults to job failure if not bound. - */ - public static final OptionalImpl<EventHandler<FailedContext>> ON_CONTEXT_FAILED = new OptionalImpl<>(); - - /** - * Event handler for context messages. Defaults to logging if not bound. - */ - public static final OptionalImpl<EventHandler<ContextMessage>> ON_CONTEXT_MESSAGE = new OptionalImpl<>(); - - /** - * Number of dispatch threads to use. - */ - public static final OptionalImpl<Integer> CLIENT_DRIVER_DISPATCH_THREAD_COUNT = new OptionalImpl<>(); - - /** - * Alarm dispatch handler. 
- */ - public static final OptionalImpl<IAlarmDispatchHandler> ALARM_DISPATCH_HANDLER = new OptionalImpl<>(); - - /** - * Default to gRPC Driver Client Service. - */ - public static final OptionalImpl<IDriverClientService> DRIVER_CLIENT_SERVICE = new OptionalImpl<>(); - - /** - * Default to gRPC Driver Service Client. - */ - public static final OptionalImpl<IDriverServiceClient> DRIVER_SERVICE_CLIENT = new OptionalImpl<>(); - - /** - * ConfigurationModule to fill out to get a legal Driver Configuration. - */ - public static final ConfigurationModule CONF = new DriverClientConfiguration() - .bindImplementation(Clock.class, DriverClientClock.class) - .bindImplementation(EvaluatorRequestor.class, DriverClientEvaluatorRequestor.class) - .bindImplementation(IAlarmDispatchHandler.class, ALARM_DISPATCH_HANDLER) - .bindImplementation(IDriverClientService.class, DRIVER_CLIENT_SERVICE) - .bindImplementation(IDriverServiceClient.class, DRIVER_SERVICE_CLIENT) - - .bindNamedParameter(DriverClientDispatchThreadCount.class, CLIENT_DRIVER_DISPATCH_THREAD_COUNT) - - // Driver start/stop handlers - .bindSetEntry(DriverStartHandler.class, ON_DRIVER_STARTED) - .bindSetEntry(ClientDriverStopHandler.class, ON_DRIVER_STOP) - - // Evaluator handlers - .bindSetEntry(EvaluatorAllocatedHandlers.class, ON_EVALUATOR_ALLOCATED) - .bindSetEntry(EvaluatorCompletedHandlers.class, ON_EVALUATOR_COMPLETED) - .bindSetEntry(EvaluatorFailedHandlers.class, ON_EVALUATOR_FAILED) - - // Task handlers - .bindSetEntry(TaskRunningHandlers.class, ON_TASK_RUNNING) - .bindSetEntry(TaskFailedHandlers.class, ON_TASK_FAILED) - .bindSetEntry(TaskMessageHandlers.class, ON_TASK_MESSAGE) - .bindSetEntry(TaskCompletedHandlers.class, ON_TASK_COMPLETED) - .bindSetEntry(TaskSuspendedHandlers.class, ON_TASK_SUSPENDED) - - // Context handlers - .bindSetEntry(ContextActiveHandlers.class, ON_CONTEXT_ACTIVE) - .bindSetEntry(ContextClosedHandlers.class, ON_CONTEXT_CLOSED) - .bindSetEntry(ContextMessageHandlers.class, 
ON_CONTEXT_MESSAGE) - .bindSetEntry(ContextFailedHandlers.class, ON_CONTEXT_FAILED) - - // Client handlers - .bindSetEntry(ClientMessageHandlers.class, ON_CLIENT_MESSAGE) - .bindSetEntry(ClientCloseHandlers.class, ON_CLIENT_CLOSED) - .bindSetEntry(ClientCloseWithMessageHandlers.class, ON_CLIENT_CLOSED_MESSAGE) - - .build(); -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientDispatcher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientDispatcher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientDispatcher.java deleted file mode 100644 index 3dd9b88..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientDispatcher.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.reef.bridge.client; - -import com.google.common.collect.Sets; -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.bridge.client.parameters.DriverClientDispatchThreadCount; -import org.apache.reef.bridge.client.parameters.ClientDriverStopHandler; -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.context.ClosedContext; -import org.apache.reef.driver.context.ContextMessage; -import org.apache.reef.driver.context.FailedContext; -import org.apache.reef.driver.evaluator.AllocatedEvaluator; -import org.apache.reef.driver.evaluator.CompletedEvaluator; -import org.apache.reef.driver.evaluator.FailedEvaluator; -import org.apache.reef.driver.parameters.*; -import org.apache.reef.driver.task.*; -import org.apache.reef.runtime.common.utils.DispatchingEStage; -import org.apache.reef.tang.annotations.Parameter; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.time.event.StartTime; -import org.apache.reef.wake.time.event.StopTime; - -import javax.inject.Inject; -import java.util.Set; - -/** - * Async dispatch of client driver events. - */ -@Private -public final class DriverClientDispatcher { - - /** - * Dispatcher used for application provided event handlers. - */ - private final DispatchingEStage applicationDispatcher; - - /** - * Dispatcher for client close events. - */ - private final DispatchingEStage clientCloseDispatcher; - - /** - * Dispatcher for client close with message events. - */ - private final DispatchingEStage clientCloseWithMessageDispatcher; - - /** - * Dispatcher for client messages. - */ - private final DispatchingEStage clientMessageDispatcher; - - /** - * The alarm dispatcher. 
- */ - private final DispatchingEStage alarmDispatcher; - - @Inject - private DriverClientDispatcher( - final DriverClientExceptionHandler driverExceptionHandler, - final IAlarmDispatchHandler alarmDispatchHandler, - @Parameter(DriverClientDispatchThreadCount.class) - final Integer numberOfThreads, - // Application-provided start and stop handlers - @Parameter(DriverStartHandler.class) - final Set<EventHandler<StartTime>> startHandlers, - @Parameter(ClientDriverStopHandler.class) - final Set<EventHandler<StopTime>> stopHandlers, - // Application-provided Context event handlers - @Parameter(ContextActiveHandlers.class) - final Set<EventHandler<ActiveContext>> contextActiveHandlers, - @Parameter(ContextClosedHandlers.class) - final Set<EventHandler<ClosedContext>> contextClosedHandlers, - @Parameter(ContextFailedHandlers.class) - final Set<EventHandler<FailedContext>> contextFailedHandlers, - @Parameter(ContextMessageHandlers.class) - final Set<EventHandler<ContextMessage>> contextMessageHandlers, - // Application-provided Task event handlers - @Parameter(TaskRunningHandlers.class) - final Set<EventHandler<RunningTask>> taskRunningHandlers, - @Parameter(TaskCompletedHandlers.class) - final Set<EventHandler<CompletedTask>> taskCompletedHandlers, - @Parameter(TaskSuspendedHandlers.class) - final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers, - @Parameter(TaskMessageHandlers.class) - final Set<EventHandler<TaskMessage>> taskMessageEventHandlers, - @Parameter(TaskFailedHandlers.class) - final Set<EventHandler<FailedTask>> taskExceptionEventHandlers, - // Application-provided Evaluator event handlers - @Parameter(EvaluatorAllocatedHandlers.class) - final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers, - @Parameter(EvaluatorFailedHandlers.class) - final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers, - @Parameter(EvaluatorCompletedHandlers.class) - final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers, - // Client 
handlers - @Parameter(ClientCloseHandlers.class) - final Set<EventHandler<Void>> clientCloseHandlers, - @Parameter(ClientCloseWithMessageHandlers.class) - final Set<EventHandler<byte[]>> clientCloseWithMessageHandlers, - @Parameter(ClientMessageHandlers.class) - final Set<EventHandler<byte[]>> clientMessageHandlers) { - - this.applicationDispatcher = new DispatchingEStage( - driverExceptionHandler, numberOfThreads, "ClientDriverDispatcher"); - // Application start and stop handlers - this.applicationDispatcher.register(StartTime.class, startHandlers); - this.applicationDispatcher.register(StopTime.class, stopHandlers); - // Application Context event handlers - this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers); - this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers); - this.applicationDispatcher.register(FailedContext.class, contextFailedHandlers); - this.applicationDispatcher.register(ContextMessage.class, contextMessageHandlers); - - // Application Task event handlers. 
- this.applicationDispatcher.register(RunningTask.class, taskRunningHandlers); - this.applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers); - this.applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers); - this.applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers); - this.applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers); - - // Application Evaluator event handlers - this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers); - this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedHandlers); - this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedHandlers); - - // Client event handlers; - this.clientCloseDispatcher = new DispatchingEStage(this.applicationDispatcher); - this.clientCloseDispatcher.register(Void.class, clientCloseHandlers); - - this.clientCloseWithMessageDispatcher = new DispatchingEStage(this.applicationDispatcher); - this.clientCloseWithMessageDispatcher.register(byte[].class, clientCloseWithMessageHandlers); - - this.clientMessageDispatcher = new DispatchingEStage(this.applicationDispatcher); - this.clientMessageDispatcher.register(byte[].class, clientMessageHandlers); - - // Alarm event handlers - this.alarmDispatcher = new DispatchingEStage(this.applicationDispatcher); - this.alarmDispatcher.register(String.class, - Sets.newHashSet((EventHandler<String>)alarmDispatchHandler)); - } - - public void dispatch(final StartTime startTime) { - this.applicationDispatcher.onNext(StartTime.class, startTime); - } - - public void dispatch(final StopTime stopTime) { - this.applicationDispatcher.onNext(StopTime.class, stopTime); - } - - public void dispatch(final ActiveContext context) { - this.applicationDispatcher.onNext(ActiveContext.class, context); - } - - public void dispatch(final ClosedContext context) { - this.applicationDispatcher.onNext(ClosedContext.class, context); - } - - public void 
dispatch(final FailedContext context) { - this.applicationDispatcher.onNext(FailedContext.class, context); - } - - public void dispatch(final ContextMessage message) { - this.applicationDispatcher.onNext(ContextMessage.class, message); - } - - public void dispatch(final AllocatedEvaluator evaluator) { - this.applicationDispatcher.onNext(AllocatedEvaluator.class, evaluator); - } - - public void dispatch(final FailedEvaluator evaluator) { - this.applicationDispatcher.onNext(FailedEvaluator.class, evaluator); - } - - public void dispatch(final CompletedEvaluator evaluator) { - this.applicationDispatcher.onNext(CompletedEvaluator.class, evaluator); - } - - public void dispatch(final RunningTask task) { - this.applicationDispatcher.onNext(RunningTask.class, task); - } - - public void dispatch(final CompletedTask task) { - this.applicationDispatcher.onNext(CompletedTask.class, task); - } - - public void dispatch(final FailedTask task) { - this.applicationDispatcher.onNext(FailedTask.class, task); - } - - public void dispatch(final SuspendedTask task) { - this.applicationDispatcher.onNext(SuspendedTask.class, task); - } - - public void dispatch(final TaskMessage message) { - this.applicationDispatcher.onNext(TaskMessage.class, message); - } - - public void clientCloseDispatch() { - this.clientCloseDispatcher.onNext(Void.class, null); - } - - public void clientCloseWithMessageDispatch(final byte[] message) { - this.clientCloseWithMessageDispatcher.onNext(byte[].class, message); - } - - public void clientMessageDispatch(final byte[] message) { - this.clientMessageDispatcher.onNext(byte[].class, message); - } - - public void dispatchAlarm(final String alarmId) { - this.alarmDispatcher.onNext(String.class, alarmId); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientEvaluatorRequestor.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientEvaluatorRequestor.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientEvaluatorRequestor.java deleted file mode 100644 index a774b2f..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientEvaluatorRequestor.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client; - -import org.apache.reef.driver.evaluator.EvaluatorRequest; -import org.apache.reef.driver.evaluator.EvaluatorRequestor; - -import javax.inject.Inject; - -/** - * Driver Client evaluator requestor. 
- */ -public final class DriverClientEvaluatorRequestor implements EvaluatorRequestor { - - private final IDriverServiceClient driverServiceClient; - - @Inject - private DriverClientEvaluatorRequestor(final IDriverServiceClient driverServiceClient) { - this.driverServiceClient = driverServiceClient; - } - - @Override - public void submit(final EvaluatorRequest req) { - this.driverServiceClient.onEvaluatorRequest(req); - } - - @Override - public EvaluatorRequest.Builder newRequest() { - return new DriverClientEvaluatorRequestor.Builder(); - } - - /** - * {@link DriverClientEvaluatorRequestor.Builder} extended with a new submit method. - * {@link EvaluatorRequest}s are built using this builder. - */ - public final class Builder extends EvaluatorRequest.Builder<DriverClientEvaluatorRequestor.Builder> { - @Override - public synchronized void submit() { - DriverClientEvaluatorRequestor.this.submit(this.build()); - } - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientExceptionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientExceptionHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientExceptionHandler.java deleted file mode 100644 index d09ce41..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientExceptionHandler.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client; - -import org.apache.reef.wake.EventHandler; - -import javax.inject.Inject; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Driver client exception handler. - */ -public final class DriverClientExceptionHandler implements EventHandler<Throwable> { - private static final Logger LOG = Logger.getLogger(DriverClientExceptionHandler.class.getName()); - - @Inject - private DriverClientExceptionHandler() { - LOG.log(Level.FINE, "Instantiated 'DriverExceptionHandler'"); - } - - - @Override - public void onNext(final Throwable throwable) { - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java new file mode 100644 index 0000000..0f3567c --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.client; + +import com.google.protobuf.util.JsonFormat; +import org.apache.commons.lang.StringUtils; +import org.apache.reef.bridge.client.launch.LocalDriverServiceRuntimeLauncher; +import org.apache.reef.bridge.client.launch.YarnDriverServiceRuntimeLauncher; +import org.apache.reef.bridge.client.runtime.LocalDriverRuntimeConfigurationProvider; +import org.apache.reef.bridge.client.runtime.YarnDriverRuntimeConfigurationProvider; +import org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider; +import org.apache.reef.bridge.driver.service.grpc.GRPCDriverServiceConfigurationProvider; +import org.apache.reef.bridge.driver.client.JavaDriverClientLauncher; +import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.runtime.common.files.*; +import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder; +import org.apache.reef.runtime.local.LocalClasspathProvider; +import org.apache.reef.runtime.yarn.YarnClasspathProvider; +import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.BindException; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.tang.formats.ConfigurationSerializer; +import 
org.apache.reef.util.OSUtils; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Driver Service Launcher - main class. + */ +public final class DriverServiceLauncher { + + /** + * Standard Java logger. + */ + private static final Logger LOG = Logger.getLogger(DriverServiceLauncher.class.getName()); + + /** + * This class should not be instantiated. + */ + private DriverServiceLauncher() { + throw new RuntimeException("Do not instantiate this class!"); + } + + public static void submit( + final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto, + final Configuration driverClientConfiguration) + throws InjectionException, IOException { + ClientProtocol.DriverClientConfiguration.Builder builder = + ClientProtocol.DriverClientConfiguration.newBuilder(driverClientConfigurationProto); + final File driverClientConfigurationFile = new File("driverclient.conf"); + try { + // Write driver client configuration to a file + final Injector driverClientInjector = Tang.Factory.getTang().newInjector(driverClientConfiguration); + final ConfigurationSerializer configurationSerializer = + driverClientInjector.getInstance(ConfigurationSerializer.class); + configurationSerializer.toFile(driverClientConfiguration, driverClientConfigurationFile); + + // Resolve OS Runtime Path Provider. + final Configuration runtimeOSConfiguration = + driverClientConfigurationProto.getRuntimeCase() == + ClientProtocol.DriverClientConfiguration.RuntimeCase.YARN_RUNTIME ? + Tang.Factory.getTang().newConfigurationBuilder() + .bind(RuntimeClasspathProvider.class, YarnClasspathProvider.class) + .build() : + Tang.Factory.getTang().newConfigurationBuilder() + .bind(RuntimePathProvider.class, + OSUtils.isWindows() ? 
WindowsRuntimePathProvider.class : UnixJVMPathProvider.class) + .bind(RuntimeClasspathProvider.class, LocalClasspathProvider.class) + .build(); + final Injector runtimeInjector = Tang.Factory.getTang().newInjector(runtimeOSConfiguration); + final REEFFileNames fileNames = runtimeInjector.getInstance(REEFFileNames.class); + final ClasspathProvider classpathProvider = runtimeInjector.getInstance(ClasspathProvider.class); + final RuntimePathProvider runtimePathProvider = runtimeInjector.getInstance(RuntimePathProvider.class); + final List<String> launchCommand = new JavaLaunchCommandBuilder(JavaDriverClientLauncher.class, null) + .setConfigurationFilePaths( + Collections.singletonList("./" + fileNames.getLocalFolderPath() + "/" + + driverClientConfigurationFile.getName())) + .setJavaPath(runtimePathProvider.getPath()) + .setClassPath(classpathProvider.getEvaluatorClasspath()) + .build(); + final String cmd = StringUtils.join(launchCommand, ' '); + builder.setDriverClientLaunchCommand(cmd); + builder.addLocalFiles(driverClientConfigurationFile.getAbsolutePath()); + + // call main() + final File driverClientConfFile = new File("driverclient.json"); + try { + try (PrintWriter out = new PrintWriter(driverClientConfFile)) { + out.println(JsonFormat.printer().print(builder.build())); + } + main(new String[]{driverClientConfFile.getAbsolutePath()}); + } finally { + driverClientConfFile.delete(); + } + } finally { + driverClientConfigurationFile.delete(); + } + } + + private static IDriverServiceRuntimeLauncher getLocalDriverServiceLauncher() throws InjectionException { + final Configuration localJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder() + .bindImplementation(IDriverRuntimeConfigurationProvider.class, + LocalDriverRuntimeConfigurationProvider.class) + .bindImplementation(IDriverServiceConfigurationProvider.class, + GRPCDriverServiceConfigurationProvider.class) + .bindImplementation(RuntimeClasspathProvider.class, 
LocalClasspathProvider.class) + .build(); + return Tang.Factory.getTang() + .newInjector(localJobSubmissionClientConfig).getInstance(LocalDriverServiceRuntimeLauncher.class); + } + + + private static IDriverServiceRuntimeLauncher getYarnDriverServiceLauncher() throws InjectionException { + final Configuration yarnJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder() + .bindImplementation(IDriverRuntimeConfigurationProvider.class, + YarnDriverRuntimeConfigurationProvider.class) + .bindImplementation(IDriverServiceConfigurationProvider.class, + GRPCDriverServiceConfigurationProvider.class) + .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class) + .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class) + .build(); + return Tang.Factory.getTang() + .newInjector(yarnJobSubmissionClientConfig).getInstance(YarnDriverServiceRuntimeLauncher.class); + } + + /** + * Main method that launches the REEF job. + * + * @param args command line parameters. 
+ */ + public static void main(final String[] args) { + try { + if (args.length != 1) { + LOG.log(Level.SEVERE, DriverServiceLauncher.class.getName() + + " accepts single argument referencing a file that contains a client protocol buffer driver configuration"); + } + final String content; + try { + content = new String(Files.readAllBytes(Paths.get(args[0]))); + } catch (IOException e) { + throw new RuntimeException(e); + } + final ClientProtocol.DriverClientConfiguration.Builder driverClientConfigurationProtoBuilder = + ClientProtocol.DriverClientConfiguration.newBuilder(); + JsonFormat.parser() + .usingTypeRegistry(JsonFormat.TypeRegistry.getEmptyTypeRegistry()) + .merge(content, driverClientConfigurationProtoBuilder); + final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto = + driverClientConfigurationProtoBuilder.build(); + switch (driverClientConfigurationProto.getRuntimeCase()) { + case YARN_RUNTIME: + final IDriverServiceRuntimeLauncher yarnDriverServiceLauncher = getYarnDriverServiceLauncher(); + yarnDriverServiceLauncher.launch(driverClientConfigurationProto); + break; + case LOCAL_RUNTIME: + final IDriverServiceRuntimeLauncher localDriverServiceLauncher = getLocalDriverServiceLauncher(); + localDriverServiceLauncher.launch(driverClientConfigurationProto); + break; + default: + } + LOG.log(Level.INFO, "JavaBridge: Stop Client {0}", driverClientConfigurationProto.getJobid()); + } catch (final BindException | InjectionException | IOException ex) { + LOG.log(Level.SEVERE, "Job configuration error", ex); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IAlarmDispatchHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IAlarmDispatchHandler.java 
b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IAlarmDispatchHandler.java deleted file mode 100644 index a9fee48..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IAlarmDispatchHandler.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client; - -import org.apache.reef.tang.annotations.DefaultImplementation; -import org.apache.reef.wake.EventHandler; - -/** - * Alarm dispatch handler. 
- */ -@DefaultImplementation(DriverClientClock.class) -public interface IAlarmDispatchHandler extends EventHandler<String> { -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverBridgeConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverBridgeConfigurationProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverBridgeConfigurationProvider.java new file mode 100644 index 0000000..c9fefb0 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverBridgeConfigurationProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client; + +import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.tang.Configuration; + +/** + * Driver bridge configuration provider. 
+ */ +public interface IDriverBridgeConfigurationProvider { + + Configuration getConfiguration(final ClientProtocol.DriverClientConfiguration driverConfiguration); + +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverClientService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverClientService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverClientService.java deleted file mode 100644 index c71b554..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverClientService.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client; - -import org.apache.reef.bridge.client.grpc.DriverClientService; -import org.apache.reef.tang.annotations.DefaultImplementation; - -import java.io.IOException; - -/** - * Interface that driver client services implement. 
- */ -@DefaultImplementation(DriverClientService.class) -public interface IDriverClientService { - - /** - * Start the DriverClient service. - * @throws IOException when unable to start service - */ - void start() throws IOException; - - - /** - * Wait for termination of driver client service. - */ - void awaitTermination() throws InterruptedException; - -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverRuntimeConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverRuntimeConfigurationProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverRuntimeConfigurationProvider.java new file mode 100644 index 0000000..4e32464 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverRuntimeConfigurationProvider.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.reef.bridge.client; + +import org.apache.reef.bridge.client.runtime.LocalDriverRuntimeConfigurationProvider; +import org.apache.reef.tang.annotations.DefaultImplementation; + +/** + * Configuration provider for the runtime. + */ +@DefaultImplementation(LocalDriverRuntimeConfigurationProvider.class) +public interface IDriverRuntimeConfigurationProvider extends IDriverBridgeConfigurationProvider { +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceClient.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceClient.java deleted file mode 100644 index e1f8cb7..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceClient.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.reef.bridge.client; - -import org.apache.reef.bridge.client.grpc.DriverServiceClient; -import org.apache.reef.driver.evaluator.EvaluatorRequest; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.annotations.DefaultImplementation; -import org.apache.reef.util.Optional; - -import java.io.File; -import java.util.List; - -/** - * Forwards application requests to driver server. - */ -@DefaultImplementation(DriverServiceClient.class) -public interface IDriverServiceClient { - - /** - * Initiate shutdown. - */ - void onShutdown(); - - /** - * Initiate shutdown with error. - * @param ex exception error - */ - void onShutdown(final Throwable ex); - - /** - * Set alarm. - * @param alarmId alarm identifier - * @param timeoutMS timeout in milliseconds - */ - void onSetAlarm(final String alarmId, final int timeoutMS); - - /** - * Request evaluators. - * @param evaluatorRequest event - */ - void onEvaluatorRequest(final EvaluatorRequest evaluatorRequest); - - /** - * Close evaluator. - * @param evalautorId to close - */ - void onEvaluatorClose(final String evalautorId); - - /** - * Submit context and/or task. - * @param evaluatorId to submit against - * @param contextConfiguration context configuration - * @param taskConfiguration task configuration - * @param evaluatorProcess evaluator process - * @param addFileList to include - * @param addLibraryList to include - */ - void onEvaluatorSubmit( - final String evaluatorId, - final Optional<Configuration> contextConfiguration, - final Optional<Configuration> taskConfiguration, - final Optional<JVMClientProcess> evaluatorProcess, - final Optional<List<File>> addFileList, - final Optional<List<File>> addLibraryList); - - // Context Operations - - /** - * Close context. - * @param contextId to close - */ - void onContextClose(final String contextId); - - /** - * Submit child context. 
- * @param contextId to submit against - * @param contextConfiguration for child context - */ - void onContextSubmitContext( - final String contextId, - final Configuration contextConfiguration); - - /** - * Submit task. - * @param contextId to submit against - * @param taskConfiguration for task - */ - void onContextSubmitTask( - final String contextId, - final Configuration taskConfiguration); - - /** - * Send message to context. - * @param contextId to destination context - * @param message to send - */ - void onContextMessage(final String contextId, final byte[] message); - - // Task operations - - /** - * Close the task. - * @param taskId to close - * @param message optional message to include - */ - void onTaskClose(final String taskId, final Optional<byte[]> message); - - /** - * Send task a message. - * @param taskId of destination task - * @param message to send - */ - void onTaskMessage(final String taskId, final byte[] message); -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceRuntimeLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceRuntimeLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceRuntimeLauncher.java new file mode 100644 index 0000000..789fa03 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceRuntimeLauncher.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client; + +import org.apache.reef.bridge.proto.ClientProtocol; + +/** + * Driver service launcher. + */ +public interface IDriverServiceRuntimeLauncher { + + void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration); +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JVMClientProcess.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JVMClientProcess.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JVMClientProcess.java deleted file mode 100644 index cdcb9b5..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JVMClientProcess.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client; - -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.driver.evaluator.EvaluatorProcess; -import org.apache.reef.driver.evaluator.EvaluatorType; - -import java.util.ArrayList; -import java.util.List; - -/** - * Stub class for Evaluator Process on driver client. - */ -@Private -public final class JVMClientProcess implements EvaluatorProcess { - - private boolean optionSet = false; - - private int megaBytes = 0; - - private String configurationFileName = null; - - private String standardOut = null; - - private String standardErr = null; - - private final List<String> optionList = new ArrayList<>(); - - public JVMClientProcess() { - } - - @Override - public List<String> getCommandLine() { - throw new UnsupportedOperationException(); - } - - @Override - public EvaluatorType getType() { - return EvaluatorType.JVM; - } - - @Override - public JVMClientProcess setMemory(final int mb) { - this.megaBytes = mb; - this.optionSet = true; - return this; - } - - public int getMemory() { - return this.megaBytes; - } - - @Override - public boolean isOptionSet() { - return optionSet; - } - - @Override - public JVMClientProcess setConfigurationFileName(final String configurationFileName) { - this.configurationFileName = configurationFileName; - return this; - } - - public String getConfigurationFileName() { - return this.configurationFileName; - } - - @Override - public JVMClientProcess setStandardOut(final String standardOut) { - this.standardOut = standardOut; - return this; - } - - public 
String getStandardOut() { - return this.standardOut; - } - - @Override - public JVMClientProcess setStandardErr(final String standardErr) { - this.standardErr = standardErr; - return this; - } - - public String getStandardErr() { - return this.standardErr; - } - - /** - * Add a JVM option. - * @param option The full option, e.g. "-XX:+PrintGCDetails", "-Xms500m" - * @return this - */ - public JVMClientProcess addOption(final String option) { - this.optionList.add(option); - optionSet = true; - return this; - } - - public List<String> getOptions() { - return this.optionList; - } - -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JavaDriverClientLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JavaDriverClientLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JavaDriverClientLauncher.java deleted file mode 100644 index a7bf37c..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JavaDriverClientLauncher.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client; - -import org.apache.reef.bridge.client.grpc.DriverClientGrpcConfiguration; -import org.apache.reef.bridge.client.grpc.parameters.DriverServicePort; -import org.apache.reef.runtime.common.REEFLauncher; -import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler; -import org.apache.reef.runtime.common.launch.REEFErrorHandler; -import org.apache.reef.runtime.common.launch.REEFMessageCodec; -import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler; -import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Configurations; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.annotations.Parameter; -import org.apache.reef.tang.exceptions.BindException; -import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.tang.formats.ConfigurationSerializer; -import org.apache.reef.util.EnvironmentUtils; -import org.apache.reef.util.ThreadLogger; -import org.apache.reef.util.logging.LoggingSetup; -import org.apache.reef.wake.remote.RemoteConfiguration; -import org.apache.reef.wake.time.Clock; - -import javax.inject.Inject; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Driver client launcher. 
- */ -public final class JavaDriverClientLauncher { - - private static final Logger LOG = Logger.getLogger(REEFLauncher.class.getName()); - - private static final Tang TANG = Tang.Factory.getTang(); - - private static final Configuration LAUNCHER_STATIC_CONFIG = - TANG.newConfigurationBuilder() - .bindNamedParameter(RemoteConfiguration.ManagerName.class, "DRIVER_CLIENT_LAUNCHER") - .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class) - .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class) - .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class) - .build(); - - static { - LoggingSetup.setupCommonsLogging(); - } - - /** - * Main configuration object of the REEF component we are launching here. - */ - private final Configuration envConfig; - - /** - * REEFLauncher is instantiated in the main() method below using - * Tang configuration file provided as a command line argument. - * @param configurationPath Path to the serialized Tang configuration file. - * (The file must be in the local file system). - * @param configurationSerializer Serializer used to read the configuration file. - * We currently use Avro to serialize Tang configs. - */ - @Inject - private JavaDriverClientLauncher( - @Parameter(DriverServicePort.class) final Integer driverServicePort, - @Parameter(ClockConfigurationPath.class) final String configurationPath, - final ConfigurationSerializer configurationSerializer) { - - this.envConfig = Configurations.merge( - LAUNCHER_STATIC_CONFIG, - DriverClientGrpcConfiguration.CONF - .set(DriverClientGrpcConfiguration.DRIVER_SERVICE_PORT, driverServicePort) - .build(), - readConfigurationFromDisk(configurationPath, configurationSerializer)); - } - - /** - * Instantiate REEF DriverServiceLauncher. This method is called from REEFLauncher.main(). - * @param clockConfigPath Path to the local file that contains serialized configuration - * for the driver client. 
- * @return An instance of the configured REEFLauncher object. - */ - private static JavaDriverClientLauncher getLauncher(final String clockConfigPath, final int driverServicePort) { - - try { - - final Configuration clockArgConfig = Configurations.merge( - LAUNCHER_STATIC_CONFIG, - DriverClientGrpcConfiguration.CONF - .set(DriverClientGrpcConfiguration.DRIVER_SERVICE_PORT, driverServicePort) - .build(), - TANG.newConfigurationBuilder() - .bindNamedParameter(ClockConfigurationPath.class, clockConfigPath) - .build()); - - return TANG.newInjector(clockArgConfig).getInstance(JavaDriverClientLauncher.class); - - } catch (final BindException ex) { - throw fatal("Error in parsing the command line", ex); - } catch (final InjectionException ex) { - throw fatal("Unable to instantiate REEFLauncher.", ex); - } - } - - /** - * Read configuration from a given file and deserialize it - * into Tang configuration object that can be used for injection. - * Configuration is currently serialized using Avro. - * This method also prints full deserialized configuration into log. - * @param configPath Path to the local file that contains serialized configuration - * of a REEF component to launch (can be either Driver or Evaluator). - * @param serializer An object to deserialize the configuration file. - * @return Tang configuration read and deserialized from a given file. - */ - private static Configuration readConfigurationFromDisk( - final String configPath, final ConfigurationSerializer serializer) { - - LOG.log(Level.FINER, "Loading configuration file: {0}", configPath); - - final File evaluatorConfigFile = new File(configPath); - - if (!evaluatorConfigFile.exists()) { - throw fatal( - "Configuration file " + configPath + " does not exist. 
Can be an issue in job submission.", - new FileNotFoundException(configPath)); - } - - if (!evaluatorConfigFile.canRead()) { - throw fatal( - "Configuration file " + configPath + " exists, but can't be read.", - new IOException(configPath)); - } - - try { - - final Configuration config = serializer.fromFile(evaluatorConfigFile); - LOG.log(Level.FINEST, "The configuration file loaded: {0}", configPath); - - return config; - - } catch (final IOException e) { - throw fatal("Unable to parse the configuration file: " + configPath, e); - } - } - - /** - * Launches a REEF client process (Driver or Evaluator). - * @param args Command-line arguments. - * Must be a single element containing local path to the configuration file. - */ - @SuppressWarnings("checkstyle:illegalcatch") - public static void main(final String[] args) { - - LOG.log(Level.INFO, "Entering JavaDriverClientLauncher.main()."); - - LOG.log(Level.FINE, "JavaDriverClientLauncher started with user name [{0}]", System.getProperty("user.name")); - LOG.log(Level.FINE, "JavaDriverClientLauncher started. Assertions are {0} in this process.", - EnvironmentUtils.areAssertionsEnabled() ? 
"ENABLED" : "DISABLED"); - - if (args.length != 2) { - final String message = "JavaDriverClientLauncher have two and only two arguments to specify the runtime clock " + - "configuration path and driver service port"; - - throw fatal(message, new IllegalArgumentException(message)); - } - - final JavaDriverClientLauncher launcher = getLauncher(args[0], Integer.parseInt(args[1])); - - Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.envConfig)); - final Injector injector = Tang.Factory.getTang().newInjector(launcher.envConfig); - try (final Clock reef = injector.getInstance(Clock.class)) { - reef.run(); - } catch (final Throwable ex) { - throw fatal("Unable to configure and start Clock.", ex); - } - - ThreadLogger.logThreads(LOG, Level.FINEST, "Threads running after Clock.close():"); - - LOG.log(Level.INFO, "Exiting REEFLauncher.main()"); - - System.exit(0); // TODO[REEF-1715]: Should be able to exit cleanly at the end of main() - } - - /** - * Wrap an exception into RuntimeException with a given message, - * and write the same message and exception to the log. - * @param msg an error message to log and pass into the RuntimeException. - * @param t A Throwable exception to log and wrap. - * @return a new Runtime exception wrapping a Throwable. 
- */ - private static RuntimeException fatal(final String msg, final Throwable t) { - LOG.log(Level.SEVERE, msg, t); - return new RuntimeException(msg, t); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/WindowsRuntimePathProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/WindowsRuntimePathProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/WindowsRuntimePathProvider.java new file mode 100644 index 0000000..9b6d02c --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/WindowsRuntimePathProvider.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client; + +import org.apache.reef.runtime.common.files.RuntimePathProvider; + +import javax.inject.Inject; +/** + * Supplies the java binary's path for HDInsight. 
+ */ +public final class WindowsRuntimePathProvider implements RuntimePathProvider { + + @Inject + public WindowsRuntimePathProvider() { + } + + @Override + public String getPath() { + return "java"; + } + + @Override + public String toString() { + return getPath(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ActiveContextBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ActiveContextBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ActiveContextBridge.java deleted file mode 100644 index 54645a0..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ActiveContextBridge.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.reef.bridge.client.events; - -import org.apache.reef.bridge.client.IDriverServiceClient; -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.evaluator.EvaluatorDescriptor; -import org.apache.reef.tang.Configuration; -import org.apache.reef.util.Optional; - -/** - * Active context bridge. - */ -public final class ActiveContextBridge implements ActiveContext { - - private final IDriverServiceClient driverServiceClient; - - private final String contextId; - - private final Optional<String> parentId; - - private final String evaluatorId; - - private final EvaluatorDescriptor evaluatorDescriptor; - - public ActiveContextBridge( - final IDriverServiceClient driverServiceClient, - final String contextId, - final Optional<String> parentId, - final String evaluatorId, - final EvaluatorDescriptor evaluatorDescriptor) { - this.driverServiceClient = driverServiceClient; - this.contextId = contextId; - this.parentId = parentId; - this.evaluatorId = evaluatorId; - this.evaluatorDescriptor = evaluatorDescriptor; - } - - @Override - public void close() { - this.driverServiceClient.onContextClose(this.contextId); - } - - @Override - public void submitTask(final Configuration taskConf) { - this.driverServiceClient.onContextSubmitTask(this.contextId, taskConf); - } - - @Override - public void submitContext(final Configuration contextConfiguration) { - this.driverServiceClient.onContextSubmitContext(this.contextId, contextConfiguration); - } - - @Override - public void submitContextAndService( - final Configuration contextConfiguration, - final Configuration serviceConfiguration) { - throw new UnsupportedOperationException("Service not supported"); - } - - @Override - public void sendMessage(final byte[] message) { - this.driverServiceClient.onContextMessage(this.contextId, message); - } - - @Override - public String getEvaluatorId() { - return this.evaluatorId; - } - - @Override - public Optional<String> getParentId() { - 
return this.parentId; - } - - @Override - public EvaluatorDescriptor getEvaluatorDescriptor() { - return this.evaluatorDescriptor; - } - - @Override - public String getId() { - return this.contextId; - } -}
